X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p%2Fapt_p2p.py;h=e3a8af248257c0e922dee672cae3fffbdcb1b045;hb=882106d0e49e52164a898c1cc9f36d229b4257ac;hp=671e2269aea9d5713c020a4dc3a514aa20ddd894;hpb=d63ad7d7b1c9e5567bd28450197ef810dc5c5475;p=quix0rs-apt-p2p.git diff --git a/apt_p2p/apt_p2p.py b/apt_p2p/apt_p2p.py index 671e226..e3a8af2 100644 --- a/apt_p2p/apt_p2p.py +++ b/apt_p2p/apt_p2p.py @@ -6,7 +6,7 @@ @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry in the DHT @var download_dir: the name of the directory to use for downloaded files - +@var peer_dir: the name of the directory to use for peer downloads """ from binascii import b2a_hex @@ -14,7 +14,7 @@ from urlparse import urlunparse from urllib import unquote import os, re, sha -from twisted.internet import defer, reactor +from twisted.internet import defer, reactor, protocol from twisted.web2 import server, http, http_headers, static from twisted.python import log, failure from twisted.python.filepath import FilePath @@ -27,14 +27,16 @@ from MirrorManager import MirrorManager from CacheManager import CacheManager from Hash import HashObject from db import DB +from stats import StatsLogger from util import findMyIPAddr, compact DHT_PIECES = 4 TORRENT_PIECES = 70 download_dir = 'cache' +peer_dir = 'peers' -class AptP2P: +class AptP2P(protocol.Factory): """The main code object that does all of the work. Contains all of the sub-components that do all the low-level work, and @@ -48,6 +50,8 @@ class AptP2P: @ivar db: the database to use for tracking files and hashes @type dht: L{interfaces.IDHT} @ivar dht: the DHT instance + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics logger to record sent data to @type http_server: L{HTTPServer.TopLevel} @ivar http_server: the web server that will handle all requests from apt and from other peers @@ -71,21 +75,40 @@ class AptP2P: """ log.msg('Initializing the main apt_p2p application') self.dhtClass = dhtClass + + #{ Factory interface + def startFactory(self): + reactor.callLater(0, self._startFactory) + + def _startFactory(self): + log.msg('Starting the main apt_p2p application') self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR')) if not self.cache_dir.child(download_dir).exists(): self.cache_dir.child(download_dir).makedirs() + if not self.cache_dir.child(peer_dir).exists(): + self.cache_dir.child(peer_dir).makedirs() self.db = DB(self.cache_dir.child('apt-p2p.db')) - self.dht = dhtClass() + self.dht = self.dhtClass() self.dht.loadConfig(config, config.get('DEFAULT', 'DHT')) self.dht.join().addCallbacks(self.joinComplete, self.joinError) + self.stats = StatsLogger(self.db) self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self) - self.getHTTPFactory = self.http_server.getHTTPFactory - self.peers = PeerManager(self.cache_dir, self.dht) + self.http_server.getHTTPFactory().startFactory() + self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats) self.mirrors = MirrorManager(self.cache_dir) self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self) self.my_contact = None - - #{ DHT maintenance + + def stopFactory(self): + log.msg('Stoppping the main apt_p2p application') + self.http_server.getHTTPFactory().stopFactory() + self.stats.save() + self.db.close() + + def buildProtocol(self, addr): + return self.http_server.getHTTPFactory().buildProtocol(addr) + + #{ DHT Maintenance def joinComplete(self, result): """Complete the DHT join process and determine our download information. @@ -136,6 +159,8 @@ class AptP2P: @return: the formatted HTML page containing the statistics """ out = '\n\n' + out += self.stats.formatHTML(self.my_contact) + out += '\n\n' if IDHTStats.implementedBy(self.dhtClass): out += self.dht.getStats() out += '\n\n' @@ -157,7 +182,8 @@ class AptP2P: """ log.msg('Checking if %s is still fresh' % url) d = self.peers.get('', url, method = "HEAD", modtime = modtime) - d.addCallback(self.check_freshness_done, req, url, resp) + d.addCallbacks(self.check_freshness_done, self.check_freshness_error, + callbackArgs = (req, url, resp), errbackArgs = (req, url)) return d def check_freshness_done(self, resp, req, url, orig_resp): @@ -178,6 +204,17 @@ class AptP2P: log.msg('Stale, need to redownload: %s' % url) return self.get_resp(req, url) + def check_freshness_error(self, err, req, url): + """Mirror request failed, continue with download. + + @param err: the response from the mirror to the HEAD request + @type req: L{twisted.web2.http.Request} + @param req: the initial request sent to the HTTP server by apt + @param url: the URI of the actual mirror request + """ + log.err(err) + return self.get_resp(req, url) + def get_resp(self, req, url): """Lookup a hash for the file in the local mirror info. @@ -215,7 +252,7 @@ class AptP2P: """ if hash.expected() is None: log.msg('Hash for %s was not found' % url) - self.lookupHash_done([], hash, url, d) + self.lookupHash_done([], req, hash, url, d) else: log.msg('Found hash %s for %s' % (hash.hexexpected(), url)) @@ -233,7 +270,7 @@ class AptP2P: """ if not locations: log.msg('Failed to return file from cache: %s' % url) - self.lookupHash(hash, url, d) + self.lookupHash(req, hash, url, d) return # Get the first possible location from the list @@ -264,14 +301,14 @@ class AptP2P: # Try the next possible location self.getCachedFile(hash, req, url, d, locations) - def lookupHash(self, hash, url, d): + def lookupHash(self, req, hash, url, d): """Lookup the hash in the DHT.""" log.msg('Looking up hash in DHT for file: %s' % url) key = hash.expected() lookupDefer = self.dht.getValue(key) - lookupDefer.addBoth(self.lookupHash_done, hash, url, d) + lookupDefer.addBoth(self.lookupHash_done, req, hash, url, d) - def lookupHash_done(self, values, hash, url, d): + def lookupHash_done(self, values, req, hash, url, d): """Start the download of the file. The download will be from peers if the DHT lookup succeeded, or @@ -281,6 +318,11 @@ class AptP2P: @param values: the returned values from the DHT containing peer download information """ + # Remove some headers Apt sets in the request + req.headers.removeHeader('If-Modified-Since') + req.headers.removeHeader('Range') + req.headers.removeHeader('If-Range') + if not isinstance(values, list) or not values: if not isinstance(values, list): log.msg('DHT lookup for %s failed with error %r' % (url, values))