"""The main program code.

@var DHT_PIECES: the maximum number of pieces to store with our contact info
    in the DHT
@var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
    in the DHT
@var download_dir: the name of the directory to use for downloaded files
"""
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 from urllib import unquote
17 from twisted.internet import defer, reactor
18 from twisted.web2 import server, http, http_headers, static
19 from twisted.python import log, failure
20 from twisted.python.filepath import FilePath
22 from interfaces import IDHT, IDHTStats
23 from apt_p2p_conf import config
24 from PeerManager import PeerManager
25 from HTTPServer import TopLevel
26 from MirrorManager import MirrorManager
27 from CacheManager import CacheManager
28 from Hash import HashObject
30 from stats import StatsLogger
31 from util import findMyIPAddr, compact
# Name of the sub-directory (below the configured cache directory) that
# holds downloaded files.
download_dir = 'cache'
"""The main code object that does all of the work.

Contains all of the sub-components that do all the low-level work, and
coordinates communication between them.

@type dhtClass: L{interfaces.IDHT}
@ivar dhtClass: the DHT class to use
@type cache_dir: L{twisted.python.filepath.FilePath}
@ivar cache_dir: the directory to use for storing all files
@ivar db: the database to use for tracking files and hashes
@type dht: L{interfaces.IDHT}
@ivar dht: the DHT instance
@type stats: L{stats.StatsLogger}
@ivar stats: the statistics logger to record sent data to
@type http_server: L{HTTPServer.TopLevel}
@ivar http_server: the web server that will handle all requests from apt
@type peers: L{PeerManager.PeerManager}
@ivar peers: the manager of all downloads from mirrors and other peers
@type mirrors: L{MirrorManager.MirrorManager}
@ivar mirrors: the manager of downloaded information about mirrors which
    can be queried to get hashes from file names
@type cache: L{CacheManager.CacheManager}
@ivar cache: the manager of all downloaded files
@type my_contact: C{string}
@ivar my_contact: the 6-byte compact peer representation of this peer's
    download information (IP address and port)
"""
def __init__(self, dhtClass):
    """Initialize all the sub-components.

    Makes sure the download directory exists below the configured cache
    directory, opens the database, configures the DHT and starts joining
    it, then builds the statistics logger, HTTP server, peer manager,
    mirror manager and cache manager.  A shutdown hook is registered with
    the reactor last.

    @type dhtClass: L{interfaces.IDHT}
    @param dhtClass: the DHT class to use
    """
    log.msg('Initializing the main apt_p2p application')
    self.dhtClass = dhtClass
    self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
    downloads = self.cache_dir.child(download_dir)
    if not downloads.exists():
        downloads.makedirs()
    self.db = DB(self.cache_dir.child('apt-p2p.db'))

    # self.dht has to exist before loadConfig()/join() are called on it.
    # NOTE(review): instantiating dhtClass with no arguments is an
    # assumption -- confirm against the IDHT implementations.
    self.dht = dhtClass()
    self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
    self.dht.join().addCallbacks(self.joinComplete, self.joinError)

    self.stats = StatsLogger(self.db)
    self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
    # Expose the HTTP server's factory getter directly on this object.
    self.getHTTPFactory = self.http_server.getHTTPFactory
    self.peers = PeerManager(self.cache_dir, self.dht, self.stats)
    self.mirrors = MirrorManager(self.cache_dir)
    self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
    # Unknown until joinComplete() has worked out our external address.
    self.my_contact = None
    reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
def joinComplete(self, result):
    """Complete the DHT join process and determine our download information.

    Called by the DHT when the join has been completed with information
    on the external IP address and port of this peer.  Stores the compact
    contact string, scans the cache directories and schedules the first
    key refresh for 60 seconds later.

    @param result: the join result, passed on to L{util.findMyIPAddr}
    @raise RuntimeError: if no address could be determined for this machine
    """
    my_addr = findMyIPAddr(result,
                           config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
                           config.getboolean('DEFAULT', 'LOCAL_OK'))
    # Only give up when no address was found; otherwise the code below
    # could never run.
    if not my_addr:
        raise RuntimeError("IP address for this machine could not be found")
    self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
    self.cache.scanDirectories()
    reactor.callLater(60, self.refreshFiles)
def joinError(self, failure):
    """Joining the DHT has failed.

    Logs the failure and raises, since the program cannot determine its
    contact information without the DHT.

    @param failure: the failure the DHT join ended with (note that this
        parameter shadows the imported C{failure} module inside this method)
    @raise RuntimeError: always
    """
    log.msg("joining DHT failed miserably")
    # Record the underlying cause before replacing it with our own error.
    log.err(failure)
    # NOTE(review): the message duplicates the one in joinComplete; kept
    # verbatim so existing log matching is unaffected.
    raise RuntimeError("IP address for this machine could not be found")
def refreshFiles(self):
    """Refresh any files in the DHT that are about to expire.

    Asks the database for the hashes older than the configured
    KEY_REFRESH time and hands them to L{_refreshFiles}.
    """
    expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
    hashes = self.db.expiredHashes(expireAfter)
    if hashes:
        log.msg('Refreshing the keys of %d DHT values' % len(hashes))
    # Always start the chain, even with nothing to refresh: its final step
    # is what schedules the next run of this method.
    self._refreshFiles(None, hashes)
def _refreshFiles(self, result, hashes):
    """Store one expired hash in the DHT again, then move on to the next.

    Used both as the entry point (with C{result} of None) and as the
    callback/errback of each store, so the hashes are processed one at a
    time.  Once none are left, L{refreshFiles} is scheduled to run again
    in 60 seconds.

    @param result: the outcome of the previous store, or None on the
        first call
    @type hashes: C{dictionary}
    @param hashes: the hashes still to be refreshed, keyed by raw hash,
        each value having a 'pieces' entry; entries are removed as they
        are processed
    """
    if result is not None:
        log.msg('Storage resulted in: %r' % result)

    if hashes:
        raw_hash = hashes.keys()[0]
        self.db.refreshHash(raw_hash)
        hash_obj = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
        # Drop the entry so each pass shrinks the work list; without this
        # the same hash would be stored forever.
        del hashes[raw_hash]
        storeDefer = self.store(hash_obj)
        storeDefer.addBoth(self._refreshFiles, hashes)
    else:
        # Nothing left to do: come back later for the next batch.
        reactor.callLater(60, self.refreshFiles)
# NOTE(review): the method name is reconstructed (it mirrors the
# self.dht.getStats() call below) -- confirm against the callers.
def getStats(self):
    """Retrieve and format the statistics for the program.

    Wraps the statistics logger's HTML, plus the DHT's own statistics
    when the DHT class implements L{interfaces.IDHTStats}, in a minimal
    HTML page.

    @rtype: C{string}
    @return: the formatted HTML page containing the statistics
    """
    out = '<html><body>\n\n'
    out += self.stats.formatHTML(self.my_contact)
    # Only DHT implementations that declare IDHTStats can report statistics.
    if IDHTStats.implementedBy(self.dhtClass):
        out += self.dht.getStats()
    out += '\n</body></html>\n'
    return out
def check_freshness(self, req, url, modtime, resp):
    """Send a HEAD to the mirror to check if the response from the cache is still valid.

    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @type modtime: C{int}
    @param modtime: the modified time of the cached file (seconds since epoch)
    @type resp: L{twisted.web2.http.Response}
    @param resp: the response from the cache to be sent to apt
    @rtype: L{twisted.internet.defer.Deferred}
    @return: a deferred that will be called back with the correct response
    """
    log.msg('Checking if %s is still fresh' % url)
    # No hash is known at this point, hence the empty first argument.
    d = self.peers.get('', url, method = "HEAD", modtime = modtime)
    d.addCallbacks(self.check_freshness_done, self.check_freshness_error,
                   callbackArgs = (req, url, resp), errbackArgs = (req, url))
    return d
def check_freshness_done(self, resp, req, url, orig_resp):
    """Process the returned response from the mirror.

    @type resp: L{twisted.web2.http.Response}
    @param resp: the response from the mirror to the HEAD request
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @type orig_resp: L{twisted.web2.http.Response}
    @param orig_resp: the response from the cache to be sent to apt
    @return: C{orig_resp} if the cached copy is still current, otherwise
        the result of L{get_resp} for a fresh download
    """
    # NOTE(review): the freshness test is reconstructed.  It assumes the
    # HEAD request was conditional on the cached file's modification time,
    # so the mirror answers 304 (Not Modified) when the copy is current --
    # confirm against PeerManager.get.
    if resp.code == 304:
        log.msg('Still fresh, returning: %s' % url)
        return orig_resp

    log.msg('Stale, need to redownload: %s' % url)
    return self.get_resp(req, url)
def check_freshness_error(self, err, req, url):
    """Mirror request failed, continue with download.

    @param err: the response from the mirror to the HEAD request
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @return: the result of L{get_resp} for the same request
    """
    # Keep a record of why the check failed before falling back.
    log.err(err)
    return self.get_resp(req, url)
def get_resp(self, req, url):
    """Lookup a hash for the file in the local mirror info.

    Starts the process of getting a response to an uncached apt request.

    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @rtype: L{twisted.internet.defer.Deferred}
    @return: a deferred that will be called back with the response
    """
    # This deferred is threaded through every later step of the workflow
    # and fired by whichever step finally produces the response.
    d = defer.Deferred()

    log.msg('Trying to find hash for %s' % url)
    findDefer = self.mirrors.findHash(unquote(url))

    findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                           callbackArgs=(req, url, d), errbackArgs=(req, url, d))
    # Anything raised by the two handlers above ends up in the log.
    findDefer.addErrback(log.err)
    return d
def findHash_error(self, failure, req, url, d):
    """Process the error in hash lookup by returning an empty L{HashObject}.

    @param failure: the failure from the hash lookup (this parameter
        shadows the imported C{failure} module inside this method)
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @param d: the deferred to fire with the eventual response
    """
    # Log the cause, then carry on as if no hash had been found.
    log.err(failure)
    self.findHash_done(HashObject(), req, url, d)
def findHash_done(self, hash, req, url, d):
    """Use the returned hash to lookup the file in the cache.

    If the hash was not found, the workflow skips down to download from
    the mirror (L{lookupHash_done}).

    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @param d: the deferred to fire with the eventual response
    """
    if hash.expected() is None:
        log.msg('Hash for %s was not found' % url)
        # An empty value list makes lookupHash_done go to the mirror.
        self.lookupHash_done([], hash, url, d)
        return

    log.msg('Found hash %s for %s' % (hash.hexexpected(), url))

    # Lookup hash in cache
    locations = self.db.lookupHash(hash.expected(), filesOnly = True)
    self.getCachedFile(hash, req, url, d, locations)
def getCachedFile(self, hash, req, url, d, locations):
    """Try to return the file from the cache, otherwise move on to a DHT lookup.

    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @param d: the deferred to fire with the eventual response
    @type locations: C{list} of C{dictionary}
    @param locations: the files in the cache that match the hash,
        the dictionary contains a key 'path' whose value is a
        L{twisted.python.filepath.FilePath} object for the file.
        Entries are removed from this list as they are tried.
    """
    if not locations:
        log.msg('Failed to return file from cache: %s' % url)
        self.lookupHash(hash, url, d)
        return

    # Get the first possible location from the list
    cached = locations.pop(0)['path']
    log.msg('Returning cached file: %s' % cached.path)

    # renderHTTP may answer straight away or with a deferred; either way
    # the outcome is checked by _getCachedFile.
    resp = static.File(cached.path).renderHTTP(req)
    if isinstance(resp, defer.Deferred):
        resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
    else:
        self._getCachedFile(resp, hash, req, url, d, locations)
def _getCachedFile(self, resp, hash, req, url, d, locations):
    """Check the returned response to be sure it is valid.

    A usable response (status 200-399) is delivered through C{d}; a
    failure or any other status moves on to the next cached location.

    @param resp: the response (or failure) from rendering the cached file
    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @param d: the deferred to fire with the eventual response
    @type locations: C{list} of C{dictionary}
    @param locations: the cached locations that have not been tried yet
    """
    if isinstance(resp, failure.Failure):
        log.msg('Got error trying to get cached file')
        log.err(resp)
        # Try the next possible location
        self.getCachedFile(hash, req, url, d, locations)
        return

    log.msg('Cached response: %r' % resp)

    if 200 <= resp.code < 400:
        d.callback(resp)
    else:
        # Try the next possible location
        self.getCachedFile(hash, req, url, d, locations)
def lookupHash(self, hash, url, d):
    """Lookup the hash in the DHT.

    The expected hash is the DHT key; whatever the lookup produces,
    success or failure, is handed to L{lookupHash_done}.

    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @param url: the URI of the actual mirror request
    @param d: the deferred to fire with the eventual response
    """
    log.msg('Looking up hash in DHT for file: %s' % url)
    pending = self.dht.getValue(hash.expected())
    pending.addBoth(self.lookupHash_done, hash, url, d)
def lookupHash_done(self, values, hash, url, d):
    """Start the download of the file.

    The download will be from peers if the DHT lookup succeeded, or
    from the mirror otherwise.

    @type values: C{list} of C{dictionary}
    @param values: the returned values from the DHT containing peer
        information; anything that is not a list is treated as a lookup
        error
    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @param url: the URI of the actual mirror request
    @param d: the deferred to fire with the eventual response
    """
    if isinstance(values, list) and values:
        log.msg('Found peers for %s: %r' % (url, values))
        # Download from the found peers
        getDefer = self.peers.get(hash, url, values)
        # A bad peer download falls back to the mirror before saving.
        getDefer.addCallback(self.check_response, hash, url)
    else:
        if not isinstance(values, list):
            log.msg('DHT lookup for %s failed with error %r' % (url, values))
        else:
            log.msg('Peers for %s were not found' % url)
        getDefer = self.peers.get(hash, url)

    # Both paths save the file and then deliver the outcome through d.
    getDefer.addCallback(self.cache.save_file, hash, url)
    getDefer.addErrback(self.cache.save_error, url)
    getDefer.addCallbacks(d.callback, d.errback)
def check_response(self, response, hash, url):
    """Check the response from peers, and download from the mirror if it is not OK.

    @param response: the response from the peer download; only its
        C{code} attribute is examined
    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @param url: the URI of the actual mirror request
    @return: C{response} itself for a 2xx status, otherwise whatever
        C{self.peers.get(hash, url)} returns for a direct download
    """
    if 200 <= response.code < 300:
        # Pass the good response on to the next callback in the chain.
        return response

    log.msg('Download from peers failed, going to direct download: %s' % url)
    # Returned so the chain continues with the direct download's result.
    return self.peers.get(hash, url)
def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
    """Add a newly cached file to the mirror info and/or the DHT.

    If the file was downloaded, set url to the path it was downloaded for.
    Doesn't add a file to the DHT unless a hash was found for it
    (but does add it anyway if forceDHT is True).

    @type file_path: L{twisted.python.filepath.FilePath}
    @param file_path: the location of the file in the local cache
    @type hash: L{Hash.HashObject}
    @param hash: the original (expected) hash object containing also the
        hash of the downloaded file
    @type new_hash: C{boolean}
    @param new_hash: whether the hash was new to this peer, and so should
        be added to the DHT
    @param url: the URI of the location of the file in the mirror
        (optional, defaults to not adding the file to the mirror info)
    @type forceDHT: C{boolean}
    @param forceDHT: whether to force addition of the file to the DHT
        even if the hash was not found in a mirror
        (optional, defaults to False)
    @return: the result of L{store} when the file is added to the DHT,
        otherwise None
    """
    # Without a url there is nothing to record in the mirror info.
    if url:
        self.mirrors.updatedFile(url, file_path)

    # Storing needs our contact info, so nothing goes to the DHT until
    # my_contact has been set.
    if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
        return self.store(hash)
    return None
def store(self, hash):
    """Add a key/value pair for the file to the DHT.

    Sets the key and value from the hash information, and tries to add
    it to the DHT.  The value always carries our contact info under 'c';
    depending on how many piece digests there are it may also carry the
    piece string itself ('t') or a SHA1 of the piece string ('h' or 'l').

    @type hash: L{Hash.HashObject}
    @param hash: the hash object of the file to store
    @return: the deferred returned by the DHT's C{storeValue}, with
        L{store_done} and L{store_error} attached
    """
    # The file's digest is the DHT key (it is also what store_error is
    # given to report a failed store).
    key = hash.digest()
    value = {'c': self.my_contact}
    pieces = hash.pieceDigests()

    # Determine how to store any piece data
    # NOTE(review): the lower bound is reconstructed -- with zero or one
    # piece there is presumably nothing worth storing beyond the file hash
    # itself; confirm the exact threshold.
    if len(pieces) > 1:
        piece_string = ''.join(pieces)
        if len(pieces) <= DHT_PIECES:
            # Short enough to be stored with our peer contact info
            value['t'] = {'t': piece_string}
        elif len(pieces) <= TORRENT_PIECES:
            # Short enough to be stored in a separate key in the DHT
            value['h'] = sha.new(piece_string).digest()
        else:
            # Too long, must be served up by our peer HTTP server
            value['l'] = sha.new(piece_string).digest()

    storeDefer = self.dht.storeValue(key, value)
    storeDefer.addCallbacks(self.store_done, self.store_error,
                            callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
    return storeDefer
def store_done(self, result, hash):
    """Add a key/value pair for the pieces of the file to the DHT (if necessary).

    Only files whose piece count is above DHT_PIECES but not above
    TORRENT_PIECES get the extra entry, stored under the SHA1 of the
    joined piece digests.

    @param result: the result of storing the file's own key
    @type hash: L{Hash.HashObject}
    @param hash: the hash object of the file that was stored
    @return: the deferred for the extra store when one is started,
        otherwise C{result} unchanged
    """
    log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
    pieces = hash.pieceDigests()
    if DHT_PIECES < len(pieces) <= TORRENT_PIECES:
        # Add the piece data key and value to the DHT
        piece_string = ''.join(pieces)
        key = sha.new(piece_string).digest()
        value = {'t': piece_string}

        storeDefer = self.dht.storeValue(key, value)
        storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
                                callbackArgs = (key, ), errbackArgs = (key, ))
        # Returning the deferred makes the outer chain wait for this store.
        return storeDefer

    # Hand the result on so later callbacks see the store's outcome.
    return result
def store_torrent_done(self, result, key):
    """Adding the file to the DHT is complete, and so is the workflow.

    @param result: the result of storing the piece string
    @param key: the raw key the piece string was stored under
    @return: C{result}, unchanged
    """
    log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
    # Pass the result through so later callbacks on the same deferred
    # still receive it instead of None.
    return result
def store_error(self, err, key):
    """Adding to the DHT failed.

    @param err: the failure from the DHT store
    @param key: the raw key that could not be stored
    @return: C{err}, so the failure keeps travelling down the deferred
    """
    log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))
    # An errback that returns None would turn the failure into a success;
    # return it so later handlers still see that the store failed.
    return err