Refresh expired DHT hashes concurrently instead of sequentially.
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
1
2 """The main program code.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 @var download_dir: the name of the directory to use for downloaded files
9 @var peer_dir: the name of the directory to use for peer downloads
10 """
11
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 from urllib import unquote
15 import os, re, sha
16
17 from twisted.internet import defer, reactor, protocol
18 from twisted.web2 import server, http, http_headers, static
19 from twisted.python import log, failure
20 from twisted.python.filepath import FilePath
21
22 from interfaces import IDHT, IDHTStats
23 from apt_p2p_conf import config
24 from PeerManager import PeerManager
25 from HTTPServer import TopLevel
26 from MirrorManager import MirrorManager
27 from CacheManager import CacheManager
28 from Hash import HashObject
29 from db import DB
30 from stats import StatsLogger
31 from util import findMyIPAddr, compact
32
33 DHT_PIECES = 4
34 TORRENT_PIECES = 70
35
36 download_dir = 'cache'
37 peer_dir = 'peers'
38
39 class AptP2P(protocol.Factory):
40     """The main code object that does all of the work.
41     
42     Contains all of the sub-components that do all the low-level work, and
43     coordinates communication between them.
44     
45     @type dhtClass: L{interfaces.IDHT}
46     @ivar dhtClass: the DHT class to use
47     @type cache_dir: L{twisted.python.filepath.FilePath}
48     @ivar cache_dir: the directory to use for storing all files
49     @type db: L{db.DB}
50     @ivar db: the database to use for tracking files and hashes
51     @type dht: L{interfaces.IDHT}
52     @ivar dht: the DHT instance
53     @type stats: L{stats.StatsLogger}
54     @ivar stats: the statistics logger to record sent data to
55     @type http_server: L{HTTPServer.TopLevel}
56     @ivar http_server: the web server that will handle all requests from apt
57         and from other peers
58     @type peers: L{PeerManager.PeerManager}
59     @ivar peers: the manager of all downloads from mirrors and other peers
60     @type mirrors: L{MirrorManager.MirrorManager}
61     @ivar mirrors: the manager of downloaded information about mirrors which
62         can be queried to get hashes from file names
63     @type cache: L{CacheManager.CacheManager}
64     @ivar cache: the manager of all downloaded files
65     @type my_contact: C{string}
66     @ivar my_contact: the 6-byte compact peer representation of this peer's
67         download information (IP address and port)
68     """
69     
70     def __init__(self, dhtClass):
71         """Initialize all the sub-components.
72         
73         @type dhtClass: L{interfaces.IDHT}
74         @param dhtClass: the DHT class to use
75         """
76         log.msg('Initializing the main apt_p2p application')
77         self.dhtClass = dhtClass
78
79     #{ Factory interface
80     def startFactory(self):
81         reactor.callLater(0, self._startFactory)
82         
83     def _startFactory(self):
84         log.msg('Starting the main apt_p2p application')
85         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
86         if not self.cache_dir.child(download_dir).exists():
87             self.cache_dir.child(download_dir).makedirs()
88         if not self.cache_dir.child(peer_dir).exists():
89             self.cache_dir.child(peer_dir).makedirs()
90         self.db = DB(self.cache_dir.child('apt-p2p.db'))
91         self.dht = self.dhtClass()
92         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
93         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
94         self.stats = StatsLogger(self.db)
95         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
96         self.http_server.getHTTPFactory().startFactory()
97         self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats)
98         self.mirrors = MirrorManager(self.cache_dir)
99         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
100         self.my_contact = None
101         
102     def stopFactory(self):
103         log.msg('Stoppping the main apt_p2p application')
104         self.http_server.getHTTPFactory().stopFactory()
105         self.mirrors.cleanup()
106         self.stats.save()
107         self.db.close()
108     
109     def buildProtocol(self, addr):
110         return self.http_server.getHTTPFactory().buildProtocol(addr)
111         
112     #{ DHT Maintenance
113     def joinComplete(self, result):
114         """Complete the DHT join process and determine our download information.
115         
116         Called by the DHT when the join has been completed with information
117         on the external IP address and port of this peer.
118         """
119         my_addr = findMyIPAddr(result,
120                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
121                                config.getboolean('DEFAULT', 'LOCAL_OK'))
122         if not my_addr:
123             raise RuntimeError, "IP address for this machine could not be found"
124         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
125         self.cache.scanDirectories()
126         self.nextRefresh = reactor.callLater(60, self.refreshFiles)
127
128     def joinError(self, failure):
129         """Joining the DHT has failed."""
130         log.msg("joining DHT failed miserably")
131         log.err(failure)
132         raise RuntimeError, "IP address for this machine could not be found"
133     
134     def refreshFiles(self, result = None, hashes = {}):
135         """Refresh any files in the DHT that are about to expire."""
136         if result is not None:
137             log.msg('Storage resulted in: %r' % result)
138
139         if not hashes:
140             expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
141             hashes = self.db.expiredHashes(expireAfter)
142             if len(hashes.keys()) > 0:
143                 log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
144
145         delay = 60
146         if hashes:
147             delay = 3
148             raw_hash = hashes.keys()[0]
149             self.db.refreshHash(raw_hash)
150             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
151             del hashes[raw_hash]
152             storeDefer = self.store(hash)
153             storeDefer.addBoth(self.refreshFiles, hashes)
154
155         if self.nextRefresh.active():
156             self.nextRefresh.reset(delay)
157         else:
158             self.nextRefresh = reactor.callLater(delay, self.plRefresh, None, hashes)
159     
160     def getStats(self):
161         """Retrieve and format the statistics for the program.
162         
163         @rtype: C{string}
164         @return: the formatted HTML page containing the statistics
165         """
166         out = '<html><body>\n\n'
167         out += self.stats.formatHTML(self.my_contact)
168         out += '\n\n'
169         if IDHTStats.implementedBy(self.dhtClass):
170             out += self.dht.getStats()
171         out += '\n</body></html>\n'
172         return out
173
174     #{ Main workflow
175     def get_resp(self, req, url, orig_resp = None):
176         """Lookup a hash for the file in the local mirror info.
177         
178         Starts the process of getting a response to an apt request.
179         
180         @type req: L{twisted.web2.http.Request}
181         @param req: the initial request sent to the HTTP server by apt
182         @param url: the URI of the actual mirror request
183         @type orig_resp: L{twisted.web2.http.Response}
184         @param orig_resp: the response from the cache to be sent to apt
185             (optional, ignored if missing)
186         @rtype: L{twisted.internet.defer.Deferred}
187         @return: a deferred that will be called back with the response
188         """
189         d = defer.Deferred()
190         
191         log.msg('Trying to find hash for %s' % url)
192         findDefer = self.mirrors.findHash(unquote(url))
193         
194         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
195                                callbackArgs=(req, url, orig_resp, d),
196                                errbackArgs=(req, url, orig_resp, d))
197         return d
198     
199     def findHash_error(self, failure, req, url, orig_resp, d):
200         """Process the error in hash lookup by returning an empty L{HashObject}."""
201         log.msg('Hash lookup for %s resulted in an error: %s' %
202                 (url, failure.getErrorMessage()))
203         self.findHash_done(HashObject(), req, url, orig_resp, d)
204         
205     def findHash_done(self, hash, req, url, orig_resp, d):
206         """Use the returned hash to lookup the file in the cache.
207         
208         If the hash was not found, the workflow skips down to download from
209         the mirror (L{startDownload}), or checks the freshness of an old
210         response if there is one.
211         
212         @type hash: L{Hash.HashObject}
213         @param hash: the hash object containing the expected hash for the file
214         """
215         if hash.expected() is None:
216             log.msg('Hash for %s was not found' % url)
217             # Send the old response or get a new one
218             if orig_resp:
219                 self.check_freshness(req, url, orig_resp, d)
220             else:
221                 self.startDownload([], req, hash, url, d)
222         else:
223             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
224             
225             # Lookup hash in cache
226             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
227             self.getCachedFile(hash, req, url, d, locations)
228
229     def check_freshness(self, req, url, orig_resp, d):
230         """Send a HEAD to the mirror to check if the response from the cache is still valid.
231         
232         @type req: L{twisted.web2.http.Request}
233         @param req: the initial request sent to the HTTP server by apt
234         @param url: the URI of the actual mirror request
235         @type orig_resp: L{twisted.web2.http.Response}
236         @param orig_resp: the response from the cache to be sent to apt
237         @rtype: L{twisted.internet.defer.Deferred}
238         @return: a deferred that will be called back with the correct response
239         """
240         log.msg('Checking if %s is still fresh' % url)
241         modtime = orig_resp.headers.getHeader('Last-Modified')
242         headDefer = self.peers.get(HashObject(), url, method = "HEAD",
243                                    modtime = modtime)
244         headDefer.addCallbacks(self.check_freshness_done,
245                                self.check_freshness_error,
246                                callbackArgs = (req, url, orig_resp, d),
247                                errbackArgs = (req, url, d))
248     
249     def check_freshness_done(self, resp, req, url, orig_resp, d):
250         """Return the fresh response, if stale start to redownload.
251         
252         @type resp: L{twisted.web2.http.Response}
253         @param resp: the response from the mirror to the HEAD request
254         @type req: L{twisted.web2.http.Request}
255         @param req: the initial request sent to the HTTP server by apt
256         @param url: the URI of the actual mirror request
257         @type orig_resp: L{twisted.web2.http.Response}
258         @param orig_resp: the response from the cache to be sent to apt
259         """
260         if resp.code == 304:
261             log.msg('Still fresh, returning: %s' % url)
262             d.callback(orig_resp)
263         else:
264             log.msg('Stale, need to redownload: %s' % url)
265             self.startDownload([], req, HashObject(), url, d)
266     
267     def check_freshness_error(self, err, req, url, d):
268         """Mirror request failed, continue with download.
269         
270         @param err: the response from the mirror to the HEAD request
271         @type req: L{twisted.web2.http.Request}
272         @param req: the initial request sent to the HTTP server by apt
273         @param url: the URI of the actual mirror request
274         """
275         log.err(err)
276         self.startDownload([], req, HashObject(), url, d)
277     
278     def getCachedFile(self, hash, req, url, d, locations):
279         """Try to return the file from the cache, otherwise move on to a DHT lookup.
280         
281         @type locations: C{list} of C{dictionary}
282         @param locations: the files in the cache that match the hash,
283             the dictionary contains a key 'path' whose value is a
284             L{twisted.python.filepath.FilePath} object for the file.
285         """
286         if not locations:
287             log.msg('Failed to return file from cache: %s' % url)
288             self.lookupHash(req, hash, url, d)
289             return
290         
291         # Get the first possible location from the list
292         file = locations.pop(0)['path']
293         log.msg('Returning cached file: %s' % file.path)
294         
295         # Get it's response
296         resp = static.File(file.path).renderHTTP(req)
297         if isinstance(resp, defer.Deferred):
298             resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
299         else:
300             self._getCachedFile(resp, hash, req, url, d, locations)
301         
302     def _getCachedFile(self, resp, hash, req, url, d, locations):
303         """Check the returned response to be sure it is valid."""
304         if isinstance(resp, failure.Failure):
305             log.msg('Got error trying to get cached file')
306             log.err(resp)
307             # Try the next possible location
308             self.getCachedFile(hash, req, url, d, locations)
309             return
310             
311         log.msg('Cached response: %r' % resp)
312         
313         if resp.code >= 200 and resp.code < 400:
314             d.callback(resp)
315         else:
316             # Try the next possible location
317             self.getCachedFile(hash, req, url, d, locations)
318
319     def lookupHash(self, req, hash, url, d):
320         """Lookup the hash in the DHT."""
321         log.msg('Looking up hash in DHT for file: %s' % url)
322         key = hash.expected()
323         lookupDefer = self.dht.getValue(key)
324         lookupDefer.addBoth(self.startDownload, req, hash, url, d)
325
326     def startDownload(self, values, req, hash, url, d):
327         """Start the download of the file.
328         
329         The download will be from peers if the DHT lookup succeeded, or
330         from the mirror otherwise.
331         
332         @type values: C{list} of C{dictionary}
333         @param values: the returned values from the DHT containing peer
334             download information
335         """
336         # Remove some headers Apt sets in the request
337         req.headers.removeHeader('If-Modified-Since')
338         req.headers.removeHeader('Range')
339         req.headers.removeHeader('If-Range')
340         
341         if not isinstance(values, list) or not values:
342             if not isinstance(values, list):
343                 log.msg('DHT lookup for %s failed with error %r' % (url, values))
344             else:
345                 log.msg('Peers for %s were not found' % url)
346             getDefer = self.peers.get(hash, url)
347             getDefer.addCallback(self.cache.save_file, hash, url)
348             getDefer.addErrback(self.cache.save_error, url)
349             getDefer.addCallbacks(d.callback, d.errback)
350         else:
351             log.msg('Found peers for %s: %r' % (url, values))
352             # Download from the found peers
353             getDefer = self.peers.get(hash, url, values)
354             getDefer.addCallback(self.check_response, hash, url)
355             getDefer.addCallback(self.cache.save_file, hash, url)
356             getDefer.addErrback(self.cache.save_error, url)
357             getDefer.addCallbacks(d.callback, d.errback)
358             
359     def check_response(self, response, hash, url):
360         """Check the response from peers, and download from the mirror if it is not."""
361         if response.code < 200 or response.code >= 300:
362             log.msg('Download from peers failed, going to direct download: %s' % url)
363             getDefer = self.peers.get(hash, url)
364             return getDefer
365         return response
366         
367     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
368         """Add a newly cached file to the mirror info and/or the DHT.
369         
370         If the file was downloaded, set url to the path it was downloaded for.
371         Doesn't add a file to the DHT unless a hash was found for it
372         (but does add it anyway if forceDHT is True).
373         
374         @type file_path: L{twisted.python.filepath.FilePath}
375         @param file_path: the location of the file in the local cache
376         @type hash: L{Hash.HashObject}
377         @param hash: the original (expected) hash object containing also the
378             hash of the downloaded file
379         @type new_hash: C{boolean}
380         @param new_hash: whether the has was new to this peer, and so should
381             be added to the DHT
382         @type url: C{string}
383         @param url: the URI of the location of the file in the mirror
384             (optional, defaults to not adding the file to the mirror info)
385         @type forceDHT: C{boolean}
386         @param forceDHT: whether to force addition of the file to the DHT
387             even if the hash was not found in a mirror
388             (optional, defaults to False)
389         """
390         if url:
391             self.mirrors.updatedFile(url, file_path)
392         
393         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
394             return self.store(hash)
395         return None
396             
397     def store(self, hash):
398         """Add a key/value pair for the file to the DHT.
399         
400         Sets the key and value from the hash information, and tries to add
401         it to the DHT.
402         """
403         key = hash.digest()
404         value = {'c': self.my_contact}
405         pieces = hash.pieceDigests()
406         
407         # Determine how to store any piece data
408         if len(pieces) <= 1:
409             pass
410         elif len(pieces) <= DHT_PIECES:
411             # Short enough to be stored with our peer contact info
412             value['t'] = {'t': ''.join(pieces)}
413         elif len(pieces) <= TORRENT_PIECES:
414             # Short enough to be stored in a separate key in the DHT
415             value['h'] = sha.new(''.join(pieces)).digest()
416         else:
417             # Too long, must be served up by our peer HTTP server
418             value['l'] = sha.new(''.join(pieces)).digest()
419
420         storeDefer = self.dht.storeValue(key, value)
421         storeDefer.addCallbacks(self.store_done, self.store_error,
422                                 callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
423         return storeDefer
424
425     def store_done(self, result, hash):
426         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
427         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
428         pieces = hash.pieceDigests()
429         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
430             # Add the piece data key and value to the DHT
431             key = sha.new(''.join(pieces)).digest()
432             value = {'t': ''.join(pieces)}
433
434             storeDefer = self.dht.storeValue(key, value)
435             storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
436                                     callbackArgs = (key, ), errbackArgs = (key, ))
437             return storeDefer
438         return result
439
440     def store_torrent_done(self, result, key):
441         """Adding the file to the DHT is complete, and so is the workflow."""
442         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
443         return result
444
445     def store_error(self, err, key):
446         """Adding to the DHT failed."""
447         log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))
448         return err
449