import os, re
from twisted.internet import defer
-from twisted.web2 import server, http, http_headers
-from twisted.python import log
+from twisted.web2 import server, http, http_headers, static
+from twisted.python import log, failure
from twisted.python.filepath import FilePath
from apt_dht_conf import config
log.err(failure)
raise RuntimeError, "IP address for this machine could not be found"
- def check_freshness(self, path, modtime, resp):
+ def check_freshness(self, req, path, modtime, resp):
log.msg('Checking if %s is still fresh' % path)
d = self.peers.get([path], "HEAD", modtime)
- d.addCallback(self.check_freshness_done, path, resp)
+ d.addCallback(self.check_freshness_done, req, path, resp)
return d
- def check_freshness_done(self, resp, path, orig_resp):
+ def check_freshness_done(self, resp, req, path, orig_resp):
if resp.code == 304:
log.msg('Still fresh, returning: %s' % path)
return orig_resp
else:
log.msg('Stale, need to redownload: %s' % path)
- return self.get_resp(path)
+ return self.get_resp(req, path)
- def get_resp(self, path):
+ def get_resp(self, req, path):
d = defer.Deferred()
log.msg('Trying to find hash for %s' % path)
findDefer = self.mirrors.findHash(path)
findDefer.addCallbacks(self.findHash_done, self.findHash_error,
- callbackArgs=(path, d), errbackArgs=(path, d))
+ callbackArgs=(req, path, d), errbackArgs=(req, path, d))
findDefer.addErrback(log.err)
return d
- def findHash_error(self, failure, path, d):
+ def findHash_error(self, failure, req, path, d):
log.err(failure)
- self.findHash_done(HashObject(), path, d)
+ self.findHash_done(HashObject(), req, path, d)
- def findHash_done(self, hash, path, d):
+ def findHash_done(self, hash, req, path, d):
if hash.expected() is None:
log.msg('Hash for %s was not found' % path)
self.lookupHash_done([], hash, path, d)
else:
log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
- # Lookup hash from DHT
- key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
- lookupDefer = self.dht.getValue(key)
- lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
+ # Lookup hash in cache
+ locations = self.db.lookupHash(hash.expected())
+ self.getCachedFile(hash, req, path, d, locations)
+
+ def getCachedFile(self, hash, req, path, d, locations):
+ if not locations:
+ log.msg('Failed to return file from cache: %s' % path)
+ self.lookupHash(hash, path, d)
+ return
+
+ # Get the first possible location from the list
+ file = locations.pop(0)['path']
+ log.msg('Returning cached file: %s' % file.path)
+
+ # Get it's response
+ resp = static.File(file.path).renderHTTP(req)
+ if isinstance(resp, defer.Deferred):
+ resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
+ else:
+ self._getCachedFile(resp, hash, req, path, d, locations)
+
+ def _getCachedFile(self, resp, hash, req, path, d, locations):
+ if isinstance(resp, failure.Failure):
+ log.msg('Got error trying to get cached file')
+ log.err()
+ # Try the next possible location
+ self.getCachedFile(hash, req, path, d, locations)
+ return
+
+ log.msg('Cached response: %r' % resp)
+
+ if resp.code >= 200 and resp.code < 400:
+ d.callback(resp)
+ else:
+ # Try the next possible location
+ self.getCachedFile(hash, req, path, d, locations)
+
+ def lookupHash(self, hash, path, d):
+ log.msg('Looking up hash in DHT for file: %s' % path)
+ key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
+ lookupDefer = self.dht.getValue(key)
+ lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
+
def lookupHash_done(self, locations, hash, path, d):
if not locations:
log.msg('Peers for %s were not found' % path)
self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES)
c = self.conn.cursor()
c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
+ c.execute("CREATE INDEX files_hash ON files(hash)")
c.execute("CREATE INDEX files_urldir ON files(urldir)")
c.execute("CREATE INDEX files_refreshed ON files(refreshed)")
c.execute("CREATE TABLE dirs (urldir INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT)")
c = self.conn.cursor()
c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (file.path, ))
row = c.fetchone()
- res = self._removeChanged(file, row)
- if res:
- res = {}
- res['hash'] = row['hash']
- res['size'] = row['size']
- res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
+ res = None
+ if row:
+ res = self._removeChanged(file, row)
+ if res:
+ res = {}
+ res['hash'] = row['hash']
+ res['size'] = row['size']
+ res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
c.close()
return res
+ def lookupHash(self, hash):
+ """Find a file by hash in the database.
+
+ If any found files have changed or are missing, they are removed
+ from the database.
+
+ @return: list of dictionaries of info for the found files
+ """
+ c = self.conn.cursor()
+ c.execute("SELECT path, urldir, dirlength, size, mtime FROM files WHERE hash = ? ORDER BY urldir", (khash(hash), ))
+ row = c.fetchone()
+ files = []
+ while row:
+ file = FilePath(row['path'])
+ res = self._removeChanged(file, row)
+ if res:
+ res = {}
+ res['path'] = file
+ res['size'] = row['size']
+ res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
+ files.append(res)
+ row = c.fetchone()
+ c.close()
+ return files
+
def isUnchanged(self, file):
"""Check if a file in the file system has changed.
c = self.conn.cursor()
c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, ))
row = c.fetchone()
- res = self._removeChanged(file, row)
- if res:
- c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path))
+ res = None
+ if row:
+ res = self._removeChanged(file, row)
+ if res:
+ c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path))
return res
def expiredFiles(self, expireAfter):