From 73e507f17f01b02ee458cfd75abb955e195aa824 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Mon, 14 Jan 2008 23:40:21 -0800 Subject: [PATCH] Lookup the hash in the cache database. If it's found, return that file. Still need to implement copying the file to the cache if it's not in there already (compare directory to cache_dir variable). --- apt_dht/HTTPServer.py | 4 +-- apt_dht/apt_dht.py | 69 +++++++++++++++++++++++++++++++++---------- apt_dht/db.py | 48 ++++++++++++++++++++++++------ 3 files changed, 95 insertions(+), 26 deletions(-) diff --git a/apt_dht/HTTPServer.py b/apt_dht/HTTPServer.py index 23f911d..616fc2d 100644 --- a/apt_dht/HTTPServer.py +++ b/apt_dht/HTTPServer.py @@ -25,10 +25,10 @@ class FileDownloader(static.File): if self.manager: path = 'http:/' + req.uri if resp.code >= 200 and resp.code < 400: - return self.manager.check_freshness(path, resp.headers.getHeader('Last-Modified'), resp) + return self.manager.check_freshness(req, path, resp.headers.getHeader('Last-Modified'), resp) log.msg('Not found, trying other methods for %s' % req.uri) - return self.manager.get_resp(path) + return self.manager.get_resp(req, path) return resp diff --git a/apt_dht/apt_dht.py b/apt_dht/apt_dht.py index 9504f8a..d212729 100644 --- a/apt_dht/apt_dht.py +++ b/apt_dht/apt_dht.py @@ -4,8 +4,8 @@ from urlparse import urlunparse import os, re from twisted.internet import defer -from twisted.web2 import server, http, http_headers -from twisted.python import log +from twisted.web2 import server, http, http_headers, static +from twisted.python import log, failure from twisted.python.filepath import FilePath from apt_dht_conf import config @@ -52,46 +52,85 @@ class AptDHT: log.err(failure) raise RuntimeError, "IP address for this machine could not be found" - def check_freshness(self, path, modtime, resp): + def check_freshness(self, req, path, modtime, resp): log.msg('Checking if %s is still fresh' % path) d = self.peers.get([path], "HEAD", modtime) - d.addCallback(self.check_freshness_done, path, resp) + d.addCallback(self.check_freshness_done, req, path, resp) return d - def check_freshness_done(self, resp, path, orig_resp): + def check_freshness_done(self, resp, req, path, orig_resp): if resp.code == 304: log.msg('Still fresh, returning: %s' % path) return orig_resp else: log.msg('Stale, need to redownload: %s' % path) - return self.get_resp(path) + return self.get_resp(req, path) - def get_resp(self, path): + def get_resp(self, req, path): d = defer.Deferred() log.msg('Trying to find hash for %s' % path) findDefer = self.mirrors.findHash(path) findDefer.addCallbacks(self.findHash_done, self.findHash_error, - callbackArgs=(path, d), errbackArgs=(path, d)) + callbackArgs=(req, path, d), errbackArgs=(req, path, d)) findDefer.addErrback(log.err) return d - def findHash_error(self, failure, path, d): + def findHash_error(self, failure, req, path, d): log.err(failure) - self.findHash_done(HashObject(), path, d) + self.findHash_done(HashObject(), req, path, d) - def findHash_done(self, hash, path, d): + def findHash_done(self, hash, req, path, d): if hash.expected() is None: log.msg('Hash for %s was not found' % path) self.lookupHash_done([], hash, path, d) else: log.msg('Found hash %s for %s' % (hash.hexexpected(), path)) - # Lookup hash from DHT - key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH')) - lookupDefer = self.dht.getValue(key) - lookupDefer.addCallback(self.lookupHash_done, hash, path, d) + # Lookup hash in cache + locations = self.db.lookupHash(hash.expected()) + self.getCachedFile(hash, req, path, d, locations) + + def getCachedFile(self, hash, req, path, d, locations): + if not locations: + log.msg('Failed to return file from cache: %s' % path) + self.lookupHash(hash, path, d) + return + + # Get the first possible location from the list + file = locations.pop(0)['path'] + log.msg('Returning cached file: %s' % file.path) + + # Get it's response + resp = static.File(file.path).renderHTTP(req) + if isinstance(resp, defer.Deferred): + resp.addBoth(self._getCachedFile, hash, req, path, d, locations) + else: + self._getCachedFile(resp, hash, req, path, d, locations) + + def _getCachedFile(self, resp, hash, req, path, d, locations): + if isinstance(resp, failure.Failure): + log.msg('Got error trying to get cached file') + log.err() + # Try the next possible location + self.getCachedFile(hash, req, path, d, locations) + return + + log.msg('Cached response: %r' % resp) + + if resp.code >= 200 and resp.code < 400: + d.callback(resp) + else: + # Try the next possible location + self.getCachedFile(hash, req, path, d, locations) + + def lookupHash(self, hash, path, d): + log.msg('Looking up hash in DHT for file: %s' % path) + key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH')) + lookupDefer = self.dht.getValue(key) + lookupDefer.addCallback(self.lookupHash_done, hash, path, d) + def lookupHash_done(self, locations, hash, path, d): if not locations: log.msg('Peers for %s were not found' % path) diff --git a/apt_dht/db.py b/apt_dht/db.py index 1d2e342..1f4d6e3 100644 --- a/apt_dht/db.py +++ b/apt_dht/db.py @@ -47,6 +47,7 @@ class DB: self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES) c = self.conn.cursor() c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)") + c.execute("CREATE INDEX files_hash ON files(hash)") c.execute("CREATE INDEX files_urldir ON files(urldir)") c.execute("CREATE INDEX files_refreshed ON files(refreshed)") c.execute("CREATE TABLE dirs (urldir INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT)") @@ -101,15 +102,42 @@ class DB: c = self.conn.cursor() c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (file.path, )) row = c.fetchone() - res = self._removeChanged(file, row) - if res: - res = {} - res['hash'] = row['hash'] - res['size'] = row['size'] - res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:] + res = None + if row: + res = self._removeChanged(file, row) + if res: + res = {} + res['hash'] = row['hash'] + res['size'] = row['size'] + res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:] c.close() return res + def lookupHash(self, hash): + """Find a file by hash in the database. + + If any found files have changed or are missing, they are removed + from the database. + + @return: list of dictionaries of info for the found files + """ + c = self.conn.cursor() + c.execute("SELECT path, urldir, dirlength, size, mtime FROM files WHERE hash = ? ORDER BY urldir", (khash(hash), )) + row = c.fetchone() + files = [] + while row: + file = FilePath(row['path']) + res = self._removeChanged(file, row) + if res: + res = {} + res['path'] = file + res['size'] = row['size'] + res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:] + files.append(res) + row = c.fetchone() + c.close() + return files + def isUnchanged(self, file): """Check if a file in the file system has changed. @@ -132,9 +160,11 @@ class DB: c = self.conn.cursor() c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, )) row = c.fetchone() - res = self._removeChanged(file, row) - if res: - c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path)) + res = None + if row: + res = self._removeChanged(file, row) + if res: + c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path)) return res def expiredFiles(self, expireAfter): -- 2.39.5