]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p/apt_p2p.py
Remove temporary files from the peer's download cache.
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
index 7a5f9fa604af37439abb6dedc4a0ea481d9240bd..8bfe928cb0b8f442457c15a853a7d4560e21d209 100644 (file)
@@ -6,14 +6,15 @@
 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
     in the DHT
 @var download_dir: the name of the directory to use for downloaded files
 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
     in the DHT
 @var download_dir: the name of the directory to use for downloaded files
-
+@var peer_dir: the name of the directory to use for peer downloads
 """
 
 from binascii import b2a_hex
 from urlparse import urlunparse
 """
 
 from binascii import b2a_hex
 from urlparse import urlunparse
+from urllib import unquote
 import os, re, sha
 
 import os, re, sha
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer, reactor, protocol
 from twisted.web2 import server, http, http_headers, static
 from twisted.python import log, failure
 from twisted.python.filepath import FilePath
 from twisted.web2 import server, http, http_headers, static
 from twisted.python import log, failure
 from twisted.python.filepath import FilePath
@@ -26,14 +27,16 @@ from MirrorManager import MirrorManager
 from CacheManager import CacheManager
 from Hash import HashObject
 from db import DB
 from CacheManager import CacheManager
 from Hash import HashObject
 from db import DB
+from stats import StatsLogger
 from util import findMyIPAddr, compact
 
 DHT_PIECES = 4
 TORRENT_PIECES = 70
 
 download_dir = 'cache'
 from util import findMyIPAddr, compact
 
 DHT_PIECES = 4
 TORRENT_PIECES = 70
 
 download_dir = 'cache'
+peer_dir = 'peers'
 
 
-class AptP2P:
+class AptP2P(protocol.Factory):
     """The main code object that does all of the work.
     
     Contains all of the sub-components that do all the low-level work, and
     """The main code object that does all of the work.
     
     Contains all of the sub-components that do all the low-level work, and
@@ -47,6 +50,8 @@ class AptP2P:
     @ivar db: the database to use for tracking files and hashes
     @type dht: L{interfaces.IDHT}
     @ivar dht: the DHT instance
     @ivar db: the database to use for tracking files and hashes
     @type dht: L{interfaces.IDHT}
     @ivar dht: the DHT instance
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics logger to record sent data to
     @type http_server: L{HTTPServer.TopLevel}
     @ivar http_server: the web server that will handle all requests from apt
         and from other peers
     @type http_server: L{HTTPServer.TopLevel}
     @ivar http_server: the web server that will handle all requests from apt
         and from other peers
@@ -70,23 +75,41 @@ class AptP2P:
         """
         log.msg('Initializing the main apt_p2p application')
         self.dhtClass = dhtClass
         """
         log.msg('Initializing the main apt_p2p application')
         self.dhtClass = dhtClass
+
+    #{ Factory interface
+    def startFactory(self):
+        reactor.callLater(0, self._startFactory)
+        
+    def _startFactory(self):
+        log.msg('Starting the main apt_p2p application')
         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
         if not self.cache_dir.child(download_dir).exists():
             self.cache_dir.child(download_dir).makedirs()
         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
         if not self.cache_dir.child(download_dir).exists():
             self.cache_dir.child(download_dir).makedirs()
+        if not self.cache_dir.child(peer_dir).exists():
+            self.cache_dir.child(peer_dir).makedirs()
         self.db = DB(self.cache_dir.child('apt-p2p.db'))
         self.db = DB(self.cache_dir.child('apt-p2p.db'))
-        self.dht = dhtClass()
+        self.dht = self.dhtClass()
         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
-        self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self,
-                                    config.getint('DEFAULT', 'UPLOAD_LIMIT'))
-        self.getHTTPFactory = self.http_server.getHTTPFactory
-        self.peers = PeerManager(self.cache_dir, self.dht)
-        self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
-        other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
-        self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
+        self.stats = StatsLogger(self.db)
+        self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
+        self.http_server.getHTTPFactory().startFactory()
+        self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats)
+        self.mirrors = MirrorManager(self.cache_dir)
+        self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
         self.my_contact = None
         self.my_contact = None
-
-    #{ DHT maintenance
+        
+    def stopFactory(self):
+        log.msg('Stoppping the main apt_p2p application')
+        self.http_server.getHTTPFactory().stopFactory()
+        self.mirrors.cleanup()
+        self.stats.save()
+        self.db.close()
+    
+    def buildProtocol(self, addr):
+        return self.http_server.getHTTPFactory().buildProtocol(addr)
+        
+    #{ DHT Maintenance
     def joinComplete(self, result):
         """Complete the DHT join process and determine our download information.
         
     def joinComplete(self, result):
         """Complete the DHT join process and determine our download information.
         
@@ -100,7 +123,7 @@ class AptP2P:
             raise RuntimeError, "IP address for this machine could not be found"
         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
         self.cache.scanDirectories()
             raise RuntimeError, "IP address for this machine could not be found"
         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
         self.cache.scanDirectories()
-        reactor.callLater(60, self.refreshFiles)
+        self.nextRefresh = reactor.callLater(60, self.refreshFiles)
 
     def joinError(self, failure):
         """Joining the DHT has failed."""
 
     def joinError(self, failure):
         """Joining the DHT has failed."""
@@ -108,27 +131,31 @@ class AptP2P:
         log.err(failure)
         raise RuntimeError, "IP address for this machine could not be found"
     
         log.err(failure)
         raise RuntimeError, "IP address for this machine could not be found"
     
-    def refreshFiles(self):
+    def refreshFiles(self, result = None, hashes = {}):
         """Refresh any files in the DHT that are about to expire."""
         """Refresh any files in the DHT that are about to expire."""
-        expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
-        hashes = self.db.expiredHashes(expireAfter)
-        if len(hashes.keys()) > 0:
-            log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
-        self._refreshFiles(None, hashes)
-        
-    def _refreshFiles(self, result, hashes):
         if result is not None:
             log.msg('Storage resulted in: %r' % result)
 
         if result is not None:
             log.msg('Storage resulted in: %r' % result)
 
+        if not hashes:
+            expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
+            hashes = self.db.expiredHashes(expireAfter)
+            if len(hashes.keys()) > 0:
+                log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
+
+        delay = 60
         if hashes:
         if hashes:
+            delay = 3
             raw_hash = hashes.keys()[0]
             self.db.refreshHash(raw_hash)
             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
             del hashes[raw_hash]
             storeDefer = self.store(hash)
             raw_hash = hashes.keys()[0]
             self.db.refreshHash(raw_hash)
             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
             del hashes[raw_hash]
             storeDefer = self.store(hash)
-            storeDefer.addBoth(self._refreshFiles, hashes)
+            storeDefer.addBoth(self.refreshFiles, hashes)
+
+        if self.nextRefresh.active():
+            self.nextRefresh.reset(delay)
         else:
         else:
-            reactor.callLater(60, self.refreshFiles)
+            self.nextRefresh = reactor.callLater(delay, self.plRefresh, None, hashes)
     
     def getStats(self):
         """Retrieve and format the statistics for the program.
     
     def getStats(self):
         """Retrieve and format the statistics for the program.
@@ -137,86 +164,61 @@ class AptP2P:
         @return: the formatted HTML page containing the statistics
         """
         out = '<html><body>\n\n'
         @return: the formatted HTML page containing the statistics
         """
         out = '<html><body>\n\n'
+        out += self.stats.formatHTML(self.my_contact)
+        out += '\n\n'
         if IDHTStats.implementedBy(self.dhtClass):
             out += self.dht.getStats()
         out += '\n</body></html>\n'
         return out
 
     #{ Main workflow
         if IDHTStats.implementedBy(self.dhtClass):
             out += self.dht.getStats()
         out += '\n</body></html>\n'
         return out
 
     #{ Main workflow
-    def check_freshness(self, req, url, modtime, resp):
-        """Send a HEAD to the mirror to check if the response from the cache is still valid.
+    def get_resp(self, req, url, orig_resp = None):
+        """Lookup a hash for the file in the local mirror info.
         
         
-        @type req: L{twisted.web2.http.Request}
-        @param req: the initial request sent to the HTTP server by apt
-        @param url: the URI of the actual mirror request
-        @type modtime: C{int}
-        @param modtime: the modified time of the cached file (seconds since epoch)
-        @type resp: L{twisted.web2.http.Response}
-        @param resp: the response from the cache to be sent to apt
-        @rtype: L{twisted.internet.defer.Deferred}
-        @return: a deferred that will be called back with the correct response
-        """
-        log.msg('Checking if %s is still fresh' % url)
-        d = self.peers.get('', url, method = "HEAD", modtime = modtime)
-        d.addCallback(self.check_freshness_done, req, url, resp)
-        return d
-    
-    def check_freshness_done(self, resp, req, url, orig_resp):
-        """Process the returned response from the mirror.
+        Starts the process of getting a response to an apt request.
         
         
-        @type resp: L{twisted.web2.http.Response}
-        @param resp: the response from the mirror to the HEAD request
         @type req: L{twisted.web2.http.Request}
         @param req: the initial request sent to the HTTP server by apt
         @param url: the URI of the actual mirror request
         @type orig_resp: L{twisted.web2.http.Response}
         @param orig_resp: the response from the cache to be sent to apt
         @type req: L{twisted.web2.http.Request}
         @param req: the initial request sent to the HTTP server by apt
         @param url: the URI of the actual mirror request
         @type orig_resp: L{twisted.web2.http.Response}
         @param orig_resp: the response from the cache to be sent to apt
-        """
-        if resp.code == 304:
-            log.msg('Still fresh, returning: %s' % url)
-            return orig_resp
-        else:
-            log.msg('Stale, need to redownload: %s' % url)
-            return self.get_resp(req, url)
-    
-    def get_resp(self, req, url):
-        """Lookup a hash for the file in the local mirror info.
-        
-        Starts the process of getting a response to an uncached apt request.
-        
-        @type req: L{twisted.web2.http.Request}
-        @param req: the initial request sent to the HTTP server by apt
-        @param url: the URI of the actual mirror request
+            (optional, ignored if missing)
         @rtype: L{twisted.internet.defer.Deferred}
         @return: a deferred that will be called back with the response
         """
         d = defer.Deferred()
         
         log.msg('Trying to find hash for %s' % url)
         @rtype: L{twisted.internet.defer.Deferred}
         @return: a deferred that will be called back with the response
         """
         d = defer.Deferred()
         
         log.msg('Trying to find hash for %s' % url)
-        findDefer = self.mirrors.findHash(url)
+        findDefer = self.mirrors.findHash(unquote(url))
         
         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
         
         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
-                               callbackArgs=(req, url, d), errbackArgs=(req, url, d))
-        findDefer.addErrback(log.err)
+                               callbackArgs=(req, url, orig_resp, d),
+                               errbackArgs=(req, url, orig_resp, d))
         return d
     
         return d
     
-    def findHash_error(self, failure, req, url, d):
+    def findHash_error(self, failure, req, url, orig_resp, d):
         """Process the error in hash lookup by returning an empty L{HashObject}."""
         """Process the error in hash lookup by returning an empty L{HashObject}."""
-        log.err(failure)
-        self.findHash_done(HashObject(), req, url, d)
+        log.msg('Hash lookup for %s resulted in an error: %s' %
+                (url, failure.getErrorMessage()))
+        self.findHash_done(HashObject(), req, url, orig_resp, d)
         
         
-    def findHash_done(self, hash, req, url, d):
-        """Use the returned hash to lookup  the file in the cache.
+    def findHash_done(self, hash, req, url, orig_resp, d):
+        """Use the returned hash to lookup the file in the cache.
         
         If the hash was not found, the workflow skips down to download from
         
         If the hash was not found, the workflow skips down to download from
-        the mirror (L{lookupHash_done}).
+        the mirror (L{startDownload}), or checks the freshness of an old
+        response if there is one.
         
         @type hash: L{Hash.HashObject}
         @param hash: the hash object containing the expected hash for the file
         """
         if hash.expected() is None:
             log.msg('Hash for %s was not found' % url)
         
         @type hash: L{Hash.HashObject}
         @param hash: the hash object containing the expected hash for the file
         """
         if hash.expected() is None:
             log.msg('Hash for %s was not found' % url)
-            self.lookupHash_done([], hash, url, d)
+            # Send the old response or get a new one
+            if orig_resp:
+                self.check_freshness(req, url, orig_resp, d)
+            else:
+                self.startDownload([], req, hash, url, d)
         else:
             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
             
         else:
             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
             
@@ -224,6 +226,55 @@ class AptP2P:
             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
             self.getCachedFile(hash, req, url, d, locations)
 
             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
             self.getCachedFile(hash, req, url, d, locations)
 
+    def check_freshness(self, req, url, orig_resp, d):
+        """Send a HEAD to the mirror to check if the response from the cache is still valid.
+        
+        @type req: L{twisted.web2.http.Request}
+        @param req: the initial request sent to the HTTP server by apt
+        @param url: the URI of the actual mirror request
+        @type orig_resp: L{twisted.web2.http.Response}
+        @param orig_resp: the response from the cache to be sent to apt
+        @rtype: L{twisted.internet.defer.Deferred}
+        @return: a deferred that will be called back with the correct response
+        """
+        log.msg('Checking if %s is still fresh' % url)
+        modtime = orig_resp.headers.getHeader('Last-Modified')
+        headDefer = self.peers.get(HashObject(), url, method = "HEAD",
+                                   modtime = modtime)
+        headDefer.addCallbacks(self.check_freshness_done,
+                               self.check_freshness_error,
+                               callbackArgs = (req, url, orig_resp, d),
+                               errbackArgs = (req, url, d))
+    
+    def check_freshness_done(self, resp, req, url, orig_resp, d):
+        """Return the fresh response, if stale start to redownload.
+        
+        @type resp: L{twisted.web2.http.Response}
+        @param resp: the response from the mirror to the HEAD request
+        @type req: L{twisted.web2.http.Request}
+        @param req: the initial request sent to the HTTP server by apt
+        @param url: the URI of the actual mirror request
+        @type orig_resp: L{twisted.web2.http.Response}
+        @param orig_resp: the response from the cache to be sent to apt
+        """
+        if resp.code == 304:
+            log.msg('Still fresh, returning: %s' % url)
+            d.callback(orig_resp)
+        else:
+            log.msg('Stale, need to redownload: %s' % url)
+            self.startDownload([], req, HashObject(), url, d)
+    
+    def check_freshness_error(self, err, req, url, d):
+        """Mirror request failed, continue with download.
+        
+        @param err: the response from the mirror to the HEAD request
+        @type req: L{twisted.web2.http.Request}
+        @param req: the initial request sent to the HTTP server by apt
+        @param url: the URI of the actual mirror request
+        """
+        log.err(err)
+        self.startDownload([], req, HashObject(), url, d)
+    
     def getCachedFile(self, hash, req, url, d, locations):
         """Try to return the file from the cache, otherwise move on to a DHT lookup.
         
     def getCachedFile(self, hash, req, url, d, locations):
         """Try to return the file from the cache, otherwise move on to a DHT lookup.
         
@@ -234,7 +285,7 @@ class AptP2P:
         """
         if not locations:
             log.msg('Failed to return file from cache: %s' % url)
         """
         if not locations:
             log.msg('Failed to return file from cache: %s' % url)
-            self.lookupHash(hash, url, d)
+            self.lookupHash(req, hash, url, d)
             return
         
         # Get the first possible location from the list
             return
         
         # Get the first possible location from the list
@@ -252,7 +303,7 @@ class AptP2P:
         """Check the returned response to be sure it is valid."""
         if isinstance(resp, failure.Failure):
             log.msg('Got error trying to get cached file')
         """Check the returned response to be sure it is valid."""
         if isinstance(resp, failure.Failure):
             log.msg('Got error trying to get cached file')
-            log.err()
+            log.err(resp)
             # Try the next possible location
             self.getCachedFile(hash, req, url, d, locations)
             return
             # Try the next possible location
             self.getCachedFile(hash, req, url, d, locations)
             return
@@ -265,14 +316,14 @@ class AptP2P:
             # Try the next possible location
             self.getCachedFile(hash, req, url, d, locations)
 
             # Try the next possible location
             self.getCachedFile(hash, req, url, d, locations)
 
-    def lookupHash(self, hash, url, d):
+    def lookupHash(self, req, hash, url, d):
         """Lookup the hash in the DHT."""
         log.msg('Looking up hash in DHT for file: %s' % url)
         key = hash.expected()
         lookupDefer = self.dht.getValue(key)
         """Lookup the hash in the DHT."""
         log.msg('Looking up hash in DHT for file: %s' % url)
         key = hash.expected()
         lookupDefer = self.dht.getValue(key)
-        lookupDefer.addCallback(self.lookupHash_done, hash, url, d)
+        lookupDefer.addBoth(self.startDownload, req, hash, url, d)
 
 
-    def lookupHash_done(self, values, hash, url, d):
+    def startDownload(self, values, req, hash, url, d):
         """Start the download of the file.
         
         The download will be from peers if the DHT lookup succeeded, or
         """Start the download of the file.
         
         The download will be from peers if the DHT lookup succeeded, or
@@ -282,8 +333,16 @@ class AptP2P:
         @param values: the returned values from the DHT containing peer
             download information
         """
         @param values: the returned values from the DHT containing peer
             download information
         """
-        if not values:
-            log.msg('Peers for %s were not found' % url)
+        # Remove some headers Apt sets in the request
+        req.headers.removeHeader('If-Modified-Since')
+        req.headers.removeHeader('Range')
+        req.headers.removeHeader('If-Range')
+        
+        if not isinstance(values, list) or not values:
+            if not isinstance(values, list):
+                log.msg('DHT lookup for %s failed with error %r' % (url, values))
+            else:
+                log.msg('Peers for %s were not found' % url)
             getDefer = self.peers.get(hash, url)
             getDefer.addCallback(self.cache.save_file, hash, url)
             getDefer.addErrback(self.cache.save_error, url)
             getDefer = self.peers.get(hash, url)
             getDefer.addCallback(self.cache.save_file, hash, url)
             getDefer.addErrback(self.cache.save_error, url)
@@ -359,7 +418,8 @@ class AptP2P:
             value['l'] = sha.new(''.join(pieces)).digest()
 
         storeDefer = self.dht.storeValue(key, value)
             value['l'] = sha.new(''.join(pieces)).digest()
 
         storeDefer = self.dht.storeValue(key, value)
-        storeDefer.addCallback(self.store_done, hash)
+        storeDefer.addCallbacks(self.store_done, self.store_error,
+                                callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
         return storeDefer
 
     def store_done(self, result, hash):
         return storeDefer
 
     def store_done(self, result, hash):
@@ -372,7 +432,8 @@ class AptP2P:
             value = {'t': ''.join(pieces)}
 
             storeDefer = self.dht.storeValue(key, value)
             value = {'t': ''.join(pieces)}
 
             storeDefer = self.dht.storeValue(key, value)
-            storeDefer.addCallback(self.store_torrent_done, key)
+            storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
+                                    callbackArgs = (key, ), errbackArgs = (key, ))
             return storeDefer
         return result
 
             return storeDefer
         return result
 
@@ -380,4 +441,9 @@ class AptP2P:
         """Adding the file to the DHT is complete, and so is the workflow."""
         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
         return result
         """Adding the file to the DHT is complete, and so is the workflow."""
         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
         return result
+
+    def store_error(self, err, key):
+        """Adding to the DHT failed."""
+        log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))
+        return err
     
\ No newline at end of file
     
\ No newline at end of file