Lookup the hash in the cache database.
authorCameron Dale <camrdale@gmail.com>
Tue, 15 Jan 2008 07:40:21 +0000 (23:40 -0800)
committerCameron Dale <camrdale@gmail.com>
Tue, 15 Jan 2008 07:40:21 +0000 (23:40 -0800)
If it's found, return that file.

Still need to implement copying the file to the cache if it's
not in there already (compare directory to cache_dir variable).

apt_dht/HTTPServer.py
apt_dht/apt_dht.py
apt_dht/db.py

index 23f911d..616fc2d 100644 (file)
@@ -25,10 +25,10 @@ class FileDownloader(static.File):
         if self.manager:
             path = 'http:/' + req.uri
             if resp.code >= 200 and resp.code < 400:
-                return self.manager.check_freshness(path, resp.headers.getHeader('Last-Modified'), resp)
+                return self.manager.check_freshness(req, path, resp.headers.getHeader('Last-Modified'), resp)
             
             log.msg('Not found, trying other methods for %s' % req.uri)
-            return self.manager.get_resp(path)
+            return self.manager.get_resp(req, path)
         
         return resp
 
index 9504f8a..d212729 100644 (file)
@@ -4,8 +4,8 @@ from urlparse import urlunparse
 import os, re
 
 from twisted.internet import defer
-from twisted.web2 import server, http, http_headers
-from twisted.python import log
+from twisted.web2 import server, http, http_headers, static
+from twisted.python import log, failure
 from twisted.python.filepath import FilePath
 
 from apt_dht_conf import config
@@ -52,46 +52,85 @@ class AptDHT:
         log.err(failure)
         raise RuntimeError, "IP address for this machine could not be found"
     
-    def check_freshness(self, path, modtime, resp):
+    def check_freshness(self, req, path, modtime, resp):
         log.msg('Checking if %s is still fresh' % path)
         d = self.peers.get([path], "HEAD", modtime)
-        d.addCallback(self.check_freshness_done, path, resp)
+        d.addCallback(self.check_freshness_done, req, path, resp)
         return d
     
-    def check_freshness_done(self, resp, path, orig_resp):
+    def check_freshness_done(self, resp, req, path, orig_resp):
         if resp.code == 304:
             log.msg('Still fresh, returning: %s' % path)
             return orig_resp
         else:
             log.msg('Stale, need to redownload: %s' % path)
-            return self.get_resp(path)
+            return self.get_resp(req, path)
     
-    def get_resp(self, path):
+    def get_resp(self, req, path):
         d = defer.Deferred()
         
         log.msg('Trying to find hash for %s' % path)
         findDefer = self.mirrors.findHash(path)
         
         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
-                               callbackArgs=(path, d), errbackArgs=(path, d))
+                               callbackArgs=(req, path, d), errbackArgs=(req, path, d))
         findDefer.addErrback(log.err)
         return d
     
-    def findHash_error(self, failure, path, d):
+    def findHash_error(self, failure, req, path, d):
         log.err(failure)
-        self.findHash_done(HashObject(), path, d)
+        self.findHash_done(HashObject(), req, path, d)
         
-    def findHash_done(self, hash, path, d):
+    def findHash_done(self, hash, req, path, d):
         if hash.expected() is None:
             log.msg('Hash for %s was not found' % path)
             self.lookupHash_done([], hash, path, d)
         else:
             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
-            # Lookup hash from DHT
-            key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
-            lookupDefer = self.dht.getValue(key)
-            lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
             
+            # Lookup hash in cache
+            locations = self.db.lookupHash(hash.expected())
+            self.getCachedFile(hash, req, path, d, locations)
+
+    def getCachedFile(self, hash, req, path, d, locations):
+        if not locations:
+            log.msg('Failed to return file from cache: %s' % path)
+            self.lookupHash(hash, path, d)
+            return
+        
+        # Get the first possible location from the list
+        file = locations.pop(0)['path']
+        log.msg('Returning cached file: %s' % file.path)
+        
+        # Get it's response
+        resp = static.File(file.path).renderHTTP(req)
+        if isinstance(resp, defer.Deferred):
+            resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
+        else:
+            self._getCachedFile(resp, hash, req, path, d, locations)
+        
+    def _getCachedFile(self, resp, hash, req, path, d, locations):
+        if isinstance(resp, failure.Failure):
+            log.msg('Got error trying to get cached file')
+            log.err()
+            # Try the next possible location
+            self.getCachedFile(hash, req, path, d, locations)
+            return
+            
+        log.msg('Cached response: %r' % resp)
+        
+        if resp.code >= 200 and resp.code < 400:
+            d.callback(resp)
+        else:
+            # Try the next possible location
+            self.getCachedFile(hash, req, path, d, locations)
+
+    def lookupHash(self, hash, path, d):
+        log.msg('Looking up hash in DHT for file: %s' % path)
+        key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
+        lookupDefer = self.dht.getValue(key)
+        lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
+
     def lookupHash_done(self, locations, hash, path, d):
         if not locations:
             log.msg('Peers for %s were not found' % path)
index 1d2e342..1f4d6e3 100644 (file)
@@ -47,6 +47,7 @@ class DB:
         self.conn = sqlite.connect(database=self.db.path, detect_types=sqlite.PARSE_DECLTYPES)
         c = self.conn.cursor()
         c.execute("CREATE TABLE files (path TEXT PRIMARY KEY, hash KHASH, urldir INTEGER, dirlength INTEGER, size NUMBER, mtime NUMBER, refreshed TIMESTAMP)")
+        c.execute("CREATE INDEX files_hash ON files(hash)")
         c.execute("CREATE INDEX files_urldir ON files(urldir)")
         c.execute("CREATE INDEX files_refreshed ON files(refreshed)")
         c.execute("CREATE TABLE dirs (urldir INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT)")
@@ -101,15 +102,42 @@ class DB:
         c = self.conn.cursor()
         c.execute("SELECT hash, urldir, dirlength, size, mtime FROM files WHERE path = ?", (file.path, ))
         row = c.fetchone()
-        res = self._removeChanged(file, row)
-        if res:
-            res = {}
-            res['hash'] = row['hash']
-            res['size'] = row['size']
-            res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
+        res = None
+        if row:
+            res = self._removeChanged(file, row)
+            if res:
+                res = {}
+                res['hash'] = row['hash']
+                res['size'] = row['size']
+                res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
         c.close()
         return res
         
+    def lookupHash(self, hash):
+        """Find a file by hash in the database.
+        
+        If any found files have changed or are missing, they are removed
+        from the database.
+        
+        @return: list of dictionaries of info for the found files
+        """
+        c = self.conn.cursor()
+        c.execute("SELECT path, urldir, dirlength, size, mtime FROM files WHERE hash = ? ORDER BY urldir", (khash(hash), ))
+        row = c.fetchone()
+        files = []
+        while row:
+            file = FilePath(row['path'])
+            res = self._removeChanged(file, row)
+            if res:
+                res = {}
+                res['path'] = file
+                res['size'] = row['size']
+                res['urlpath'] = '/~' + str(row['urldir']) + file.path[row['dirlength']:]
+                files.append(res)
+            row = c.fetchone()
+        c.close()
+        return files
+        
     def isUnchanged(self, file):
         """Check if a file in the file system has changed.
         
@@ -132,9 +160,11 @@ class DB:
         c = self.conn.cursor()
         c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, ))
         row = c.fetchone()
-        res = self._removeChanged(file, row)
-        if res:
-            c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path))
+        res = None
+        if row:
+            res = self._removeChanged(file, row)
+            if res:
+                c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path))
         return res
     
     def expiredFiles(self, expireAfter):