Refresh DHT values just before they are due to expire.
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
index eb103bd34cd2f4eb285fa2c9d07437f50f8e9073..1896494f986bce3463a1e0240cc30e18ac0e76de 100644 (file)
@@ -3,7 +3,7 @@ from binascii import b2a_hex
 from urlparse import urlunparse
 import os, re
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from twisted.web2 import server, http, http_headers, static
 from twisted.python import log, failure
 from twisted.python.filepath import FilePath
@@ -35,21 +35,38 @@ class AptDHT:
         self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
-        self.my_addr = None
+        self.my_contact = None
     
     def joinComplete(self, result):
-        self.my_addr = findMyIPAddr(result,
-                                    config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
-                                    config.getboolean('DEFAULT', 'LOCAL_OK'))
-        if not self.my_addr:
+        my_addr = findMyIPAddr(result,
+                               config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
+                               config.getboolean('DEFAULT', 'LOCAL_OK'))
+        if not my_addr:
             raise RuntimeError, "IP address for this machine could not be found"
+        self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
         self.cache.scanDirectories()
+        reactor.callLater(60, self.refreshFiles)
 
     def joinError(self, failure):
         log.msg("joining DHT failed miserably")
         log.err(failure)
         raise RuntimeError, "IP address for this machine could not be found"
     
+    def refreshFiles(self):
+        """Refresh any files in the DHT that are about to expire."""
+        expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
+        hashes = self.db.expiredFiles(expireAfter)
+        if len(hashes.keys()) > 0:
+            log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
+        for raw_hash in hashes:
+            self.db.refreshHash(raw_hash)
+            hash = HashObject(raw_hash)
+            key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
+            value = {'c': self.my_contact}
+            storeDefer = self.dht.storeValue(key, value)
+            storeDefer.addCallback(self.store_done, hash)
+        reactor.callLater(60, self.refreshFiles)
+
     def check_freshness(self, req, path, modtime, resp):
         log.msg('Checking if %s is still fresh' % path)
         d = self.peers.get('', path, method = "HEAD", modtime = modtime)
@@ -152,7 +169,7 @@ class AptDHT:
             return getDefer
         return response
         
-    def new_cached_file(self, file_path, hash, url = None, forceDHT = False):
+    def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
         """Add a newly cached file to the DHT.
         
         If the file was downloaded, set url to the path it was downloaded for.
@@ -162,10 +179,9 @@ class AptDHT:
         if url:
             self.mirrors.updatedFile(url, file_path)
         
-        if self.my_addr and hash and (hash.expected() is not None or forceDHT):
-            contact = compact(self.my_addr, config.getint('DEFAULT', 'PORT'))
-            value = {'c': contact}
+        if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
+            value = {'c': self.my_contact}
             storeDefer = self.dht.storeValue(key, value)
             storeDefer.addCallback(self.store_done, hash)
             return storeDefer