X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p%2Fapt_p2p.py;h=41f32d36c2bff611570248a09c7713916e84773c;hp=f88fd791a184fde37646501ea2fb941eb17e70fb;hb=dbcab5189211e7e072c6477d651d578509cd0e0b;hpb=a9f0deccc4673d5332622ce40407ff009af6c8a3 diff --git a/apt_p2p/apt_p2p.py b/apt_p2p/apt_p2p.py index f88fd79..41f32d3 100644 --- a/apt_p2p/apt_p2p.py +++ b/apt_p2p/apt_p2p.py @@ -6,7 +6,7 @@ @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry in the DHT @var download_dir: the name of the directory to use for downloaded files - +@var peer_dir: the name of the directory to use for peer downloads """ from binascii import b2a_hex @@ -14,7 +14,7 @@ from urlparse import urlunparse from urllib import unquote import os, re, sha -from twisted.internet import defer, reactor +from twisted.internet import defer, reactor, protocol from twisted.web2 import server, http, http_headers, static from twisted.python import log, failure from twisted.python.filepath import FilePath @@ -34,8 +34,9 @@ DHT_PIECES = 4 TORRENT_PIECES = 70 download_dir = 'cache' +peer_dir = 'peers' -class AptP2P: +class AptP2P(protocol.Factory): """The main code object that does all of the work. Contains all of the sub-components that do all the low-level work, and @@ -74,27 +75,40 @@ class AptP2P: """ log.msg('Initializing the main apt_p2p application') self.dhtClass = dhtClass + + #{ Factory interface + def startFactory(self): + reactor.callLater(0, self._startFactory) + + def _startFactory(self): + log.msg('Starting the main apt_p2p application') self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR')) if not self.cache_dir.child(download_dir).exists(): self.cache_dir.child(download_dir).makedirs() + if not self.cache_dir.child(peer_dir).exists(): + self.cache_dir.child(peer_dir).makedirs() self.db = DB(self.cache_dir.child('apt-p2p.db')) - self.dht = dhtClass() + self.dht = self.dhtClass() self.dht.loadConfig(config, config.get('DEFAULT', 'DHT')) self.dht.join().addCallbacks(self.joinComplete, self.joinError) self.stats = StatsLogger(self.db) self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self) - self.getHTTPFactory = self.http_server.getHTTPFactory - self.peers = PeerManager(self.cache_dir, self.dht, self.stats) + self.http_server.getHTTPFactory().startFactory() + self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats) self.mirrors = MirrorManager(self.cache_dir) self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self) self.my_contact = None - reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown) - - #{ Maintenance - def shutdown(self): + + def stopFactory(self): + log.msg('Stoppping the main apt_p2p application') + self.http_server.getHTTPFactory().stopFactory() self.stats.save() self.db.close() + + def buildProtocol(self, addr): + return self.http_server.getHTTPFactory().buildProtocol(addr) + #{ DHT Maintenance def joinComplete(self, result): """Complete the DHT join process and determine our download information. @@ -153,62 +167,17 @@ class AptP2P: return out #{ Main workflow - def check_freshness(self, req, url, modtime, resp): - """Send a HEAD to the mirror to check if the response from the cache is still valid. + def get_resp(self, req, url, orig_resp = None): + """Lookup a hash for the file in the local mirror info. - @type req: L{twisted.web2.http.Request} - @param req: the initial request sent to the HTTP server by apt - @param url: the URI of the actual mirror request - @type modtime: C{int} - @param modtime: the modified time of the cached file (seconds since epoch) - @type resp: L{twisted.web2.http.Response} - @param resp: the response from the cache to be sent to apt - @rtype: L{twisted.internet.defer.Deferred} - @return: a deferred that will be called back with the correct response - """ - log.msg('Checking if %s is still fresh' % url) - d = self.peers.get('', url, method = "HEAD", modtime = modtime) - d.addCallbacks(self.check_freshness_done, self.check_freshness_error, - callbackArgs = (req, url, resp), errbackArgs = (req, url)) - return d - - def check_freshness_done(self, resp, req, url, orig_resp): - """Process the returned response from the mirror. + Starts the process of getting a response to an apt request. - @type resp: L{twisted.web2.http.Response} - @param resp: the response from the mirror to the HEAD request @type req: L{twisted.web2.http.Request} @param req: the initial request sent to the HTTP server by apt @param url: the URI of the actual mirror request @type orig_resp: L{twisted.web2.http.Response} @param orig_resp: the response from the cache to be sent to apt - """ - if resp.code == 304: - log.msg('Still fresh, returning: %s' % url) - return orig_resp - else: - log.msg('Stale, need to redownload: %s' % url) - return self.get_resp(req, url) - - def check_freshness_error(self, err, req, url): - """Mirror request failed, continue with download. - - @param err: the response from the mirror to the HEAD request - @type req: L{twisted.web2.http.Request} - @param req: the initial request sent to the HTTP server by apt - @param url: the URI of the actual mirror request - """ - log.err(err) - return self.get_resp(req, url) - - def get_resp(self, req, url): - """Lookup a hash for the file in the local mirror info. - - Starts the process of getting a response to an uncached apt request. - - @type req: L{twisted.web2.http.Request} - @param req: the initial request sent to the HTTP server by apt - @param url: the URI of the actual mirror request + (optional, ignored if missing) @rtype: L{twisted.internet.defer.Deferred} @return: a deferred that will be called back with the response """ @@ -218,27 +187,33 @@ class AptP2P: findDefer = self.mirrors.findHash(unquote(url)) findDefer.addCallbacks(self.findHash_done, self.findHash_error, - callbackArgs=(req, url, d), errbackArgs=(req, url, d)) + callbackArgs=(req, url, orig_resp, d), + errbackArgs=(req, url, orig_resp, d)) findDefer.addErrback(log.err) return d - def findHash_error(self, failure, req, url, d): + def findHash_error(self, failure, req, url, orig_resp, d): """Process the error in hash lookup by returning an empty L{HashObject}.""" log.err(failure) - self.findHash_done(HashObject(), req, url, d) + self.findHash_done(HashObject(), req, url, orig_resp, d) - def findHash_done(self, hash, req, url, d): - """Use the returned hash to lookup the file in the cache. + def findHash_done(self, hash, req, url, orig_resp, d): + """Use the returned hash to lookup the file in the cache. If the hash was not found, the workflow skips down to download from - the mirror (L{lookupHash_done}). + the mirror (L{startDownload}), or checks the freshness of an old + response if there is one. @type hash: L{Hash.HashObject} @param hash: the hash object containing the expected hash for the file """ if hash.expected() is None: log.msg('Hash for %s was not found' % url) - self.lookupHash_done([], hash, url, d) + # Send the old response or get a new one + if orig_resp: + self.check_freshness(req, url, orig_resp, d) + else: + self.startDownload([], req, hash, url, d) else: log.msg('Found hash %s for %s' % (hash.hexexpected(), url)) @@ -246,6 +221,55 @@ class AptP2P: locations = self.db.lookupHash(hash.expected(), filesOnly = True) self.getCachedFile(hash, req, url, d, locations) + def check_freshness(self, req, url, orig_resp, d): + """Send a HEAD to the mirror to check if the response from the cache is still valid. + + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + @type orig_resp: L{twisted.web2.http.Response} + @param orig_resp: the response from the cache to be sent to apt + @rtype: L{twisted.internet.defer.Deferred} + @return: a deferred that will be called back with the correct response + """ + log.msg('Checking if %s is still fresh' % url) + modtime = orig_resp.headers.getHeader('Last-Modified') + headDefer = self.peers.get(HashObject(), url, method = "HEAD", + modtime = modtime) + headDefer.addCallbacks(self.check_freshness_done, + self.check_freshness_error, + callbackArgs = (req, url, orig_resp, d), + errbackArgs = (req, url, d)) + + def check_freshness_done(self, resp, req, url, orig_resp, d): + """Return the fresh response, if stale start to redownload. + + @type resp: L{twisted.web2.http.Response} + @param resp: the response from the mirror to the HEAD request + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + @type orig_resp: L{twisted.web2.http.Response} + @param orig_resp: the response from the cache to be sent to apt + """ + if resp.code == 304: + log.msg('Still fresh, returning: %s' % url) + d.callback(orig_resp) + else: + log.msg('Stale, need to redownload: %s' % url) + self.startDownload([], req, HashObject(), url, d) + + def check_freshness_error(self, err, req, url, d): + """Mirror request failed, continue with download. + + @param err: the response from the mirror to the HEAD request + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + """ + log.err(err) + self.startDownload([], req, HashObject(), url, d) + def getCachedFile(self, hash, req, url, d, locations): """Try to return the file from the cache, otherwise move on to a DHT lookup. @@ -256,7 +280,7 @@ class AptP2P: """ if not locations: log.msg('Failed to return file from cache: %s' % url) - self.lookupHash(hash, url, d) + self.lookupHash(req, hash, url, d) return # Get the first possible location from the list @@ -287,14 +311,14 @@ class AptP2P: # Try the next possible location self.getCachedFile(hash, req, url, d, locations) - def lookupHash(self, hash, url, d): + def lookupHash(self, req, hash, url, d): """Lookup the hash in the DHT.""" log.msg('Looking up hash in DHT for file: %s' % url) key = hash.expected() lookupDefer = self.dht.getValue(key) - lookupDefer.addBoth(self.lookupHash_done, hash, url, d) + lookupDefer.addBoth(self.startDownload, req, hash, url, d) - def lookupHash_done(self, values, hash, url, d): + def startDownload(self, values, req, hash, url, d): """Start the download of the file. The download will be from peers if the DHT lookup succeeded, or @@ -304,6 +328,11 @@ class AptP2P: @param values: the returned values from the DHT containing peer download information """ + # Remove some headers Apt sets in the request + req.headers.removeHeader('If-Modified-Since') + req.headers.removeHeader('Range') + req.headers.removeHeader('If-Range') + if not isinstance(values, list) or not values: if not isinstance(values, list): log.msg('DHT lookup for %s failed with error %r' % (url, values))