Refresh DHT values just before they are due to expire.
authorCameron Dale <camrdale@gmail.com>
Fri, 22 Feb 2008 23:30:57 +0000 (15:30 -0800)
committerCameron Dale <camrdale@gmail.com>
Fri, 22 Feb 2008 23:30:57 +0000 (15:30 -0800)
TODO
apt-dht.conf
apt_dht/CacheManager.py
apt_dht/Hash.py
apt_dht/apt_dht.py
apt_dht/apt_dht_conf.py
apt_dht/db.py
debian/apt-dht.conf.sgml
test.py

diff --git a/TODO b/TODO
index b204ce719843ea8c1f22c6d1381987c38e9b7b63..ce33a7763381df0317bbc3f5e5ed78db22f6a579 100644 (file)
--- a/TODO
+++ b/TODO
@@ -93,10 +93,3 @@ key. Once a querying node has found enough values (or all of them), then
 it would send the "get_value" method to the nodes that have the most
 values. The "get_value" query could also have a new parameter "number",
 which is the maximum number of values to return.
-
-
-Missing Kademlia implementation details are needed.
-
-The current implementation is missing some important features, mostly 
-focussed on storing values:
- - values need to be republished (every hour?)
index 709a0d3f81d59baf5ef20d7b41af36154161a816..89826697a2ec359cb3f2e96fb8e3a5b02086af9f 100644 (file)
@@ -34,6 +34,10 @@ LOCAL_OK = no
 # to reload when a new request arrives.
 UNLOAD_PACKAGES_CACHE = 5m
 
+# Refresh the DHT keys after this much time has passed.
+# This should be a time slightly less than the DHT's KEY_EXPIRE value.
+KEY_REFRESH = 57m
+
 # Which DHT implementation to use.
 # It must be possile to do "from <DHT>.DHT import DHT" to get a class that
 # implements the IDHT interface. There should also be a similarly named
index c70cee257fda9e0b83be04a085ceb41ecd926b16..9bdf75bb5edc5064b5b8a2fce3366ed72b1f3859 100644 (file)
@@ -210,8 +210,8 @@ class CacheManager:
             url = None
             if self.scanning[0] == self.cache_dir:
                 url = 'http:/' + file.path[len(self.cache_dir.path):]
-            self.db.storeFile(file, result.digest())
-            df = self.manager.new_cached_file(file, result, url, True)
+            new_hash = self.db.storeFile(file, result.digest())
+            df = self.manager.new_cached_file(file, result, new_hash, url, True)
             if df is None:
                 reactor.callLater(0, self._scanDirectories, None, walker)
             else:
@@ -273,13 +273,13 @@ class CacheManager:
             else:
                 log.msg('Hashed file to %s: %s' % (hash.hexdigest(), url))
                 
-            self.db.storeFile(destFile, hash.digest())
+            new_hash = self.db.storeFile(destFile, hash.digest())
             log.msg('now avaliable: %s' % (url))
 
             if self.manager:
-                self.manager.new_cached_file(destFile, hash, url)
+                self.manager.new_cached_file(destFile, hash, new_hash, url)
                 if ext:
-                    self.manager.new_cached_file(decFile, None, url[:-len(ext)])
+                    self.manager.new_cached_file(decFile, None, False, url[:-len(ext)])
         else:
             log.msg("Hashes don't match %s != %s: %s" % (hash.hexexpected(), hash.hexdigest(), url))
             destFile.remove()
index 933fdb596693c87fcc0cf17f47e3e94e1e40ea04..d55375f65b1bc2ea1d693ee842dffc2c0d2f2905 100644 (file)
@@ -34,14 +34,15 @@ class HashObject:
                    },
             ]
     
-    def __init__(self):
+    def __init__(self, digest = None, size = None):
         self.hashTypeNum = 0    # Use the first if nothing else matters
         self.expHash = None
         self.expHex = None
         self.expSize = None
         self.expNormHash = None
         self.fileHasher = None
-        self.fileHash = None
+        self.fileHash = digest
+        self.size = size
         self.fileHex = None
         self.fileNormHash = None
         self.done = True
index eb103bd34cd2f4eb285fa2c9d07437f50f8e9073..1896494f986bce3463a1e0240cc30e18ac0e76de 100644 (file)
@@ -3,7 +3,7 @@ from binascii import b2a_hex
 from urlparse import urlunparse
 import os, re
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from twisted.web2 import server, http, http_headers, static
 from twisted.python import log, failure
 from twisted.python.filepath import FilePath
@@ -35,21 +35,38 @@ class AptDHT:
         self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
-        self.my_addr = None
+        self.my_contact = None
     
     def joinComplete(self, result):
-        self.my_addr = findMyIPAddr(result,
-                                    config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
-                                    config.getboolean('DEFAULT', 'LOCAL_OK'))
-        if not self.my_addr:
+        my_addr = findMyIPAddr(result,
+                               config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
+                               config.getboolean('DEFAULT', 'LOCAL_OK'))
+        if not my_addr:
             raise RuntimeError, "IP address for this machine could not be found"
+        self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
         self.cache.scanDirectories()
+        reactor.callLater(60, self.refreshFiles)
 
     def joinError(self, failure):
         log.msg("joining DHT failed miserably")
         log.err(failure)
         raise RuntimeError, "IP address for this machine could not be found"
     
+    def refreshFiles(self):
+        """Refresh any files in the DHT that are about to expire."""
+        expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
+        hashes = self.db.expiredFiles(expireAfter)
+        if len(hashes.keys()) > 0:
+            log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
+        for raw_hash in hashes:
+            self.db.refreshHash(raw_hash)
+            hash = HashObject(raw_hash)
+            key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
+            value = {'c': self.my_contact}
+            storeDefer = self.dht.storeValue(key, value)
+            storeDefer.addCallback(self.store_done, hash)
+        reactor.callLater(60, self.refreshFiles)
+
     def check_freshness(self, req, path, modtime, resp):
         log.msg('Checking if %s is still fresh' % path)
         d = self.peers.get('', path, method = "HEAD", modtime = modtime)
@@ -152,7 +169,7 @@ class AptDHT:
             return getDefer
         return response
         
-    def new_cached_file(self, file_path, hash, url = None, forceDHT = False):
+    def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
         """Add a newly cached file to the DHT.
         
         If the file was downloaded, set url to the path it was downloaded for.
@@ -162,10 +179,9 @@ class AptDHT:
         if url:
             self.mirrors.updatedFile(url, file_path)
         
-        if self.my_addr and hash and (hash.expected() is not None or forceDHT):
-            contact = compact(self.my_addr, config.getint('DEFAULT', 'PORT'))
-            value = {'c': contact}
+        if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
+            value = {'c': self.my_contact}
             storeDefer = self.dht.storeValue(key, value)
             storeDefer.addCallback(self.store_done, hash)
             return storeDefer
index 931d46cdd5a1a7fb48308c2dbac0ea22dc00e9c0..3fad6259f43cb55fc5367d68d31b7936ea017a6a 100644 (file)
@@ -43,6 +43,10 @@ DEFAULTS = {
     # to reload when a new request arrives.
     'UNLOAD_PACKAGES_CACHE': '5m',
 
+    # Refresh the DHT keys after this much time has passed.
+    # This should be a time slightly less than the DHT's KEY_EXPIRE value.
+    'KEY_REFRESH': '57m',
+
     # Which DHT implementation to use.
     # It must be possile to do "from <DHT>.DHT import DHT" to get a class that
     # implements the IDHT interface.
index 7b225a19391fe7591c88a6e4a756ef894c3a7112..1ab2b37cac96df1bc8d6a7331326ef5b1d252dfe 100644 (file)
@@ -66,20 +66,36 @@ class DB:
         return res
         
     def storeFile(self, file, hash):
-        """Store or update a file in the database."""
+        """Store or update a file in the database.
+        
+        @return: True if the hash was not in the database before
+            (so it needs to be added to the DHT)
+        """
+        new_hash = True
+        refreshTime = datetime.now()
+        c = self.conn.cursor()
+        c.execute("SELECT MAX(refreshed) AS max_refresh FROM files WHERE hash = ?", (khash(hash), ))
+        row = c.fetchone()
+        if row and row['max_refresh']:
+            new_hash = False
+            refreshTime = row['max_refresh']
+        c.close()
+        
         file.restat()
         c = self.conn.cursor()
         c.execute("SELECT path FROM files WHERE path = ?", (file.path, ))
         row = c.fetchone()
         if row:
             c.execute("UPDATE files SET hash = ?, size = ?, mtime = ?, refreshed = ?", 
-                      (khash(hash), file.getsize(), file.getmtime(), datetime.now()))
+                      (khash(hash), file.getsize(), file.getmtime(), refreshTime))
         else:
             c.execute("INSERT OR REPLACE INTO files VALUES(?, ?, ?, ?, ?)",
-                      (file.path, khash(hash), file.getsize(), file.getmtime(), datetime.now()))
+                      (file.path, khash(hash), file.getsize(), file.getmtime(), refreshTime))
         self.conn.commit()
         c.close()
         
+        return new_hash
+        
     def getFile(self, file):
         """Get a file from the database.
         
@@ -110,7 +126,7 @@ class DB:
         @return: list of dictionaries of info for the found files
         """
         c = self.conn.cursor()
-        c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), ))
+        c.execute("SELECT path, size, mtime, refreshed FROM files WHERE hash = ?", (khash(hash), ))
         row = c.fetchone()
         files = []
         while row:
@@ -120,6 +136,7 @@ class DB:
                 res = {}
                 res['path'] = file
                 res['size'] = row['size']
+                res['refreshed'] = row['refreshed']
                 files.append(res)
             row = c.fetchone()
         c.close()
@@ -137,41 +154,47 @@ class DB:
         row = c.fetchone()
         return self._removeChanged(file, row)
 
-    def refreshFile(self, file):
-        """Refresh the publishing time of a file.
-        
-        If it has changed or is missing, it is removed from the table.
-        
-        @return: True if unchanged, False if changed, None if not in database
-        """
+    def refreshHash(self, hash):
+        """Refresh the publishing time all files with a hash."""
+        refreshTime = datetime.now()
         c = self.conn.cursor()
-        c.execute("SELECT size, mtime FROM files WHERE path = ?", (file.path, ))
-        row = c.fetchone()
-        res = None
-        if row:
-            res = self._removeChanged(file, row)
-            if res:
-                c.execute("UPDATE files SET refreshed = ? WHERE path = ?", (datetime.now(), file.path))
-        return res
+        c.execute("UPDATE files SET refreshed = ? WHERE hash = ?", (refreshTime, khash(hash)))
+        c.close()
     
     def expiredFiles(self, expireAfter):
         """Find files that need refreshing after expireAfter seconds.
         
-        Also removes any entries from the table that no longer exist.
+        For each hash that needs refreshing, finds all the files with that hash.
+        If the file has changed or is missing, it is removed from the table.
         
         @return: dictionary with keys the hashes, values a list of FilePaths
         """
         t = datetime.now() - timedelta(seconds=expireAfter)
+        
+        # First find the hashes that need refreshing
         c = self.conn.cursor()
-        c.execute("SELECT path, hash, size, mtime FROM files WHERE refreshed < ?", (t, ))
+        c.execute("SELECT DISTINCT hash FROM files WHERE refreshed < ?", (t, ))
         row = c.fetchone()
         expired = {}
         while row:
-            res = self._removeChanged(FilePath(row['path']), row)
-            if res:
-                expired.setdefault(row['hash'], []).append(FilePath(row['path']))
+            expired.setdefault(row['hash'], [])
             row = c.fetchone()
         c.close()
+
+        # Now find the files for each hash
+        for hash in expired.keys():
+            c = self.conn.cursor()
+            c.execute("SELECT path, size, mtime FROM files WHERE hash = ?", (khash(hash), ))
+            row = c.fetchone()
+            while row:
+                res = self._removeChanged(FilePath(row['path']), row)
+                if res:
+                    expired[hash].append(FilePath(row['path']))
+                row = c.fetchone()
+            if len(expired[hash]) == 0:
+                del expired[hash]
+            c.close()
+        
         return expired
         
     def removeUntrackedFiles(self, dirs):
@@ -239,6 +262,12 @@ class TestDB(unittest.TestCase):
         self.failUnless(res)
         self.failUnlessEqual(res['hash'], self.hash)
         
+    def test_lookupHash(self):
+        res = self.store.lookupHash(self.hash)
+        self.failUnless(res)
+        self.failUnlessEqual(len(res), 1)
+        self.failUnlessEqual(res[0]['path'].path, self.file.path)
+        
     def test_isUnchanged(self):
         res = self.store.isUnchanged(self.file)
         self.failUnless(res)
@@ -258,8 +287,7 @@ class TestDB(unittest.TestCase):
         self.failUnlessEqual(len(res.keys()), 1)
         self.failUnlessEqual(res.keys()[0], self.hash)
         self.failUnlessEqual(len(res[self.hash]), 1)
-        res = self.store.refreshFile(self.file)
-        self.failUnless(res)
+        self.store.refreshHash(self.hash)
         res = self.store.expiredFiles(1)
         self.failUnlessEqual(len(res.keys()), 0)
         
@@ -272,6 +300,25 @@ class TestDB(unittest.TestCase):
             file.touch()
             self.store.storeFile(file, self.hash)
     
+    def test_multipleHashes(self):
+        self.build_dirs()
+        res = self.store.expiredFiles(1)
+        self.failUnlessEqual(len(res.keys()), 0)
+        res = self.store.lookupHash(self.hash)
+        self.failUnless(res)
+        self.failUnlessEqual(len(res), 4)
+        self.failUnlessEqual(res[0]['refreshed'], res[1]['refreshed'])
+        self.failUnlessEqual(res[0]['refreshed'], res[2]['refreshed'])
+        self.failUnlessEqual(res[0]['refreshed'], res[3]['refreshed'])
+        sleep(2)
+        res = self.store.expiredFiles(1)
+        self.failUnlessEqual(len(res.keys()), 1)
+        self.failUnlessEqual(res.keys()[0], self.hash)
+        self.failUnlessEqual(len(res[self.hash]), 4)
+        self.store.refreshHash(self.hash)
+        res = self.store.expiredFiles(1)
+        self.failUnlessEqual(len(res.keys()), 0)
+    
     def test_removeUntracked(self):
         self.build_dirs()
         res = self.store.removeUntrackedFiles(self.dirs)
index b481193f9475a0cfc3e7d241e4891ea458acff0a..6ded80c8f808485a13779ef8f48e74db99e553d2 100644 (file)
                  to reload when a new request arrives. (Default is 5 minutes.)</para>
            </listitem>
          </varlistentry>
+         <varlistentry>
+           <term><option>KEY_REFRESH = <replaceable>time</replaceable></option></term>
+            <listitem>
+             <para>The <replaceable>time</replaceable> after which to refresh DHT keys.
+                 This should be a time slightly less than the DHT's KEY_EXPIRE value.
+                 (Default is 57 minutes.)</para>
+           </listitem>
+         </varlistentry>
          <varlistentry>
            <term><option>DHT = <replaceable>string</replaceable></option></term>
             <listitem>
diff --git a/test.py b/test.py
index 66f0fc6598d63657eddfadd674a1c7d92fda258d..5468a1d28e62bd65277ef5494cdc35dfb1b81ae5 100755 (executable)
--- a/test.py
+++ b/test.py
@@ -337,6 +337,10 @@ LOCAL_OK = yes
 # to reload when a new request arrives.
 UNLOAD_PACKAGES_CACHE = 5m
 
+# Refresh the DHT keys after this much time has passed.
+# This should be a time slightly less than the DHT's KEY_EXPIRE value.
+KEY_REFRESH = 57m
+
 # Which DHT implementation to use.
 # It must be possile to do "from <DHT>.DHT import DHT" to get a class that
 # implements the IDHT interface.