To run apt-p2p, you probably want to do something like::
from apt_p2p.apt_p2p import AptP2P
- myapp = AptP2P(myDHT)
+ factory = AptP2P(DHT)
-where myDHT is a DHT that implements interfaces.IDHT.
+where DHT is a class that implements interfaces.IDHT.
Diagram of the interaction between the given modules::
+---------------+ +-----------------------------------+ +-------------
- | AptP2P | | DHT | | Internet
+ | AptP2P | | DHT | |
| |--->|join DHT|----|--\
| |--->|loadConfig | | | Another
| |--->|getValue | | | Node
| |--->|storeValue DHT|<---|--/
| |--->|leave | |
- | | +-----------------------------------+ |
- | | +-------------+ +----------------+ |
- | | | PeerManager | | HTTPDownloader*| |
- | |--->|get |--->|get HTTP|----|---> Mirror
- | | | |--->|getRange | |
- | |--->|close |--->|close HTTP|----|--\
- | | +-------------+ +----------------+ | | Another
- | | +-----------------------------------+ | | Peer
- | | | HTTPServer HTTP|<---|--/
- | |--->|getHTTPFactory | +-------------
- |check_freshness|<---| | +-------------
+ | /-----|--->|getStats | |
+ | | | +-----------------------------------+ | Internet
+ | | | +-------------+ +----------------+ |
+ | | | | PeerManager | | HTTPDownloader*| |
+ | | |--->|get |--->|get HTTP|----|---> Mirror
+ | | | | |--->|getRange | |
+ | | |--->|close |--->|close HTTP|----|--\
+ | | | +-------------+ +----------------+ | | Another
+ | | | +-----------------------------------+ | | Peer
+ | | | | HTTPServer HTTP|<---|--/
+ | | | | | +-------------
+ | getStats|<---| | +-------------
| get_resp|<---| HTTP|<---|HTTP Request
| | +-----------------------------------+ |
| | +---------------+ +--------------+ | Local Net
return out
#{ Main workflow
- def check_freshness(self, req, url, modtime, resp):
- """Send a HEAD to the mirror to check if the response from the cache is still valid.
+ def get_resp(self, req, url, orig_resp = None):
+ """Lookup a hash for the file in the local mirror info.
- @type req: L{twisted.web2.http.Request}
- @param req: the initial request sent to the HTTP server by apt
- @param url: the URI of the actual mirror request
- @type modtime: C{int}
- @param modtime: the modified time of the cached file (seconds since epoch)
- @type resp: L{twisted.web2.http.Response}
- @param resp: the response from the cache to be sent to apt
- @rtype: L{twisted.internet.defer.Deferred}
- @return: a deferred that will be called back with the correct response
- """
- log.msg('Checking if %s is still fresh' % url)
- d = self.peers.get('', url, method = "HEAD", modtime = modtime)
- d.addCallbacks(self.check_freshness_done, self.check_freshness_error,
- callbackArgs = (req, url, resp), errbackArgs = (req, url))
- return d
-
- def check_freshness_done(self, resp, req, url, orig_resp):
- """Process the returned response from the mirror.
+ Starts the process of getting a response to an apt request.
- @type resp: L{twisted.web2.http.Response}
- @param resp: the response from the mirror to the HEAD request
@type req: L{twisted.web2.http.Request}
@param req: the initial request sent to the HTTP server by apt
@param url: the URI of the actual mirror request
@type orig_resp: L{twisted.web2.http.Response}
@param orig_resp: the response from the cache to be sent to apt
- """
- if resp.code == 304:
- log.msg('Still fresh, returning: %s' % url)
- return orig_resp
- else:
- log.msg('Stale, need to redownload: %s' % url)
- return self.get_resp(req, url)
-
- def check_freshness_error(self, err, req, url):
- """Mirror request failed, continue with download.
-
- @param err: the response from the mirror to the HEAD request
- @type req: L{twisted.web2.http.Request}
- @param req: the initial request sent to the HTTP server by apt
- @param url: the URI of the actual mirror request
- """
- log.err(err)
- return self.get_resp(req, url)
-
- def get_resp(self, req, url):
- """Lookup a hash for the file in the local mirror info.
-
- Starts the process of getting a response to an uncached apt request.
-
- @type req: L{twisted.web2.http.Request}
- @param req: the initial request sent to the HTTP server by apt
- @param url: the URI of the actual mirror request
+ (optional, ignored if missing)
@rtype: L{twisted.internet.defer.Deferred}
@return: a deferred that will be called back with the response
"""
findDefer = self.mirrors.findHash(unquote(url))
findDefer.addCallbacks(self.findHash_done, self.findHash_error,
- callbackArgs=(req, url, d), errbackArgs=(req, url, d))
+ callbackArgs=(req, url, orig_resp, d),
+ errbackArgs=(req, url, orig_resp, d))
findDefer.addErrback(log.err)
return d
- def findHash_error(self, failure, req, url, d):
+ def findHash_error(self, failure, req, url, orig_resp, d):
"""Process the error in hash lookup by returning an empty L{HashObject}."""
log.err(failure)
- self.findHash_done(HashObject(), req, url, d)
+ self.findHash_done(HashObject(), req, url, orig_resp, d)
- def findHash_done(self, hash, req, url, d):
- """Use the returned hash to lookup the file in the cache.
+ def findHash_done(self, hash, req, url, orig_resp, d):
+ """Use the returned hash to lookup the file in the cache.
If the hash was not found, the workflow skips down to download from
- the mirror (L{lookupHash_done}).
+ the mirror (L{startDownload}), or checks the freshness of an old
+ response if there is one.
@type hash: L{Hash.HashObject}
@param hash: the hash object containing the expected hash for the file
"""
if hash.expected() is None:
log.msg('Hash for %s was not found' % url)
- self.lookupHash_done([], req, hash, url, d)
+ # Send the old response or get a new one
+ if orig_resp:
+ self.check_freshness(req, url, orig_resp, d)
+ else:
+ self.startDownload([], req, hash, url, d)
else:
log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
locations = self.db.lookupHash(hash.expected(), filesOnly = True)
self.getCachedFile(hash, req, url, d, locations)
+ def check_freshness(self, req, url, orig_resp, d):
+ """Send a HEAD to the mirror to check if the response from the cache is still valid.
+
+ @type req: L{twisted.web2.http.Request}
+ @param req: the initial request sent to the HTTP server by apt
+ @param url: the URI of the actual mirror request
+ @type orig_resp: L{twisted.web2.http.Response}
+ @param orig_resp: the response from the cache to be sent to apt
+ @rtype: L{twisted.internet.defer.Deferred}
+ @return: a deferred that will be called back with the correct response
+ """
+ log.msg('Checking if %s is still fresh' % url)
+ modtime = orig_resp.headers.getHeader('Last-Modified')
+ headDefer = self.peers.get(HashObject(), url, method = "HEAD",
+ modtime = modtime)
+ headDefer.addCallbacks(self.check_freshness_done,
+ self.check_freshness_error,
+ callbackArgs = (req, url, orig_resp, d),
+ errbackArgs = (req, url, d))
+
+ def check_freshness_done(self, resp, req, url, orig_resp, d):
+ """Return the fresh response, if stale start to redownload.
+
+ @type resp: L{twisted.web2.http.Response}
+ @param resp: the response from the mirror to the HEAD request
+ @type req: L{twisted.web2.http.Request}
+ @param req: the initial request sent to the HTTP server by apt
+ @param url: the URI of the actual mirror request
+ @type orig_resp: L{twisted.web2.http.Response}
+ @param orig_resp: the response from the cache to be sent to apt
+ """
+ if resp.code == 304:
+ log.msg('Still fresh, returning: %s' % url)
+ d.callback(orig_resp)
+ else:
+ log.msg('Stale, need to redownload: %s' % url)
+ self.startDownload([], req, HashObject(), url, d)
+
+ def check_freshness_error(self, err, req, url, d):
+ """Mirror request failed, continue with download.
+
+ @param err: the response from the mirror to the HEAD request
+ @type req: L{twisted.web2.http.Request}
+ @param req: the initial request sent to the HTTP server by apt
+ @param url: the URI of the actual mirror request
+ """
+ log.err(err)
+ self.startDownload([], req, HashObject(), url, d)
+
def getCachedFile(self, hash, req, url, d, locations):
"""Try to return the file from the cache, otherwise move on to a DHT lookup.
log.msg('Looking up hash in DHT for file: %s' % url)
key = hash.expected()
lookupDefer = self.dht.getValue(key)
- lookupDefer.addBoth(self.lookupHash_done, req, hash, url, d)
+ lookupDefer.addBoth(self.startDownload, req, hash, url, d)
- def lookupHash_done(self, values, req, hash, url, d):
+ def startDownload(self, values, req, hash, url, d):
"""Start the download of the file.
The download will be from peers if the DHT lookup succeeded, or