"""The main program code.

@var DHT_PIECES: the maximum number of pieces to store with our contact info
    in the DHT
@var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
    in the DHT
@var download_dir: the name of the directory to use for downloaded files
@var peer_dir: the name of the directory to use for peer downloads
"""
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 from urllib import unquote
17 from twisted.internet import defer, reactor, protocol
18 from twisted.web2 import server, http, http_headers, static
19 from twisted.python import log, failure
20 from twisted.python.filepath import FilePath
22 from interfaces import IDHT, IDHTStats
23 from apt_p2p_conf import config
24 from PeerManager import PeerManager
25 from HTTPServer import TopLevel
26 from MirrorManager import MirrorManager
27 from CacheManager import CacheManager
28 from Hash import HashObject
30 from stats import StatsLogger
31 from util import findMyIPAddr, compact
# Name of the sub-directory of the configured CACHE_DIR that is created at
# startup and handed to the HTTP server and the cache manager.
download_dir = 'cache'
class AptP2P(protocol.Factory):
    """The main code object that does all of the work.

    Contains all of the sub-components that do all the low-level work, and
    coordinates communication between them.

    @type dhtClass: L{interfaces.IDHT}
    @ivar dhtClass: the DHT class to use
    @type cache_dir: L{twisted.python.filepath.FilePath}
    @ivar cache_dir: the directory to use for storing all files
    @ivar db: the database to use for tracking files and hashes
    @type dht: L{interfaces.IDHT}
    @ivar dht: the DHT instance
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics logger to record sent data to
    @type http_server: L{HTTPServer.TopLevel}
    @ivar http_server: the web server that will handle all requests from apt
    @type peers: L{PeerManager.PeerManager}
    @ivar peers: the manager of all downloads from mirrors and other peers
    @type mirrors: L{MirrorManager.MirrorManager}
    @ivar mirrors: the manager of downloaded information about mirrors which
        can be queried to get hashes from file names
    @type cache: L{CacheManager.CacheManager}
    @ivar cache: the manager of all downloaded files
    @type my_contact: C{string}
    @ivar my_contact: the 6-byte compact peer representation of this peer's
        download information (IP address and port)
    @ivar nextRefresh: the pending reactor call that will run
        L{refreshFiles} next (only set once the DHT join has completed)
    """

    def __init__(self, dhtClass):
        """Initialize all the sub-components.

        @type dhtClass: L{interfaces.IDHT}
        @param dhtClass: the DHT class to use
        """
        log.msg('Initializing the main apt_p2p application')
        self.dhtClass = dhtClass

    def startFactory(self):
        """Schedule the real startup (L{_startFactory}) with a zero delay."""
        reactor.callLater(0, self._startFactory)

    def _startFactory(self):
        """Create the cache directories and build all the sub-components."""
        log.msg('Starting the main apt_p2p application')
        self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
        for subdir in (download_dir, peer_dir):
            if not self.cache_dir.child(subdir).exists():
                self.cache_dir.child(subdir).makedirs()
        self.db = DB(self.cache_dir.child('apt-p2p.db'))
        self.dht = self.dhtClass()
        self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
        self.dht.join().addCallbacks(self.joinComplete, self.joinError)
        self.stats = StatsLogger(self.db)
        self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
        self.http_server.getHTTPFactory().startFactory()
        self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats)
        self.mirrors = MirrorManager(self.cache_dir)
        self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
        # NOTE(review): this reset happens after the join was started above,
        # so it relies on the join never completing synchronously.
        self.my_contact = None

    def stopFactory(self):
        """Stop the HTTP factory and clean up the mirror manager."""
        log.msg('Stoppping the main apt_p2p application')
        self.http_server.getHTTPFactory().stopFactory()
        self.mirrors.cleanup()

    def buildProtocol(self, addr):
        """Delegate protocol creation to the HTTP server's factory."""
        return self.http_server.getHTTPFactory().buildProtocol(addr)

    def joinComplete(self, result):
        """Complete the DHT join process and determine our download information.

        Called by the DHT when the join has been completed with information
        on the external IP address and port of this peer.

        @param result: the result of the DHT join, handed to
            L{util.findMyIPAddr} to work out our own address
        @raise RuntimeError: if no IP address could be determined
        """
        my_addr = findMyIPAddr(result,
                               config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
                               config.getboolean('DEFAULT', 'LOCAL_OK'))
        # NOTE(review): the guard line was missing from the reviewed text;
        # without it the raise is unconditional and the rest is unreachable.
        if not my_addr:
            raise RuntimeError("IP address for this machine could not be found")
        self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
        self.cache.scanDirectories()
        self.nextRefresh = reactor.callLater(60, self.refreshFiles)

    def joinError(self, failure):
        """Joining the DHT has failed.

        @param failure: the failure reported by the DHT join
        @raise RuntimeError: always
        """
        log.msg("joining DHT failed miserably")
        # Record the underlying failure instead of dropping it
        log.err(failure)
        raise RuntimeError("IP address for this machine could not be found")

    def refreshFiles(self, result = None, hashes = None):
        """Refresh any files in the DHT that are about to expire.

        Handles one hash per invocation and then re-schedules itself.

        @param result: the result of the previous store, if this was called
            back from one (optional, only logged)
        @type hashes: C{dictionary}
        @param hashes: the hashes still waiting to be refreshed, keyed by
            raw hash (optional, the expired hashes are loaded from the
            database when empty or missing)
        """
        if result is not None:
            log.msg('Storage resulted in: %r' % result)

        # NOTE(review): guard reconstructed; reload only when the work list
        # handed over from the previous round is exhausted.
        if not hashes:
            expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
            hashes = self.db.expiredHashes(expireAfter)
            if len(hashes.keys()) > 0:
                log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))

        # NOTE(review): the assignments to `delay` were missing from the
        # reviewed text. 60 matches the initial scheduling in joinComplete;
        # the short delay while work is pending is an assumption -- confirm.
        delay = 60
        if hashes:
            delay = 3
            raw_hash = hashes.keys()[0]
            self.db.refreshHash(raw_hash)
            hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
            # Drop the entry so the next round moves on to another hash
            del hashes[raw_hash]
            storeDefer = self.store(hash)
            storeDefer.addBoth(self.refreshFiles, hashes)

        if self.nextRefresh.active():
            self.nextRefresh.reset(delay)
        else:
            # Was `self.plRefresh`, which does not exist on this class
            self.nextRefresh = reactor.callLater(delay, self.refreshFiles, None, hashes)

    # NOTE(review): the `def` line of this method was missing from the
    # reviewed text; the name is inferred from self.dht.getStats() -- confirm.
    def getStats(self):
        """Retrieve and format the statistics for the program.

        @return: the formatted HTML page containing the statistics
        """
        out = '<html><body>\n\n'
        out += self.stats.formatHTML(self.my_contact)
        # Only DHT implementations that provide IDHTStats can report stats
        if IDHTStats.implementedBy(self.dhtClass):
            out += self.dht.getStats()
        out += '\n</body></html>\n'
        return out

    def get_resp(self, req, url, orig_resp = None):
        """Lookup a hash for the file in the local mirror info.

        Starts the process of getting a response to an apt request.

        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        @type orig_resp: L{twisted.web2.http.Response}
        @param orig_resp: the response from the cache to be sent to apt
            (optional, ignored if missing)
        @rtype: L{twisted.internet.defer.Deferred}
        @return: a deferred that will be called back with the response
        """
        # The deferred handed through the whole workflow and fired at its end
        d = defer.Deferred()

        log.msg('Trying to find hash for %s' % url)
        findDefer = self.mirrors.findHash(unquote(url))

        findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                               callbackArgs=(req, url, orig_resp, d),
                               errbackArgs=(req, url, orig_resp, d))
        return d

    def findHash_error(self, failure, req, url, orig_resp, d):
        """Process the error in hash lookup by returning an empty L{HashObject}."""
        log.msg('Hash lookup for %s resulted in an error: %s' %
                (url, failure.getErrorMessage()))
        self.findHash_done(HashObject(), req, url, orig_resp, d)

    def findHash_done(self, hash, req, url, orig_resp, d):
        """Use the returned hash to lookup the file in the cache.

        If the hash was not found, the workflow skips down to download from
        the mirror (L{startDownload}), or checks the freshness of an old
        response if there is one.

        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        """
        if hash.expected() is None:
            log.msg('Hash for %s was not found' % url)
            # Send the old response or get a new one
            if orig_resp:
                self.check_freshness(req, url, orig_resp, d)
            else:
                self.startDownload([], req, hash, url, d)
        else:
            log.msg('Found hash %s for %s' % (hash.hexexpected(), url))

            # Lookup hash in cache
            locations = self.db.lookupHash(hash.expected(), filesOnly = True)
            self.getCachedFile(hash, req, url, d, locations)

    def check_freshness(self, req, url, orig_resp, d):
        """Send a HEAD to the mirror to check if the response from the cache is still valid.

        The outcome is delivered through C{d}; nothing is returned.

        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        @type orig_resp: L{twisted.web2.http.Response}
        @param orig_resp: the response from the cache to be sent to apt
        @type d: L{twisted.internet.defer.Deferred}
        @param d: the deferred that will be called back with the correct response
        """
        log.msg('Checking if %s is still fresh' % url)
        modtime = orig_resp.headers.getHeader('Last-Modified')
        # NOTE(review): the last argument of this call was missing from the
        # reviewed text; `modtime` is otherwise unused, the keyword name is
        # assumed -- confirm against PeerManager.get.
        headDefer = self.peers.get(HashObject(), url, method = "HEAD",
                                   modtime = modtime)
        headDefer.addCallbacks(self.check_freshness_done,
                               self.check_freshness_error,
                               callbackArgs = (req, url, orig_resp, d),
                               errbackArgs = (req, url, d))

    def check_freshness_done(self, resp, req, url, orig_resp, d):
        """Return the fresh response, if stale start to redownload.

        @type resp: L{twisted.web2.http.Response}
        @param resp: the response from the mirror to the HEAD request
        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        @type orig_resp: L{twisted.web2.http.Response}
        @param orig_resp: the response from the cache to be sent to apt
        """
        # NOTE(review): the condition was missing from the reviewed text;
        # 304 (Not Modified) is assumed to be the "still fresh" signal.
        if resp.code == 304:
            log.msg('Still fresh, returning: %s' % url)
            d.callback(orig_resp)
        else:
            log.msg('Stale, need to redownload: %s' % url)
            self.startDownload([], req, HashObject(), url, d)

    def check_freshness_error(self, err, req, url, d):
        """Mirror request failed, continue with download.

        @param err: the response from the mirror to the HEAD request
        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        """
        # Record why the HEAD failed before falling back to a full download
        log.err(err)
        self.startDownload([], req, HashObject(), url, d)

    def getCachedFile(self, hash, req, url, d, locations):
        """Try to return the file from the cache, otherwise move on to a DHT lookup.

        @type locations: C{list} of C{dictionary}
        @param locations: the files in the cache that match the hash,
            the dictionary contains a key 'path' whose value is a
            L{twisted.python.filepath.FilePath} object for the file.
        """
        if not locations:
            log.msg('Failed to return file from cache: %s' % url)
            self.lookupHash(req, hash, url, d)
            return

        # Get the first possible location from the list
        cached_path = locations.pop(0)['path']
        log.msg('Returning cached file: %s' % cached_path.path)

        # renderHTTP may answer immediately or with a deferred
        resp = static.File(cached_path.path).renderHTTP(req)
        if isinstance(resp, defer.Deferred):
            resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
        else:
            self._getCachedFile(resp, hash, req, url, d, locations)

    def _getCachedFile(self, resp, hash, req, url, d, locations):
        """Check the returned response to be sure it is valid."""
        if isinstance(resp, failure.Failure):
            log.msg('Got error trying to get cached file')
            log.err(resp)
            # Try the next possible location
            self.getCachedFile(hash, req, url, d, locations)
            return

        log.msg('Cached response: %r' % resp)

        if resp.code >= 200 and resp.code < 400:
            # NOTE(review): the body of this branch was missing from the
            # reviewed text; delivering the response ends the workflow.
            d.callback(resp)
        else:
            # Try the next possible location
            self.getCachedFile(hash, req, url, d, locations)

    def lookupHash(self, req, hash, url, d):
        """Lookup the hash in the DHT."""
        log.msg('Looking up hash in DHT for file: %s' % url)
        key = hash.expected()
        lookupDefer = self.dht.getValue(key)
        # addBoth: startDownload also receives the failure if the lookup fails
        lookupDefer.addBoth(self.startDownload, req, hash, url, d)

    def startDownload(self, values, req, hash, url, d):
        """Start the download of the file.

        The download will be from peers if the DHT lookup succeeded, or
        from the mirror otherwise.

        @type values: C{list} of C{dictionary}
        @param values: the returned values from the DHT containing peer
            download information (anything that is not a list is treated
            as a failed lookup)
        """
        # Remove some headers Apt sets in the request
        req.headers.removeHeader('If-Modified-Since')
        req.headers.removeHeader('Range')
        req.headers.removeHeader('If-Range')

        if isinstance(values, list) and values:
            log.msg('Found peers for %s: %r' % (url, values))
            # Download from the found peers
            getDefer = self.peers.get(hash, url, values)
            getDefer.addCallback(self.check_response, hash, url)
        else:
            if not isinstance(values, list):
                log.msg('DHT lookup for %s failed with error %r' % (url, values))
            else:
                log.msg('Peers for %s were not found' % url)
            getDefer = self.peers.get(hash, url)

        # Both paths save the result and then fire the workflow's deferred
        getDefer.addCallback(self.cache.save_file, hash, url)
        getDefer.addErrback(self.cache.save_error, url)
        getDefer.addCallbacks(d.callback, d.errback)

    def check_response(self, response, hash, url):
        """Check the response from peers, and download from the mirror if it is not successful.

        @return: the peer response if its code is 2xx, otherwise the
            deferred of a new direct download
        """
        if response.code < 200 or response.code >= 300:
            log.msg('Download from peers failed, going to direct download: %s' % url)
            getDefer = self.peers.get(hash, url)
            # Returning the deferred chains it into the caller's callbacks
            return getDefer
        return response

    def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
        """Add a newly cached file to the mirror info and/or the DHT.

        If the file was downloaded, set url to the path it was downloaded for.
        Doesn't add a file to the DHT unless a hash was found for it
        (but does add it anyway if forceDHT is True).

        @type file_path: L{twisted.python.filepath.FilePath}
        @param file_path: the location of the file in the local cache
        @type hash: L{Hash.HashObject}
        @param hash: the original (expected) hash object containing also the
            hash of the downloaded file
        @type new_hash: C{boolean}
        @param new_hash: whether the hash was new to this peer, and so should
            be added to the DHT
        @param url: the URI of the location of the file in the mirror
            (optional, defaults to not adding the file to the mirror info)
        @type forceDHT: C{boolean}
        @param forceDHT: whether to force addition of the file to the DHT
            even if the hash was not found in a mirror
            (optional, defaults to False)
        @return: the deferred from L{store} if the file was added to the
            DHT, otherwise None
        """
        if url:
            self.mirrors.updatedFile(url, file_path)

        if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
            return self.store(hash)
        return None

    def store(self, hash):
        """Add a key/value pair for the file to the DHT.

        Sets the key and value from the hash information, and tries to add
        it to the DHT.

        @type hash: L{Hash.HashObject}
        @param hash: the hash object of the file to add
        @rtype: L{twisted.internet.defer.Deferred}
        @return: the deferred of the DHT store request
        """
        # NOTE(review): this binding was missing from the reviewed text; the
        # errback below is handed hash.digest() as the key.
        key = hash.digest()
        value = {'c': self.my_contact}
        pieces = hash.pieceDigests()

        # Determine how to store any piece data
        # NOTE(review): the first branch was missing from the reviewed text;
        # it is assumed that one piece or none needs no piece data -- confirm.
        if len(pieces) > 1:
            piece_string = ''.join(pieces)
            if len(pieces) <= DHT_PIECES:
                # Short enough to be stored with our peer contact info
                value['t'] = {'t': piece_string}
            elif len(pieces) <= TORRENT_PIECES:
                # Short enough to be stored in a separate key in the DHT
                value['h'] = sha.new(piece_string).digest()
            else:
                # Too long, must be served up by our peer HTTP server
                value['l'] = sha.new(piece_string).digest()

        storeDefer = self.dht.storeValue(key, value)
        storeDefer.addCallbacks(self.store_done, self.store_error,
                                callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
        return storeDefer

    def store_done(self, result, hash):
        """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
        log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
        pieces = hash.pieceDigests()
        if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
            # Add the piece data key and value to the DHT
            key = sha.new(''.join(pieces)).digest()
            value = {'t': ''.join(pieces)}

            storeDefer = self.dht.storeValue(key, value)
            storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
                                    callbackArgs = (key, ), errbackArgs = (key, ))
            # NOTE(review): return reconstructed; chains the second store
            # into the caller's deferred.
            return storeDefer
        # NOTE(review): return reconstructed; refreshFiles logs the result of
        # the store chain.
        return result

    def store_torrent_done(self, result, key):
        """Adding the file to the DHT is complete, and so is the workflow."""
        log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
        # NOTE(review): return reconstructed so the result is passed on.
        return result

    def store_error(self, err, key):
        """Adding to the DHT failed.

        @param err: the failure reported by the DHT store
        @param key: the raw key whose store failed
        """
        log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))