Fix some documentation errors.
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
index 1f04a87da3b97feb1d9e03a0b0ebaf516a16f67b..385e27571226a2c8f32571bac3b39b46b205c95b 100644 (file)
@@ -1,40 +1,31 @@
 
 """The main program code.
 
 
 """The main program code.
 
-@var DHT_PIECES: the maximum number of pieces to store with our contact info
-    in the DHT
-@var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
-    in the DHT
 @var download_dir: the name of the directory to use for downloaded files
 @var download_dir: the name of the directory to use for downloaded files
-
+@var peer_dir: the name of the directory to use for peer downloads
 """
 
 """
 
-from binascii import b2a_hex
-from urlparse import urlunparse
 from urllib import unquote
 from urllib import unquote
-import os, re, sha
 
 
-from twisted.internet import defer, reactor
-from twisted.web2 import server, http, http_headers, static
+from twisted.internet import defer, reactor, protocol
+from twisted.web2 import static
 from twisted.python import log, failure
 from twisted.python.filepath import FilePath
 
 from twisted.python import log, failure
 from twisted.python.filepath import FilePath
 
-from interfaces import IDHT, IDHTStats
 from apt_p2p_conf import config
 from apt_p2p_conf import config
+from DHTManager import DHT
 from PeerManager import PeerManager
 from HTTPServer import TopLevel
 from MirrorManager import MirrorManager
 from CacheManager import CacheManager
 from Hash import HashObject
 from db import DB
 from PeerManager import PeerManager
 from HTTPServer import TopLevel
 from MirrorManager import MirrorManager
 from CacheManager import CacheManager
 from Hash import HashObject
 from db import DB
-from util import findMyIPAddr, compact
-
-DHT_PIECES = 4
-TORRENT_PIECES = 70
+from stats import StatsLogger
 
 download_dir = 'cache'
 
 download_dir = 'cache'
+peer_dir = 'peers'
 
 
-class AptP2P:
+class AptP2P(protocol.Factory):
     """The main code object that does all of the work.
     
     Contains all of the sub-components that do all the low-level work, and
     """The main code object that does all of the work.
     
     Contains all of the sub-components that do all the low-level work, and
@@ -46,8 +37,10 @@ class AptP2P:
     @ivar cache_dir: the directory to use for storing all files
     @type db: L{db.DB}
     @ivar db: the database to use for tracking files and hashes
     @ivar cache_dir: the directory to use for storing all files
     @type db: L{db.DB}
     @ivar db: the database to use for tracking files and hashes
-    @type dht: L{interfaces.IDHT}
-    @ivar dht: the DHT instance
+    @type dht: L{DHTManager.DHT}
+    @ivar dht: the manager for DHT requests
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics logger to record sent data to
     @type http_server: L{HTTPServer.TopLevel}
     @ivar http_server: the web server that will handle all requests from apt
         and from other peers
     @type http_server: L{HTTPServer.TopLevel}
     @ivar http_server: the web server that will handle all requests from apt
         and from other peers
@@ -58,9 +51,8 @@ class AptP2P:
         can be queried to get hashes from file names
     @type cache: L{CacheManager.CacheManager}
     @ivar cache: the manager of all downloaded files
         can be queried to get hashes from file names
     @type cache: L{CacheManager.CacheManager}
     @ivar cache: the manager of all downloaded files
-    @type my_contact: C{string}
-    @ivar my_contact: the 6-byte compact peer representation of this peer's
-        download information (IP address and port)
+    @type my_addr: C{string}, C{int}
+    @ivar my_addr: the IP address and port of this peer
     """
     
     def __init__(self, dhtClass):
     """
     
     def __init__(self, dhtClass):
@@ -71,66 +63,46 @@ class AptP2P:
         """
         log.msg('Initializing the main apt_p2p application')
         self.dhtClass = dhtClass
         """
         log.msg('Initializing the main apt_p2p application')
         self.dhtClass = dhtClass
+        self.my_addr = None
+
+    #{ Factory interface
+    def startFactory(self):
+        reactor.callLater(0, self._startFactory)
+        
+    def _startFactory(self):
+        log.msg('Starting the main apt_p2p application')
         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
         if not self.cache_dir.child(download_dir).exists():
             self.cache_dir.child(download_dir).makedirs()
         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
         if not self.cache_dir.child(download_dir).exists():
             self.cache_dir.child(download_dir).makedirs()
+        if not self.cache_dir.child(peer_dir).exists():
+            self.cache_dir.child(peer_dir).makedirs()
         self.db = DB(self.cache_dir.child('apt-p2p.db'))
         self.db = DB(self.cache_dir.child('apt-p2p.db'))
-        self.dht = dhtClass()
-        self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
-        self.dht.join().addCallbacks(self.joinComplete, self.joinError)
-        self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self,
-                                    config.getint('DEFAULT', 'UPLOAD_LIMIT'))
-        self.getHTTPFactory = self.http_server.getHTTPFactory
-        self.peers = PeerManager(self.cache_dir, self.dht)
-        self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
-        other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
-        self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
-        self.my_contact = None
-
-    #{ DHT maintenance
-    def joinComplete(self, result):
-        """Complete the DHT join process and determine our download information.
-        
-        Called by the DHT when the join has been completed with information
-        on the external IP address and port of this peer.
-        """
-        my_addr = findMyIPAddr(result,
-                               config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
-                               config.getboolean('DEFAULT', 'LOCAL_OK'))
-        if not my_addr:
-            raise RuntimeError, "IP address for this machine could not be found"
-        self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
-        self.cache.scanDirectories()
-        reactor.callLater(60, self.refreshFiles)
-
-    def joinError(self, failure):
-        """Joining the DHT has failed."""
-        log.msg("joining DHT failed miserably")
-        log.err(failure)
-        raise RuntimeError, "IP address for this machine could not be found"
+        self.dht = DHT(self.dhtClass, self.db)
+        df = self.dht.start()
+        df.addCallback(self._dhtStarted)
+        self.stats = StatsLogger(self.db)
+        self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
+        self.http_server.getHTTPFactory().startFactory()
+        self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats)
+        self.mirrors = MirrorManager(self.cache_dir)
+        self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
     
     
-    def refreshFiles(self):
-        """Refresh any files in the DHT that are about to expire."""
-        expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
-        hashes = self.db.expiredHashes(expireAfter)
-        if len(hashes.keys()) > 0:
-            log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
-        self._refreshFiles(None, hashes)
+    def _dhtStarted(self, result):
+        """Save the returned address and start scanning the cache."""
+        self.my_addr = result
+        self.cache.scanDirectories()
         
         
-    def _refreshFiles(self, result, hashes):
-        if result is not None:
-            log.msg('Storage resulted in: %r' % result)
-
-        if hashes:
-            raw_hash = hashes.keys()[0]
-            self.db.refreshHash(raw_hash)
-            hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
-            del hashes[raw_hash]
-            storeDefer = self.store(hash)
-            storeDefer.addBoth(self._refreshFiles, hashes)
-        else:
-            reactor.callLater(60, self.refreshFiles)
+    def stopFactory(self):
+        log.msg('Stoppping the main apt_p2p application')
+        self.http_server.getHTTPFactory().stopFactory()
+        self.mirrors.cleanup()
+        self.stats.save()
+        self.db.close()
     
     
+    def buildProtocol(self, addr):
+        return self.http_server.getHTTPFactory().buildProtocol(addr)
+
+    #{ Other functions
     def getStats(self):
         """Retrieve and format the statistics for the program.
         
     def getStats(self):
         """Retrieve and format the statistics for the program.
         
@@ -138,56 +110,24 @@ class AptP2P:
         @return: the formatted HTML page containing the statistics
         """
         out = '<html><body>\n\n'
         @return: the formatted HTML page containing the statistics
         """
         out = '<html><body>\n\n'
-        if IDHTStats.implementedBy(self.dhtClass):
-            out += self.dht.getStats()
+        out += self.stats.formatHTML(self.my_addr)
+        out += '\n\n'
+        out += self.dht.getStats()
         out += '\n</body></html>\n'
         return out
 
     #{ Main workflow
         out += '\n</body></html>\n'
         return out
 
     #{ Main workflow
-    def check_freshness(self, req, url, modtime, resp):
-        """Send a HEAD to the mirror to check if the response from the cache is still valid.
+    def get_resp(self, req, url, orig_resp = None):
+        """Lookup a hash for the file in the local mirror info.
         
         
-        @type req: L{twisted.web2.http.Request}
-        @param req: the initial request sent to the HTTP server by apt
-        @param url: the URI of the actual mirror request
-        @type modtime: C{int}
-        @param modtime: the modified time of the cached file (seconds since epoch)
-        @type resp: L{twisted.web2.http.Response}
-        @param resp: the response from the cache to be sent to apt
-        @rtype: L{twisted.internet.defer.Deferred}
-        @return: a deferred that will be called back with the correct response
-        """
-        log.msg('Checking if %s is still fresh' % url)
-        d = self.peers.get('', url, method = "HEAD", modtime = modtime)
-        d.addCallback(self.check_freshness_done, req, url, resp)
-        return d
-    
-    def check_freshness_done(self, resp, req, url, orig_resp):
-        """Process the returned response from the mirror.
+        Starts the process of getting a response to an apt request.
         
         
-        @type resp: L{twisted.web2.http.Response}
-        @param resp: the response from the mirror to the HEAD request
         @type req: L{twisted.web2.http.Request}
         @param req: the initial request sent to the HTTP server by apt
         @param url: the URI of the actual mirror request
         @type orig_resp: L{twisted.web2.http.Response}
         @param orig_resp: the response from the cache to be sent to apt
         @type req: L{twisted.web2.http.Request}
         @param req: the initial request sent to the HTTP server by apt
         @param url: the URI of the actual mirror request
         @type orig_resp: L{twisted.web2.http.Response}
         @param orig_resp: the response from the cache to be sent to apt
-        """
-        if resp.code == 304:
-            log.msg('Still fresh, returning: %s' % url)
-            return orig_resp
-        else:
-            log.msg('Stale, need to redownload: %s' % url)
-            return self.get_resp(req, url)
-    
-    def get_resp(self, req, url):
-        """Lookup a hash for the file in the local mirror info.
-        
-        Starts the process of getting a response to an uncached apt request.
-        
-        @type req: L{twisted.web2.http.Request}
-        @param req: the initial request sent to the HTTP server by apt
-        @param url: the URI of the actual mirror request
+            (optional, ignored if missing)
         @rtype: L{twisted.internet.defer.Deferred}
         @return: a deferred that will be called back with the response
         """
         @rtype: L{twisted.internet.defer.Deferred}
         @return: a deferred that will be called back with the response
         """
@@ -197,27 +137,33 @@ class AptP2P:
         findDefer = self.mirrors.findHash(unquote(url))
         
         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
         findDefer = self.mirrors.findHash(unquote(url))
         
         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
-                               callbackArgs=(req, url, d), errbackArgs=(req, url, d))
-        findDefer.addErrback(log.err)
+                               callbackArgs=(req, url, orig_resp, d),
+                               errbackArgs=(req, url, orig_resp, d))
         return d
     
         return d
     
-    def findHash_error(self, failure, req, url, d):
+    def findHash_error(self, failure, req, url, orig_resp, d):
         """Process the error in hash lookup by returning an empty L{HashObject}."""
         """Process the error in hash lookup by returning an empty L{HashObject}."""
-        log.err(failure)
-        self.findHash_done(HashObject(), req, url, d)
+        log.msg('Hash lookup for %s resulted in an error: %s' %
+                (url, failure.getErrorMessage()))
+        self.findHash_done(HashObject(), req, url, orig_resp, d)
         
         
-    def findHash_done(self, hash, req, url, d):
-        """Use the returned hash to lookup  the file in the cache.
+    def findHash_done(self, hash, req, url, orig_resp, d):
+        """Use the returned hash to lookup the file in the cache.
         
         If the hash was not found, the workflow skips down to download from
         
         If the hash was not found, the workflow skips down to download from
-        the mirror (L{lookupHash_done}).
+        the mirror (L{startDownload}), or checks the freshness of an old
+        response if there is one.
         
         @type hash: L{Hash.HashObject}
         @param hash: the hash object containing the expected hash for the file
         """
         if hash.expected() is None:
             log.msg('Hash for %s was not found' % url)
         
         @type hash: L{Hash.HashObject}
         @param hash: the hash object containing the expected hash for the file
         """
         if hash.expected() is None:
             log.msg('Hash for %s was not found' % url)
-            self.lookupHash_done([], hash, url, d)
+            # Send the old response or get a new one
+            if orig_resp:
+                self.check_freshness(req, url, orig_resp, d)
+            else:
+                self.startDownload([], req, hash, url, d)
         else:
             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
             
         else:
             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
             
@@ -225,6 +171,53 @@ class AptP2P:
             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
             self.getCachedFile(hash, req, url, d, locations)
 
             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
             self.getCachedFile(hash, req, url, d, locations)
 
+    def check_freshness(self, req, url, orig_resp, d):
+        """Send a HEAD to the mirror to check if the response from the cache is still valid.
+        
+        @type req: L{twisted.web2.http.Request}
+        @param req: the initial request sent to the HTTP server by apt
+        @param url: the URI of the actual mirror request
+        @type orig_resp: L{twisted.web2.http.Response}
+        @param orig_resp: the response from the cache to be sent to apt
+        """
+        log.msg('Checking if %s is still fresh' % url)
+        modtime = orig_resp.headers.getHeader('Last-Modified')
+        headDefer = self.peers.get(HashObject(), url, method = "HEAD",
+                                   modtime = modtime)
+        headDefer.addCallbacks(self.check_freshness_done,
+                               self.check_freshness_error,
+                               callbackArgs = (req, url, orig_resp, d),
+                               errbackArgs = (req, url, d))
+    
+    def check_freshness_done(self, resp, req, url, orig_resp, d):
+        """Return the fresh response, if stale start to redownload.
+        
+        @type resp: L{twisted.web2.http.Response}
+        @param resp: the response from the mirror to the HEAD request
+        @type req: L{twisted.web2.http.Request}
+        @param req: the initial request sent to the HTTP server by apt
+        @param url: the URI of the actual mirror request
+        @type orig_resp: L{twisted.web2.http.Response}
+        @param orig_resp: the response from the cache to be sent to apt
+        """
+        if resp.code == 304:
+            log.msg('Still fresh, returning: %s' % url)
+            d.callback(orig_resp)
+        else:
+            log.msg('Stale, need to redownload: %s' % url)
+            self.startDownload([], req, HashObject(), url, d)
+    
+    def check_freshness_error(self, err, req, url, d):
+        """Mirror request failed, continue with download.
+        
+        @param err: the response from the mirror to the HEAD request
+        @type req: L{twisted.web2.http.Request}
+        @param req: the initial request sent to the HTTP server by apt
+        @param url: the URI of the actual mirror request
+        """
+        log.err(err)
+        self.startDownload([], req, HashObject(), url, d)
+    
     def getCachedFile(self, hash, req, url, d, locations):
         """Try to return the file from the cache, otherwise move on to a DHT lookup.
         
     def getCachedFile(self, hash, req, url, d, locations):
         """Try to return the file from the cache, otherwise move on to a DHT lookup.
         
@@ -235,7 +228,7 @@ class AptP2P:
         """
         if not locations:
             log.msg('Failed to return file from cache: %s' % url)
         """
         if not locations:
             log.msg('Failed to return file from cache: %s' % url)
-            self.lookupHash(hash, url, d)
+            self.lookupHash(req, hash, url, d)
             return
         
         # Get the first possible location from the list
             return
         
         # Get the first possible location from the list
@@ -253,7 +246,7 @@ class AptP2P:
         """Check the returned response to be sure it is valid."""
         if isinstance(resp, failure.Failure):
             log.msg('Got error trying to get cached file')
         """Check the returned response to be sure it is valid."""
         if isinstance(resp, failure.Failure):
             log.msg('Got error trying to get cached file')
-            log.err()
+            log.err(resp)
             # Try the next possible location
             self.getCachedFile(hash, req, url, d, locations)
             return
             # Try the next possible location
             self.getCachedFile(hash, req, url, d, locations)
             return
@@ -266,14 +259,14 @@ class AptP2P:
             # Try the next possible location
             self.getCachedFile(hash, req, url, d, locations)
 
             # Try the next possible location
             self.getCachedFile(hash, req, url, d, locations)
 
-    def lookupHash(self, hash, url, d):
+    def lookupHash(self, req, hash, url, d):
         """Lookup the hash in the DHT."""
         log.msg('Looking up hash in DHT for file: %s' % url)
         key = hash.expected()
         """Lookup the hash in the DHT."""
         log.msg('Looking up hash in DHT for file: %s' % url)
         key = hash.expected()
-        lookupDefer = self.dht.getValue(key)
-        lookupDefer.addCallback(self.lookupHash_done, hash, url, d)
+        lookupDefer = self.dht.get(key)
+        lookupDefer.addBoth(self.startDownload, req, hash, url, d)
 
 
-    def lookupHash_done(self, values, hash, url, d):
+    def startDownload(self, values, req, hash, url, d):
         """Start the download of the file.
         
         The download will be from peers if the DHT lookup succeeded, or
         """Start the download of the file.
         
         The download will be from peers if the DHT lookup succeeded, or
@@ -283,9 +276,18 @@ class AptP2P:
         @param values: the returned values from the DHT containing peer
             download information
         """
         @param values: the returned values from the DHT containing peer
             download information
         """
-        if not values:
-            log.msg('Peers for %s were not found' % url)
+        # Remove some headers Apt sets in the request
+        req.headers.removeHeader('If-Modified-Since')
+        req.headers.removeHeader('Range')
+        req.headers.removeHeader('If-Range')
+        
+        if not isinstance(values, list) or not values:
+            if not isinstance(values, list):
+                log.msg('DHT lookup for %s failed with error %r' % (url, values))
+            else:
+                log.msg('Peers for %s were not found' % url)
             getDefer = self.peers.get(hash, url)
             getDefer = self.peers.get(hash, url)
+#            getDefer.addErrback(self.final_fallback, hash, url)
             getDefer.addCallback(self.cache.save_file, hash, url)
             getDefer.addErrback(self.cache.save_error, url)
             getDefer.addCallbacks(d.callback, d.errback)
             getDefer.addCallback(self.cache.save_file, hash, url)
             getDefer.addErrback(self.cache.save_error, url)
             getDefer.addCallbacks(d.callback, d.errback)
@@ -303,9 +305,17 @@ class AptP2P:
         if response.code < 200 or response.code >= 300:
             log.msg('Download from peers failed, going to direct download: %s' % url)
             getDefer = self.peers.get(hash, url)
         if response.code < 200 or response.code >= 300:
             log.msg('Download from peers failed, going to direct download: %s' % url)
             getDefer = self.peers.get(hash, url)
+#            getDefer.addErrback(self.final_fallback, hash, url)
             return getDefer
         return response
         
             return getDefer
         return response
         
+    def final_fallback(self, err, hash, url):
+        """Final retry if the mirror still generated an error."""
+        log.msg('Download from mirror failed, retrying once only: %s' % url)
+        log.err(err)
+        getDefer = self.peers.get(hash, url)
+        return getDefer
+        
     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
         """Add a newly cached file to the mirror info and/or the DHT.
         
     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
         """Add a newly cached file to the mirror info and/or the DHT.
         
@@ -332,53 +342,7 @@ class AptP2P:
         if url:
             self.mirrors.updatedFile(url, file_path)
         
         if url:
             self.mirrors.updatedFile(url, file_path)
         
-        if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
-            return self.store(hash)
+        if self.my_addr and hash and new_hash and (hash.expected() is not None or forceDHT):
+            return self.dht.store(hash)
         return None
         return None
-            
-    def store(self, hash):
-        """Add a key/value pair for the file to the DHT.
-        
-        Sets the key and value from the hash information, and tries to add
-        it to the DHT.
-        """
-        key = hash.digest()
-        value = {'c': self.my_contact}
-        pieces = hash.pieceDigests()
-        
-        # Determine how to store any piece data
-        if len(pieces) <= 1:
-            pass
-        elif len(pieces) <= DHT_PIECES:
-            # Short enough to be stored with our peer contact info
-            value['t'] = {'t': ''.join(pieces)}
-        elif len(pieces) <= TORRENT_PIECES:
-            # Short enough to be stored in a separate key in the DHT
-            value['h'] = sha.new(''.join(pieces)).digest()
-        else:
-            # Too long, must be served up by our peer HTTP server
-            value['l'] = sha.new(''.join(pieces)).digest()
-
-        storeDefer = self.dht.storeValue(key, value)
-        storeDefer.addCallback(self.store_done, hash)
-        return storeDefer
-
-    def store_done(self, result, hash):
-        """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
-        log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
-        pieces = hash.pieceDigests()
-        if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
-            # Add the piece data key and value to the DHT
-            key = sha.new(''.join(pieces)).digest()
-            value = {'t': ''.join(pieces)}
-
-            storeDefer = self.dht.storeValue(key, value)
-            storeDefer.addCallback(self.store_torrent_done, key)
-            return storeDefer
-        return result
-
-    def store_torrent_done(self, result, key):
-        """Adding the file to the DHT is complete, and so is the workflow."""
-        log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
-        return result
     
\ No newline at end of file
     
\ No newline at end of file