X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2Fapt_p2p.py;h=8bfe928cb0b8f442457c15a853a7d4560e21d209;hb=e6042f7b9084b3a317e3b61be4931c5eeed9fa41;hp=0063e9fffd2ebb08373be003c346c4d90c950d63;hpb=9a8119cf7bb5dbdea853a694c84aee7e638aa287;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/apt_p2p.py b/apt_p2p/apt_p2p.py index 0063e9f..8bfe928 100644 --- a/apt_p2p/apt_p2p.py +++ b/apt_p2p/apt_p2p.py @@ -6,14 +6,15 @@ @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry in the DHT @var download_dir: the name of the directory to use for downloaded files - +@var peer_dir: the name of the directory to use for peer downloads """ from binascii import b2a_hex from urlparse import urlunparse +from urllib import unquote import os, re, sha -from twisted.internet import defer, reactor +from twisted.internet import defer, reactor, protocol from twisted.web2 import server, http, http_headers, static from twisted.python import log, failure from twisted.python.filepath import FilePath @@ -26,14 +27,16 @@ from MirrorManager import MirrorManager from CacheManager import CacheManager from Hash import HashObject from db import DB +from stats import StatsLogger from util import findMyIPAddr, compact DHT_PIECES = 4 TORRENT_PIECES = 70 download_dir = 'cache' +peer_dir = 'peers' -class AptP2P: +class AptP2P(protocol.Factory): """The main code object that does all of the work. Contains all of the sub-components that do all the low-level work, and @@ -47,6 +50,8 @@ class AptP2P: @ivar db: the database to use for tracking files and hashes @type dht: L{interfaces.IDHT} @ivar dht: the DHT instance + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics logger to record sent data to @type http_server: L{HTTPServer.TopLevel} @ivar http_server: the web server that will handle all requests from apt and from other peers @@ -65,28 +70,46 @@ class AptP2P: def __init__(self, dhtClass): """Initialize all the sub-components. 
- @type dht: L{interfaces.IDHT} - @param dht: the DHT class to use + @type dhtClass: L{interfaces.IDHT} + @param dhtClass: the DHT class to use """ log.msg('Initializing the main apt_p2p application') self.dhtClass = dhtClass + + #{ Factory interface + def startFactory(self): + reactor.callLater(0, self._startFactory) + + def _startFactory(self): + log.msg('Starting the main apt_p2p application') self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR')) if not self.cache_dir.child(download_dir).exists(): self.cache_dir.child(download_dir).makedirs() + if not self.cache_dir.child(peer_dir).exists(): + self.cache_dir.child(peer_dir).makedirs() self.db = DB(self.cache_dir.child('apt-p2p.db')) - self.dht = dhtClass() + self.dht = self.dhtClass() self.dht.loadConfig(config, config.get('DEFAULT', 'DHT')) self.dht.join().addCallbacks(self.joinComplete, self.joinError) - self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self, - config.getint('DEFAULT', 'UPLOAD_LIMIT')) - self.getHTTPFactory = self.http_server.getHTTPFactory - self.peers = PeerManager(self.cache_dir, self.dht) - self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE')) - other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')] - self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self) + self.stats = StatsLogger(self.db) + self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self) + self.http_server.getHTTPFactory().startFactory() + self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats) + self.mirrors = MirrorManager(self.cache_dir) + self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self) self.my_contact = None - - #{ DHT maintenance + + def stopFactory(self): + log.msg('Stopping the main apt_p2p application') + self.http_server.getHTTPFactory().stopFactory() + self.mirrors.cleanup() + self.stats.save() + self.db.close() 
+ + def buildProtocol(self, addr): + return self.http_server.getHTTPFactory().buildProtocol(addr) + + #{ DHT Maintenance def joinComplete(self, result): """Complete the DHT join process and determine our download information. @@ -100,7 +123,7 @@ class AptP2P: raise RuntimeError, "IP address for this machine could not be found" self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT')) self.cache.scanDirectories() - reactor.callLater(60, self.refreshFiles) + self.nextRefresh = reactor.callLater(60, self.refreshFiles) def joinError(self, failure): """Joining the DHT has failed.""" @@ -108,27 +131,31 @@ class AptP2P: log.err(failure) raise RuntimeError, "IP address for this machine could not be found" - def refreshFiles(self): + def refreshFiles(self, result = None, hashes = {}): """Refresh any files in the DHT that are about to expire.""" - expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH') - hashes = self.db.expiredHashes(expireAfter) - if len(hashes.keys()) > 0: - log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys())) - self._refreshFiles(None, hashes) - - def _refreshFiles(self, result, hashes): if result is not None: log.msg('Storage resulted in: %r' % result) + if not hashes: + expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH') + hashes = self.db.expiredHashes(expireAfter) + if len(hashes.keys()) > 0: + log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys())) + + delay = 60 if hashes: + delay = 3 raw_hash = hashes.keys()[0] self.db.refreshHash(raw_hash) hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces']) del hashes[raw_hash] storeDefer = self.store(hash) - storeDefer.addBoth(self._refreshFiles, hashes) + storeDefer.addBoth(self.refreshFiles, hashes) + + if self.nextRefresh.active(): + self.nextRefresh.reset(delay) else: - reactor.callLater(60, self.refreshFiles) + self.nextRefresh = reactor.callLater(delay, self.refreshFiles, None, hashes) def getStats(self): """Retrieve and format the statistics for the 
program. @@ -137,86 +164,61 @@ class AptP2P: @return: the formatted HTML page containing the statistics """ out = '\n\n' + out += self.stats.formatHTML(self.my_contact) + out += '\n\n' if IDHTStats.implementedBy(self.dhtClass): out += self.dht.getStats() out += '\n\n' return out #{ Main workflow - def check_freshness(self, req, url, modtime, resp): - """Send a HEAD to the mirror to check if the response from the cache is still valid. + def get_resp(self, req, url, orig_resp = None): + """Lookup a hash for the file in the local mirror info. - @type req: L{twisted.web2.http.Request} - @param req: the initial request sent to the HTTP server by apt - @param url: the URI of the actual mirror request - @type modtime: C{int} - @param modtime: the modified time of the cached file (seconds since epoch) - @type resp: L{twisted.web2.http.Response} - @param resp: the response from the cache to be sent to apt - @rtype: L{twisted.internet.defer.Deferred} - @return: a deferred that will be called back with the correct response - """ - log.msg('Checking if %s is still fresh' % url) - d = self.peers.get('', url, method = "HEAD", modtime = modtime) - d.addCallback(self.check_freshness_done, req, url, resp) - return d - - def check_freshness_done(self, resp, req, url, orig_resp): - """Process the returned response from the mirror. + Starts the process of getting a response to an apt request. 
- @type resp: L{twisted.web2.http.Response} - @param resp: the response from the mirror to the HEAD request @type req: L{twisted.web2.http.Request} @param req: the initial request sent to the HTTP server by apt @param url: the URI of the actual mirror request @type orig_resp: L{twisted.web2.http.Response} @param orig_resp: the response from the cache to be sent to apt - """ - if resp.code == 304: - log.msg('Still fresh, returning: %s' % url) - return orig_resp - else: - log.msg('Stale, need to redownload: %s' % url) - return self.get_resp(req, url) - - def get_resp(self, req, url): - """Lookup a hash for the file in the local mirror info. - - Starts the process of getting a response to an uncached apt request. - - @type req: L{twisted.web2.http.Request} - @param req: the initial request sent to the HTTP server by apt - @param url: the URI of the actual mirror request + (optional, ignored if missing) @rtype: L{twisted.internet.defer.Deferred} @return: a deferred that will be called back with the response """ d = defer.Deferred() log.msg('Trying to find hash for %s' % url) - findDefer = self.mirrors.findHash(url) + findDefer = self.mirrors.findHash(unquote(url)) findDefer.addCallbacks(self.findHash_done, self.findHash_error, - callbackArgs=(req, url, d), errbackArgs=(req, url, d)) - findDefer.addErrback(log.err) + callbackArgs=(req, url, orig_resp, d), + errbackArgs=(req, url, orig_resp, d)) return d - def findHash_error(self, failure, req, url, d): + def findHash_error(self, failure, req, url, orig_resp, d): """Process the error in hash lookup by returning an empty L{HashObject}.""" - log.err(failure) - self.findHash_done(HashObject(), req, url, d) + log.msg('Hash lookup for %s resulted in an error: %s' % + (url, failure.getErrorMessage())) + self.findHash_done(HashObject(), req, url, orig_resp, d) - def findHash_done(self, hash, req, url, d): - """Use the returned hash to lookup the file in the cache. 
+ def findHash_done(self, hash, req, url, orig_resp, d): + """Use the returned hash to lookup the file in the cache. If the hash was not found, the workflow skips down to download from - the mirror (L{lookupHash_done}). + the mirror (L{startDownload}), or checks the freshness of an old + response if there is one. @type hash: L{Hash.HashObject} @param hash: the hash object containing the expected hash for the file """ if hash.expected() is None: log.msg('Hash for %s was not found' % url) - self.lookupHash_done([], hash, url, d) + # Send the old response or get a new one + if orig_resp: + self.check_freshness(req, url, orig_resp, d) + else: + self.startDownload([], req, hash, url, d) else: log.msg('Found hash %s for %s' % (hash.hexexpected(), url)) @@ -224,6 +226,55 @@ class AptP2P: locations = self.db.lookupHash(hash.expected(), filesOnly = True) self.getCachedFile(hash, req, url, d, locations) + def check_freshness(self, req, url, orig_resp, d): + """Send a HEAD to the mirror to check if the response from the cache is still valid. + + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + @type orig_resp: L{twisted.web2.http.Response} + @param orig_resp: the response from the cache to be sent to apt + @rtype: L{twisted.internet.defer.Deferred} + @return: a deferred that will be called back with the correct response + """ + log.msg('Checking if %s is still fresh' % url) + modtime = orig_resp.headers.getHeader('Last-Modified') + headDefer = self.peers.get(HashObject(), url, method = "HEAD", + modtime = modtime) + headDefer.addCallbacks(self.check_freshness_done, + self.check_freshness_error, + callbackArgs = (req, url, orig_resp, d), + errbackArgs = (req, url, d)) + + def check_freshness_done(self, resp, req, url, orig_resp, d): + """Return the fresh response, if stale start to redownload. 
+ + @type resp: L{twisted.web2.http.Response} + @param resp: the response from the mirror to the HEAD request + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + @type orig_resp: L{twisted.web2.http.Response} + @param orig_resp: the response from the cache to be sent to apt + """ + if resp.code == 304: + log.msg('Still fresh, returning: %s' % url) + d.callback(orig_resp) + else: + log.msg('Stale, need to redownload: %s' % url) + self.startDownload([], req, HashObject(), url, d) + + def check_freshness_error(self, err, req, url, d): + """Mirror request failed, continue with download. + + @param err: the response from the mirror to the HEAD request + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + """ + log.err(err) + self.startDownload([], req, HashObject(), url, d) + def getCachedFile(self, hash, req, url, d, locations): """Try to return the file from the cache, otherwise move on to a DHT lookup. 
@@ -234,7 +285,7 @@ class AptP2P: """ if not locations: log.msg('Failed to return file from cache: %s' % url) - self.lookupHash(hash, url, d) + self.lookupHash(req, hash, url, d) return # Get the first possible location from the list @@ -252,7 +303,7 @@ class AptP2P: """Check the returned response to be sure it is valid.""" if isinstance(resp, failure.Failure): log.msg('Got error trying to get cached file') - log.err() + log.err(resp) # Try the next possible location self.getCachedFile(hash, req, url, d, locations) return @@ -265,14 +316,14 @@ class AptP2P: # Try the next possible location self.getCachedFile(hash, req, url, d, locations) - def lookupHash(self, hash, url, d): + def lookupHash(self, req, hash, url, d): """Lookup the hash in the DHT.""" log.msg('Looking up hash in DHT for file: %s' % url) key = hash.expected() lookupDefer = self.dht.getValue(key) - lookupDefer.addCallback(self.lookupHash_done, hash, url, d) + lookupDefer.addBoth(self.startDownload, req, hash, url, d) - def lookupHash_done(self, values, hash, url, d): + def startDownload(self, values, req, hash, url, d): """Start the download of the file. 
The download will be from peers if the DHT lookup succeeded, or @@ -282,8 +333,16 @@ class AptP2P: @param values: the returned values from the DHT containing peer download information """ - if not values: - log.msg('Peers for %s were not found' % url) + # Remove some headers Apt sets in the request + req.headers.removeHeader('If-Modified-Since') + req.headers.removeHeader('Range') + req.headers.removeHeader('If-Range') + + if not isinstance(values, list) or not values: + if not isinstance(values, list): + log.msg('DHT lookup for %s failed with error %r' % (url, values)) + else: + log.msg('Peers for %s were not found' % url) getDefer = self.peers.get(hash, url) getDefer.addCallback(self.cache.save_file, hash, url) getDefer.addErrback(self.cache.save_error, url) @@ -359,7 +418,8 @@ class AptP2P: value['l'] = sha.new(''.join(pieces)).digest() storeDefer = self.dht.storeValue(key, value) - storeDefer.addCallback(self.store_done, hash) + storeDefer.addCallbacks(self.store_done, self.store_error, + callbackArgs = (hash, ), errbackArgs = (hash.digest(), )) return storeDefer def store_done(self, result, hash): @@ -372,7 +432,8 @@ class AptP2P: value = {'t': ''.join(pieces)} storeDefer = self.dht.storeValue(key, value) - storeDefer.addCallback(self.store_torrent_done, key) + storeDefer.addCallbacks(self.store_torrent_done, self.store_error, + callbackArgs = (key, ), errbackArgs = (key, )) return storeDefer return result @@ -380,4 +441,9 @@ class AptP2P: """Adding the file to the DHT is complete, and so is the workflow.""" log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result)) return result + + def store_error(self, err, key): + """Adding to the DHT failed.""" + log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err)) + return err \ No newline at end of file