]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht/apt_dht.py
Add property tracking to downloads from peers.
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
index fd7b73d81ce5ba8053431578491dbc31a5424222..94260bd4c396a4c216e2952c9094ae2a1cdaf0cb 100644 (file)
@@ -1,9 +1,9 @@
 
 from binascii import b2a_hex
 from urlparse import urlunparse
 
 from binascii import b2a_hex
 from urlparse import urlunparse
-import os, re
+import os, re, sha
 
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from twisted.web2 import server, http, http_headers, static
 from twisted.python import log, failure
 from twisted.python.filepath import FilePath
 from twisted.web2 import server, http, http_headers, static
 from twisted.python import log, failure
 from twisted.python.filepath import FilePath
@@ -15,7 +15,10 @@ from MirrorManager import MirrorManager
 from CacheManager import CacheManager
 from Hash import HashObject
 from db import DB
 from CacheManager import CacheManager
 from Hash import HashObject
 from db import DB
-from util import findMyIPAddr
+from util import findMyIPAddr, compact
+
+DHT_PIECES = 4
+TORRENT_PIECES = 70
 
 download_dir = 'cache'
 
 
 download_dir = 'cache'
 
@@ -32,24 +35,48 @@ class AptDHT:
         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
         self.getHTTPFactory = self.http_server.getHTTPFactory
         self.peers = PeerManager()
         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
         self.getHTTPFactory = self.http_server.getHTTPFactory
         self.peers = PeerManager()
-        self.mirrors = MirrorManager(self.cache_dir)
+        self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
-        self.my_addr = None
+        self.my_contact = None
     
     def joinComplete(self, result):
     
     def joinComplete(self, result):
-        self.my_addr = findMyIPAddr(result,
-                                    config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
-                                    config.getboolean('DEFAULT', 'LOCAL_OK'))
-        if not self.my_addr:
+        my_addr = findMyIPAddr(result,
+                               config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
+                               config.getboolean('DEFAULT', 'LOCAL_OK'))
+        if not my_addr:
             raise RuntimeError, "IP address for this machine could not be found"
             raise RuntimeError, "IP address for this machine could not be found"
+        self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
         self.cache.scanDirectories()
         self.cache.scanDirectories()
+        reactor.callLater(60, self.refreshFiles)
 
     def joinError(self, failure):
         log.msg("joining DHT failed miserably")
         log.err(failure)
         raise RuntimeError, "IP address for this machine could not be found"
     
 
     def joinError(self, failure):
         log.msg("joining DHT failed miserably")
         log.err(failure)
         raise RuntimeError, "IP address for this machine could not be found"
     
+    def refreshFiles(self):
+        """Refresh any files in the DHT that are about to expire."""
+        expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
+        hashes = self.db.expiredHashes(expireAfter)
+        if len(hashes.keys()) > 0:
+            log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
+        self._refreshFiles(None, hashes)
+        
+    def _refreshFiles(self, result, hashes):
+        if result is not None:
+            log.msg('Storage resulted in: %r' % result)
+
+        if hashes:
+            raw_hash = hashes.keys()[0]
+            self.db.refreshHash(raw_hash)
+            hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
+            del hashes[raw_hash]
+            storeDefer = self.store(hash)
+            storeDefer.addBoth(self._refreshFiles, hashes)
+        else:
+            reactor.callLater(60, self.refreshFiles)
+
     def check_freshness(self, req, path, modtime, resp):
         log.msg('Checking if %s is still fresh' % path)
         d = self.peers.get('', path, method = "HEAD", modtime = modtime)
     def check_freshness(self, req, path, modtime, resp):
         log.msg('Checking if %s is still fresh' % path)
         d = self.peers.get('', path, method = "HEAD", modtime = modtime)
@@ -87,7 +114,7 @@ class AptDHT:
             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
             
             # Lookup hash in cache
             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
             
             # Lookup hash in cache
-            locations = self.db.lookupHash(hash.expected())
+            locations = self.db.lookupHash(hash.expected(), filesOnly = True)
             self.getCachedFile(hash, req, path, d, locations)
 
     def getCachedFile(self, hash, req, path, d, locations):
             self.getCachedFile(hash, req, path, d, locations)
 
     def getCachedFile(self, hash, req, path, d, locations):
@@ -129,17 +156,17 @@ class AptDHT:
         lookupDefer = self.dht.getValue(key)
         lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
 
         lookupDefer = self.dht.getValue(key)
         lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
 
-    def lookupHash_done(self, locations, hash, path, d):
-        if not locations:
+    def lookupHash_done(self, values, hash, path, d):
+        if not values:
             log.msg('Peers for %s were not found' % path)
             getDefer = self.peers.get(hash, path)
             getDefer.addCallback(self.cache.save_file, hash, path)
             getDefer.addErrback(self.cache.save_error, path)
             getDefer.addCallbacks(d.callback, d.errback)
         else:
             log.msg('Peers for %s were not found' % path)
             getDefer = self.peers.get(hash, path)
             getDefer.addCallback(self.cache.save_file, hash, path)
             getDefer.addErrback(self.cache.save_error, path)
             getDefer.addCallbacks(d.callback, d.errback)
         else:
-            log.msg('Found peers for %s: %r' % (path, locations))
+            log.msg('Found peers for %s: %r' % (path, values))
             # Download from the found peers
             # Download from the found peers
-            getDefer = self.peers.get(hash, path, locations)
+            getDefer = self.peers.get(hash, path, values)
             getDefer.addCallback(self.check_response, hash, path)
             getDefer.addCallback(self.cache.save_file, hash, path)
             getDefer.addErrback(self.cache.save_error, path)
             getDefer.addCallback(self.check_response, hash, path)
             getDefer.addCallback(self.cache.save_file, hash, path)
             getDefer.addErrback(self.cache.save_error, path)
@@ -152,24 +179,52 @@ class AptDHT:
             return getDefer
         return response
         
             return getDefer
         return response
         
-    def new_cached_file(self, file_path, hash, url = None, forceDHT = False):
-        """Add a newly cached file to the DHT.
+    def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
+        """Add a newly cached file to the appropriate places.
         
         If the file was downloaded, set url to the path it was downloaded for.
         
         If the file was downloaded, set url to the path it was downloaded for.
-        Don't add a file to the DHT unless a hash was found for it
-        (but do add it anyway if forceDHT is True).
+        Doesn't add a file to the DHT unless a hash was found for it
+        (but does add it anyway if forceDHT is True).
         """
         if url:
             self.mirrors.updatedFile(url, file_path)
         
         """
         if url:
             self.mirrors.updatedFile(url, file_path)
         
-        if self.my_addr and hash and (hash.expected() is not None or forceDHT):
-            site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
-            key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
-            storeDefer = self.dht.storeValue(key, site)
-            storeDefer.addCallback(self.store_done, hash)
-            return storeDefer
+        if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
+            return self.store(hash)
         return None
         return None
+            
+    def store(self, hash):
+        """Add a file to the DHT."""
+        key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
+        value = {'c': self.my_contact}
+        pieces = hash.pieceDigests()
+        if len(pieces) <= 1:
+            pass
+        elif len(pieces) <= DHT_PIECES:
+            value['t'] = {'t': ''.join(pieces)}
+        elif len(pieces) <= TORRENT_PIECES:
+            s = sha.new().update(''.join(pieces))
+            value['h'] = s.digest()
+        else:
+            s = sha.new().update(''.join(pieces))
+            value['l'] = s.digest()
+        storeDefer = self.dht.storeValue(key, value)
+        storeDefer.addCallback(self.store_done, hash)
+        return storeDefer
 
     def store_done(self, result, hash):
 
     def store_done(self, result, hash):
-        log.msg('Added %s to the DHT: %r' % (hash, result))
-        
\ No newline at end of file
+        log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
+        pieces = hash.pieceDigests()
+        if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
+            s = sha.new().update(''.join(pieces))
+            key = s.digest()
+            value = {'t': ''.join(pieces)}
+            storeDefer = self.dht.storeValue(key, value)
+            storeDefer.addCallback(self.store_torrent_done, key)
+            return storeDefer
+        return result
+
+    def store_torrent_done(self, result, key):
+        log.msg('Added torrent string %s to the DHT: %r' % (b2ahex(key), result))
+        return result
+    
\ No newline at end of file