@var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
in the DHT
@var download_dir: the name of the directory to use for downloaded files
-
+@var peer_dir: the name of the directory to use for peer downloads
"""
from binascii import b2a_hex
from urllib import unquote
import os, re, sha
-from twisted.internet import defer, reactor
+from twisted.internet import defer, reactor, protocol
from twisted.web2 import server, http, http_headers, static
from twisted.python import log, failure
from twisted.python.filepath import FilePath
from CacheManager import CacheManager
from Hash import HashObject
from db import DB
+from stats import StatsLogger
from util import findMyIPAddr, compact
DHT_PIECES = 4
TORRENT_PIECES = 70
download_dir = 'cache'
+peer_dir = 'peers'
-class AptP2P:
+class AptP2P(protocol.Factory):
"""The main code object that does all of the work.
Contains all of the sub-components that do all the low-level work, and
@ivar db: the database to use for tracking files and hashes
@type dht: L{interfaces.IDHT}
@ivar dht: the DHT instance
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics logger to record sent data to
@type http_server: L{HTTPServer.TopLevel}
@ivar http_server: the web server that will handle all requests from apt
and from other peers
"""
log.msg('Initializing the main apt_p2p application')
self.dhtClass = dhtClass
+
+ #{ Factory interface
+ def startFactory(self):
+ reactor.callLater(0, self._startFactory)
+
+ def _startFactory(self):
+ log.msg('Starting the main apt_p2p application')
self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
if not self.cache_dir.child(download_dir).exists():
self.cache_dir.child(download_dir).makedirs()
+ if not self.cache_dir.child(peer_dir).exists():
+ self.cache_dir.child(peer_dir).makedirs()
self.db = DB(self.cache_dir.child('apt-p2p.db'))
- self.dht = dhtClass()
+ self.dht = self.dhtClass()
self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
self.dht.join().addCallbacks(self.joinComplete, self.joinError)
+ self.stats = StatsLogger(self.db)
self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
- self.getHTTPFactory = self.http_server.getHTTPFactory
- self.peers = PeerManager(self.cache_dir, self.dht)
+ self.http_server.getHTTPFactory().startFactory()
+ self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats)
self.mirrors = MirrorManager(self.cache_dir)
self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
self.my_contact = None
-
- #{ DHT maintenance
+
+ def stopFactory(self):
+ log.msg('Stoppping the main apt_p2p application')
+ self.http_server.getHTTPFactory().stopFactory()
+ self.stats.save()
+ self.db.close()
+
+ def buildProtocol(self, addr):
+ return self.http_server.getHTTPFactory().buildProtocol(addr)
+
+ #{ DHT Maintenance
def joinComplete(self, result):
"""Complete the DHT join process and determine our download information.
@return: the formatted HTML page containing the statistics
"""
out = '<html><body>\n\n'
+ out += self.stats.formatHTML(self.my_contact)
+ out += '\n\n'
if IDHTStats.implementedBy(self.dhtClass):
out += self.dht.getStats()
out += '\n</body></html>\n'
"""
log.msg('Checking if %s is still fresh' % url)
d = self.peers.get('', url, method = "HEAD", modtime = modtime)
- d.addCallback(self.check_freshness_done, req, url, resp)
+ d.addCallbacks(self.check_freshness_done, self.check_freshness_error,
+ callbackArgs = (req, url, resp), errbackArgs = (req, url))
return d
def check_freshness_done(self, resp, req, url, orig_resp):
log.msg('Stale, need to redownload: %s' % url)
return self.get_resp(req, url)
+ def check_freshness_error(self, err, req, url):
+ """Mirror request failed, continue with download.
+
+ @param err: the response from the mirror to the HEAD request
+ @type req: L{twisted.web2.http.Request}
+ @param req: the initial request sent to the HTTP server by apt
+ @param url: the URI of the actual mirror request
+ """
+ log.err(err)
+ return self.get_resp(req, url)
+
def get_resp(self, req, url):
"""Lookup a hash for the file in the local mirror info.
"""
if hash.expected() is None:
log.msg('Hash for %s was not found' % url)
- self.lookupHash_done([], hash, url, d)
+ self.lookupHash_done([], req, hash, url, d)
else:
log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
"""
if not locations:
log.msg('Failed to return file from cache: %s' % url)
- self.lookupHash(hash, url, d)
+ self.lookupHash(req, hash, url, d)
return
# Get the first possible location from the list
# Try the next possible location
self.getCachedFile(hash, req, url, d, locations)
- def lookupHash(self, hash, url, d):
+ def lookupHash(self, req, hash, url, d):
"""Lookup the hash in the DHT."""
log.msg('Looking up hash in DHT for file: %s' % url)
key = hash.expected()
lookupDefer = self.dht.getValue(key)
- lookupDefer.addBoth(self.lookupHash_done, hash, url, d)
+ lookupDefer.addBoth(self.lookupHash_done, req, hash, url, d)
- def lookupHash_done(self, values, hash, url, d):
+ def lookupHash_done(self, values, req, hash, url, d):
"""Start the download of the file.
The download will be from peers if the DHT lookup succeeded, or
@param values: the returned values from the DHT containing peer
download information
"""
+ # Remove some headers Apt sets in the request
+ req.headers.removeHeader('If-Modified-Since')
+ req.headers.removeHeader('Range')
+ req.headers.removeHeader('If-Range')
+
if not isinstance(values, list) or not values:
if not isinstance(values, list):
log.msg('DHT lookup for %s failed with error %r' % (url, values))