X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p%2Fapt_p2p.py;h=385e27571226a2c8f32571bac3b39b46b205c95b;hp=889ebd96243bddd83d77cafad10cec1504138abf;hb=872f3e24afa2a8a23956d1c191520a7f699ac8c6;hpb=d563aab35fc0fd1fab59e0f6d594fbb05735cf21 diff --git a/apt_p2p/apt_p2p.py b/apt_p2p/apt_p2p.py index 889ebd9..385e275 100644 --- a/apt_p2p/apt_p2p.py +++ b/apt_p2p/apt_p2p.py @@ -1,26 +1,19 @@ """The main program code. -@var DHT_PIECES: the maximum number of pieces to store with our contact info - in the DHT -@var TORRENT_PIECES: the maximum number of pieces to store as a separate entry - in the DHT @var download_dir: the name of the directory to use for downloaded files - +@var peer_dir: the name of the directory to use for peer downloads """ -from binascii import b2a_hex -from urlparse import urlunparse from urllib import unquote -import os, re, sha -from twisted.internet import defer, reactor -from twisted.web2 import server, http, http_headers, static +from twisted.internet import defer, reactor, protocol +from twisted.web2 import static from twisted.python import log, failure from twisted.python.filepath import FilePath -from interfaces import IDHT, IDHTStats from apt_p2p_conf import config +from DHTManager import DHT from PeerManager import PeerManager from HTTPServer import TopLevel from MirrorManager import MirrorManager @@ -28,14 +21,11 @@ from CacheManager import CacheManager from Hash import HashObject from db import DB from stats import StatsLogger -from util import findMyIPAddr, compact - -DHT_PIECES = 4 -TORRENT_PIECES = 70 download_dir = 'cache' +peer_dir = 'peers' -class AptP2P: +class AptP2P(protocol.Factory): """The main code object that does all of the work. Contains all of the sub-components that do all the low-level work, and @@ -47,8 +37,8 @@ class AptP2P: @ivar cache_dir: the directory to use for storing all files @type db: L{db.DB} @ivar db: the database to use for tracking files and hashes - @type dht: L{interfaces.IDHT} - @ivar dht: the DHT instance + @type dht: L{DHTManager.DHT} + @ivar dht: the manager for DHT requests @type stats: L{stats.StatsLogger} @ivar stats: the statistics logger to record sent data to @type http_server: L{HTTPServer.TopLevel} @@ -61,9 +51,8 @@ class AptP2P: can be queried to get hashes from file names @type cache: L{CacheManager.CacheManager} @ivar cache: the manager of all downloaded files - @type my_contact: C{string} - @ivar my_contact: the 6-byte compact peer representation of this peer's - download information (IP address and port) + @type my_addr: C{string}, C{int} + @ivar my_addr: the IP address and port of this peer """ def __init__(self, dhtClass): @@ -74,65 +63,46 @@ class AptP2P: """ log.msg('Initializing the main apt_p2p application') self.dhtClass = dhtClass + self.my_addr = None + + #{ Factory interface + def startFactory(self): + reactor.callLater(0, self._startFactory) + + def _startFactory(self): + log.msg('Starting the main apt_p2p application') self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR')) if not self.cache_dir.child(download_dir).exists(): self.cache_dir.child(download_dir).makedirs() + if not self.cache_dir.child(peer_dir).exists(): + self.cache_dir.child(peer_dir).makedirs() self.db = DB(self.cache_dir.child('apt-p2p.db')) - self.dht = dhtClass() - self.dht.loadConfig(config, config.get('DEFAULT', 'DHT')) - self.dht.join().addCallbacks(self.joinComplete, self.joinError) + self.dht = DHT(self.dhtClass, self.db) + df = self.dht.start() + df.addCallback(self._dhtStarted) self.stats = StatsLogger(self.db) self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self) - self.getHTTPFactory = self.http_server.getHTTPFactory - self.peers = PeerManager(self.cache_dir, self.dht, self.stats) + self.http_server.getHTTPFactory().startFactory() + self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats) self.mirrors = MirrorManager(self.cache_dir) self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self) - self.my_contact = None - - #{ DHT maintenance - def joinComplete(self, result): - """Complete the DHT join process and determine our download information. - - Called by the DHT when the join has been completed with information - on the external IP address and port of this peer. - """ - my_addr = findMyIPAddr(result, - config.getint(config.get('DEFAULT', 'DHT'), 'PORT'), - config.getboolean('DEFAULT', 'LOCAL_OK')) - if not my_addr: - raise RuntimeError, "IP address for this machine could not be found" - self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT')) - self.cache.scanDirectories() - reactor.callLater(60, self.refreshFiles) - - def joinError(self, failure): - """Joining the DHT has failed.""" - log.msg("joining DHT failed miserably") - log.err(failure) - raise RuntimeError, "IP address for this machine could not be found" - def refreshFiles(self): - """Refresh any files in the DHT that are about to expire.""" - expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH') - hashes = self.db.expiredHashes(expireAfter) - if len(hashes.keys()) > 0: - log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys())) - self._refreshFiles(None, hashes) + def _dhtStarted(self, result): + """Save the returned address and start scanning the cache.""" + self.my_addr = result + self.cache.scanDirectories() - def _refreshFiles(self, result, hashes): - if result is not None: - log.msg('Storage resulted in: %r' % result) - - if hashes: - raw_hash = hashes.keys()[0] - self.db.refreshHash(raw_hash) - hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces']) - del hashes[raw_hash] - storeDefer = self.store(hash) - storeDefer.addBoth(self._refreshFiles, hashes) - else: - reactor.callLater(60, self.refreshFiles) + def stopFactory(self): + log.msg('Stoppping the main apt_p2p application') + self.http_server.getHTTPFactory().stopFactory() + self.mirrors.cleanup() + self.stats.save() + self.db.close() + def buildProtocol(self, addr): + return self.http_server.getHTTPFactory().buildProtocol(addr) + + #{ Other functions def getStats(self): """Retrieve and format the statistics for the program. @@ -140,70 +110,24 @@ class AptP2P: @return: the formatted HTML page containing the statistics """ out = '\n\n' - out += self.stats.formatHTML(self.my_contact) + out += self.stats.formatHTML(self.my_addr) out += '\n\n' - if IDHTStats.implementedBy(self.dhtClass): - out += self.dht.getStats() + out += self.dht.getStats() out += '\n\n' return out #{ Main workflow - def check_freshness(self, req, url, modtime, resp): - """Send a HEAD to the mirror to check if the response from the cache is still valid. + def get_resp(self, req, url, orig_resp = None): + """Lookup a hash for the file in the local mirror info. - @type req: L{twisted.web2.http.Request} - @param req: the initial request sent to the HTTP server by apt - @param url: the URI of the actual mirror request - @type modtime: C{int} - @param modtime: the modified time of the cached file (seconds since epoch) - @type resp: L{twisted.web2.http.Response} - @param resp: the response from the cache to be sent to apt - @rtype: L{twisted.internet.defer.Deferred} - @return: a deferred that will be called back with the correct response - """ - log.msg('Checking if %s is still fresh' % url) - d = self.peers.get('', url, method = "HEAD", modtime = modtime) - d.addCallbacks(self.check_freshness_done, self.check_freshness_error, - callbackArgs = (req, url, resp), errbackArgs = (req, url)) - return d - - def check_freshness_done(self, resp, req, url, orig_resp): - """Process the returned response from the mirror. + Starts the process of getting a response to an apt request. - @type resp: L{twisted.web2.http.Response} - @param resp: the response from the mirror to the HEAD request @type req: L{twisted.web2.http.Request} @param req: the initial request sent to the HTTP server by apt @param url: the URI of the actual mirror request @type orig_resp: L{twisted.web2.http.Response} @param orig_resp: the response from the cache to be sent to apt - """ - if resp.code == 304: - log.msg('Still fresh, returning: %s' % url) - return orig_resp - else: - log.msg('Stale, need to redownload: %s' % url) - return self.get_resp(req, url) - - def check_freshness_error(self, err, req, url): - """Mirror request failed, continue with download. - - @param err: the response from the mirror to the HEAD request - @type req: L{twisted.web2.http.Request} - @param req: the initial request sent to the HTTP server by apt - @param url: the URI of the actual mirror request - """ - log.err(err) - return self.get_resp(req, url) - - def get_resp(self, req, url): - """Lookup a hash for the file in the local mirror info. - - Starts the process of getting a response to an uncached apt request. - - @type req: L{twisted.web2.http.Request} - @param req: the initial request sent to the HTTP server by apt - @param url: the URI of the actual mirror request + (optional, ignored if missing) @rtype: L{twisted.internet.defer.Deferred} @return: a deferred that will be called back with the response """ @@ -213,27 +137,33 @@ class AptP2P: findDefer = self.mirrors.findHash(unquote(url)) findDefer.addCallbacks(self.findHash_done, self.findHash_error, - callbackArgs=(req, url, d), errbackArgs=(req, url, d)) - findDefer.addErrback(log.err) + callbackArgs=(req, url, orig_resp, d), + errbackArgs=(req, url, orig_resp, d)) return d - def findHash_error(self, failure, req, url, d): + def findHash_error(self, failure, req, url, orig_resp, d): """Process the error in hash lookup by returning an empty L{HashObject}.""" - log.err(failure) - self.findHash_done(HashObject(), req, url, d) + log.msg('Hash lookup for %s resulted in an error: %s' % + (url, failure.getErrorMessage())) + self.findHash_done(HashObject(), req, url, orig_resp, d) - def findHash_done(self, hash, req, url, d): - """Use the returned hash to lookup the file in the cache. + def findHash_done(self, hash, req, url, orig_resp, d): + """Use the returned hash to lookup the file in the cache. If the hash was not found, the workflow skips down to download from - the mirror (L{lookupHash_done}). + the mirror (L{startDownload}), or checks the freshness of an old + response if there is one. @type hash: L{Hash.HashObject} @param hash: the hash object containing the expected hash for the file """ if hash.expected() is None: log.msg('Hash for %s was not found' % url) - self.lookupHash_done([], hash, url, d) + # Send the old response or get a new one + if orig_resp: + self.check_freshness(req, url, orig_resp, d) + else: + self.startDownload([], req, hash, url, d) else: log.msg('Found hash %s for %s' % (hash.hexexpected(), url)) @@ -241,6 +171,53 @@ class AptP2P: locations = self.db.lookupHash(hash.expected(), filesOnly = True) self.getCachedFile(hash, req, url, d, locations) + def check_freshness(self, req, url, orig_resp, d): + """Send a HEAD to the mirror to check if the response from the cache is still valid. + + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + @type orig_resp: L{twisted.web2.http.Response} + @param orig_resp: the response from the cache to be sent to apt + """ + log.msg('Checking if %s is still fresh' % url) + modtime = orig_resp.headers.getHeader('Last-Modified') + headDefer = self.peers.get(HashObject(), url, method = "HEAD", + modtime = modtime) + headDefer.addCallbacks(self.check_freshness_done, + self.check_freshness_error, + callbackArgs = (req, url, orig_resp, d), + errbackArgs = (req, url, d)) + + def check_freshness_done(self, resp, req, url, orig_resp, d): + """Return the fresh response, if stale start to redownload. + + @type resp: L{twisted.web2.http.Response} + @param resp: the response from the mirror to the HEAD request + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + @type orig_resp: L{twisted.web2.http.Response} + @param orig_resp: the response from the cache to be sent to apt + """ + if resp.code == 304: + log.msg('Still fresh, returning: %s' % url) + d.callback(orig_resp) + else: + log.msg('Stale, need to redownload: %s' % url) + self.startDownload([], req, HashObject(), url, d) + + def check_freshness_error(self, err, req, url, d): + """Mirror request failed, continue with download. + + @param err: the response from the mirror to the HEAD request + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + """ + log.err(err) + self.startDownload([], req, HashObject(), url, d) + def getCachedFile(self, hash, req, url, d, locations): """Try to return the file from the cache, otherwise move on to a DHT lookup. @@ -251,7 +228,7 @@ class AptP2P: """ if not locations: log.msg('Failed to return file from cache: %s' % url) - self.lookupHash(hash, url, d) + self.lookupHash(req, hash, url, d) return # Get the first possible location from the list @@ -269,7 +246,7 @@ class AptP2P: """Check the returned response to be sure it is valid.""" if isinstance(resp, failure.Failure): log.msg('Got error trying to get cached file') - log.err() + log.err(resp) # Try the next possible location self.getCachedFile(hash, req, url, d, locations) return @@ -282,14 +259,14 @@ class AptP2P: # Try the next possible location self.getCachedFile(hash, req, url, d, locations) - def lookupHash(self, hash, url, d): + def lookupHash(self, req, hash, url, d): """Lookup the hash in the DHT.""" log.msg('Looking up hash in DHT for file: %s' % url) key = hash.expected() - lookupDefer = self.dht.getValue(key) - lookupDefer.addBoth(self.lookupHash_done, hash, url, d) + lookupDefer = self.dht.get(key) + lookupDefer.addBoth(self.startDownload, req, hash, url, d) - def lookupHash_done(self, values, hash, url, d): + def startDownload(self, values, req, hash, url, d): """Start the download of the file. The download will be from peers if the DHT lookup succeeded, or @@ -299,12 +276,18 @@ class AptP2P: @param values: the returned values from the DHT containing peer download information """ + # Remove some headers Apt sets in the request + req.headers.removeHeader('If-Modified-Since') + req.headers.removeHeader('Range') + req.headers.removeHeader('If-Range') + if not isinstance(values, list) or not values: if not isinstance(values, list): log.msg('DHT lookup for %s failed with error %r' % (url, values)) else: log.msg('Peers for %s were not found' % url) getDefer = self.peers.get(hash, url) +# getDefer.addErrback(self.final_fallback, hash, url) getDefer.addCallback(self.cache.save_file, hash, url) getDefer.addErrback(self.cache.save_error, url) getDefer.addCallbacks(d.callback, d.errback) @@ -322,9 +305,17 @@ class AptP2P: if response.code < 200 or response.code >= 300: log.msg('Download from peers failed, going to direct download: %s' % url) getDefer = self.peers.get(hash, url) +# getDefer.addErrback(self.final_fallback, hash, url) return getDefer return response + def final_fallback(self, err, hash, url): + """Final retry if the mirror still generated an error.""" + log.msg('Download from mirror failed, retrying once only: %s' % url) + log.err(err) + getDefer = self.peers.get(hash, url) + return getDefer + def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False): """Add a newly cached file to the mirror info and/or the DHT. @@ -351,60 +342,7 @@ class AptP2P: if url: self.mirrors.updatedFile(url, file_path) - if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT): - return self.store(hash) + if self.my_addr and hash and new_hash and (hash.expected() is not None or forceDHT): + return self.dht.store(hash) return None - - def store(self, hash): - """Add a key/value pair for the file to the DHT. - - Sets the key and value from the hash information, and tries to add - it to the DHT. - """ - key = hash.digest() - value = {'c': self.my_contact} - pieces = hash.pieceDigests() - - # Determine how to store any piece data - if len(pieces) <= 1: - pass - elif len(pieces) <= DHT_PIECES: - # Short enough to be stored with our peer contact info - value['t'] = {'t': ''.join(pieces)} - elif len(pieces) <= TORRENT_PIECES: - # Short enough to be stored in a separate key in the DHT - value['h'] = sha.new(''.join(pieces)).digest() - else: - # Too long, must be served up by our peer HTTP server - value['l'] = sha.new(''.join(pieces)).digest() - - storeDefer = self.dht.storeValue(key, value) - storeDefer.addCallbacks(self.store_done, self.store_error, - callbackArgs = (hash, ), errbackArgs = (hash.digest(), )) - return storeDefer - - def store_done(self, result, hash): - """Add a key/value pair for the pieces of the file to the DHT (if necessary).""" - log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result)) - pieces = hash.pieceDigests() - if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES: - # Add the piece data key and value to the DHT - key = sha.new(''.join(pieces)).digest() - value = {'t': ''.join(pieces)} - - storeDefer = self.dht.storeValue(key, value) - storeDefer.addCallbacks(self.store_torrent_done, self.store_error, - callbackArgs = (key, ), errbackArgs = (key, )) - return storeDefer - return result - - def store_torrent_done(self, result, key): - """Adding the file to the DHT is complete, and so is the workflow.""" - log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result)) - return result - - def store_error(self, err, key): - """Adding to the DHT failed.""" - log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err)) - return err \ No newline at end of file