@var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
in the DHT
@var download_dir: the name of the directory to use for downloaded files
-
+@var peer_dir: the name of the directory to use for peer downloads
"""
from binascii import b2a_hex
from urllib import unquote
import os, re, sha
-from twisted.internet import defer, reactor
+from twisted.internet import defer, reactor, protocol
from twisted.web2 import server, http, http_headers, static
from twisted.python import log, failure
from twisted.python.filepath import FilePath
from CacheManager import CacheManager
from Hash import HashObject
from db import DB
+from stats import StatsLogger
from util import findMyIPAddr, compact
DHT_PIECES = 4
TORRENT_PIECES = 70
download_dir = 'cache'
+peer_dir = 'peers'
-class AptP2P:
+class AptP2P(protocol.Factory):
"""The main code object that does all of the work.
Contains all of the sub-components that do all the low-level work, and
@ivar db: the database to use for tracking files and hashes
@type dht: L{interfaces.IDHT}
@ivar dht: the DHT instance
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics logger to record sent data to
@type http_server: L{HTTPServer.TopLevel}
@ivar http_server: the web server that will handle all requests from apt
and from other peers
"""
log.msg('Initializing the main apt_p2p application')
self.dhtClass = dhtClass
+
+ #{ Factory interface
+ def startFactory(self):
+ reactor.callLater(0, self._startFactory)
+
+ def _startFactory(self):
+ log.msg('Starting the main apt_p2p application')
self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
if not self.cache_dir.child(download_dir).exists():
self.cache_dir.child(download_dir).makedirs()
+ if not self.cache_dir.child(peer_dir).exists():
+ self.cache_dir.child(peer_dir).makedirs()
self.db = DB(self.cache_dir.child('apt-p2p.db'))
- self.dht = dhtClass()
+ self.dht = self.dhtClass()
self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
self.dht.join().addCallbacks(self.joinComplete, self.joinError)
+ self.stats = StatsLogger(self.db)
self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
- self.getHTTPFactory = self.http_server.getHTTPFactory
- self.peers = PeerManager(self.cache_dir, self.dht)
+ self.http_server.getHTTPFactory().startFactory()
+ self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats)
self.mirrors = MirrorManager(self.cache_dir)
self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
self.my_contact = None
-
- #{ DHT maintenance
+
+ def stopFactory(self):
+        log.msg('Stopping the main apt_p2p application')
+ self.http_server.getHTTPFactory().stopFactory()
+ self.stats.save()
+ self.db.close()
+
+ def buildProtocol(self, addr):
+ return self.http_server.getHTTPFactory().buildProtocol(addr)
+
+ #{ DHT Maintenance
def joinComplete(self, result):
"""Complete the DHT join process and determine our download information.
@return: the formatted HTML page containing the statistics
"""
out = '<html><body>\n\n'
+ out += self.stats.formatHTML(self.my_contact)
+ out += '\n\n'
if IDHTStats.implementedBy(self.dhtClass):
out += self.dht.getStats()
out += '\n</body></html>\n'
return out
#{ Main workflow
- def check_freshness(self, req, url, modtime, resp):
- """Send a HEAD to the mirror to check if the response from the cache is still valid.
+ def get_resp(self, req, url, orig_resp = None):
+ """Lookup a hash for the file in the local mirror info.
- @type req: L{twisted.web2.http.Request}
- @param req: the initial request sent to the HTTP server by apt
- @param url: the URI of the actual mirror request
- @type modtime: C{int}
- @param modtime: the modified time of the cached file (seconds since epoch)
- @type resp: L{twisted.web2.http.Response}
- @param resp: the response from the cache to be sent to apt
- @rtype: L{twisted.internet.defer.Deferred}
- @return: a deferred that will be called back with the correct response
- """
- log.msg('Checking if %s is still fresh' % url)
- d = self.peers.get('', url, method = "HEAD", modtime = modtime)
- d.addCallback(self.check_freshness_done, req, url, resp)
- return d
-
- def check_freshness_done(self, resp, req, url, orig_resp):
- """Process the returned response from the mirror.
+ Starts the process of getting a response to an apt request.
- @type resp: L{twisted.web2.http.Response}
- @param resp: the response from the mirror to the HEAD request
@type req: L{twisted.web2.http.Request}
@param req: the initial request sent to the HTTP server by apt
@param url: the URI of the actual mirror request
@type orig_resp: L{twisted.web2.http.Response}
@param orig_resp: the response from the cache to be sent to apt
- """
- if resp.code == 304:
- log.msg('Still fresh, returning: %s' % url)
- return orig_resp
- else:
- log.msg('Stale, need to redownload: %s' % url)
- return self.get_resp(req, url)
-
- def get_resp(self, req, url):
- """Lookup a hash for the file in the local mirror info.
-
- Starts the process of getting a response to an uncached apt request.
-
- @type req: L{twisted.web2.http.Request}
- @param req: the initial request sent to the HTTP server by apt
- @param url: the URI of the actual mirror request
+ (optional, ignored if missing)
@rtype: L{twisted.internet.defer.Deferred}
@return: a deferred that will be called back with the response
"""
findDefer = self.mirrors.findHash(unquote(url))
findDefer.addCallbacks(self.findHash_done, self.findHash_error,
- callbackArgs=(req, url, d), errbackArgs=(req, url, d))
+ callbackArgs=(req, url, orig_resp, d),
+ errbackArgs=(req, url, orig_resp, d))
findDefer.addErrback(log.err)
return d
- def findHash_error(self, failure, req, url, d):
+ def findHash_error(self, failure, req, url, orig_resp, d):
"""Process the error in hash lookup by returning an empty L{HashObject}."""
log.err(failure)
- self.findHash_done(HashObject(), req, url, d)
+ self.findHash_done(HashObject(), req, url, orig_resp, d)
- def findHash_done(self, hash, req, url, d):
- """Use the returned hash to lookup the file in the cache.
+ def findHash_done(self, hash, req, url, orig_resp, d):
+ """Use the returned hash to lookup the file in the cache.
If the hash was not found, the workflow skips down to download from
- the mirror (L{lookupHash_done}).
+ the mirror (L{startDownload}), or checks the freshness of an old
+ response if there is one.
@type hash: L{Hash.HashObject}
@param hash: the hash object containing the expected hash for the file
"""
if hash.expected() is None:
log.msg('Hash for %s was not found' % url)
- self.lookupHash_done([], hash, url, d)
+ # Send the old response or get a new one
+ if orig_resp:
+ self.check_freshness(req, url, orig_resp, d)
+ else:
+ self.startDownload([], req, hash, url, d)
else:
log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
locations = self.db.lookupHash(hash.expected(), filesOnly = True)
self.getCachedFile(hash, req, url, d, locations)
+ def check_freshness(self, req, url, orig_resp, d):
+ """Send a HEAD to the mirror to check if the response from the cache is still valid.
+
+ @type req: L{twisted.web2.http.Request}
+ @param req: the initial request sent to the HTTP server by apt
+ @param url: the URI of the actual mirror request
+ @type orig_resp: L{twisted.web2.http.Response}
+ @param orig_resp: the response from the cache to be sent to apt
+        @type d: L{twisted.internet.defer.Deferred}
+        @param d: the deferred that will eventually be called back with the response
+ """
+ log.msg('Checking if %s is still fresh' % url)
+ modtime = orig_resp.headers.getHeader('Last-Modified')
+ headDefer = self.peers.get(HashObject(), url, method = "HEAD",
+ modtime = modtime)
+ headDefer.addCallbacks(self.check_freshness_done,
+ self.check_freshness_error,
+ callbackArgs = (req, url, orig_resp, d),
+ errbackArgs = (req, url, d))
+
+ def check_freshness_done(self, resp, req, url, orig_resp, d):
+ """Return the fresh response, if stale start to redownload.
+
+ @type resp: L{twisted.web2.http.Response}
+ @param resp: the response from the mirror to the HEAD request
+ @type req: L{twisted.web2.http.Request}
+ @param req: the initial request sent to the HTTP server by apt
+ @param url: the URI of the actual mirror request
+ @type orig_resp: L{twisted.web2.http.Response}
+ @param orig_resp: the response from the cache to be sent to apt
+ """
+ if resp.code == 304:
+ log.msg('Still fresh, returning: %s' % url)
+ d.callback(orig_resp)
+ else:
+ log.msg('Stale, need to redownload: %s' % url)
+ self.startDownload([], req, HashObject(), url, d)
+
+ def check_freshness_error(self, err, req, url, d):
+ """Mirror request failed, continue with download.
+
+        @param err: the error (failure) returned by the HEAD request to the mirror
+ @type req: L{twisted.web2.http.Request}
+ @param req: the initial request sent to the HTTP server by apt
+ @param url: the URI of the actual mirror request
+ """
+ log.err(err)
+ self.startDownload([], req, HashObject(), url, d)
+
def getCachedFile(self, hash, req, url, d, locations):
"""Try to return the file from the cache, otherwise move on to a DHT lookup.
"""
if not locations:
log.msg('Failed to return file from cache: %s' % url)
- self.lookupHash(hash, url, d)
+ self.lookupHash(req, hash, url, d)
return
# Get the first possible location from the list
# Try the next possible location
self.getCachedFile(hash, req, url, d, locations)
- def lookupHash(self, hash, url, d):
+ def lookupHash(self, req, hash, url, d):
"""Lookup the hash in the DHT."""
log.msg('Looking up hash in DHT for file: %s' % url)
key = hash.expected()
lookupDefer = self.dht.getValue(key)
- lookupDefer.addBoth(self.lookupHash_done, hash, url, d)
+ lookupDefer.addBoth(self.startDownload, req, hash, url, d)
- def lookupHash_done(self, values, hash, url, d):
+ def startDownload(self, values, req, hash, url, d):
"""Start the download of the file.
The download will be from peers if the DHT lookup succeeded, or
@param values: the returned values from the DHT containing peer
download information
"""
+ # Remove some headers Apt sets in the request
+ req.headers.removeHeader('If-Modified-Since')
+ req.headers.removeHeader('Range')
+ req.headers.removeHeader('If-Range')
+
if not isinstance(values, list) or not values:
if not isinstance(values, list):
log.msg('DHT lookup for %s failed with error %r' % (url, values))