Try to rejoin DHT periodically after failures using exponential backoff.
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
1
2 """The main program code.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 @var download_dir: the name of the directory to use for downloaded files
9
10 """
11
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 from urllib import unquote
15 import os, re, sha
16
17 from twisted.internet import defer, reactor
18 from twisted.web2 import server, http, http_headers, static
19 from twisted.python import log, failure
20 from twisted.python.filepath import FilePath
21
22 from interfaces import IDHT, IDHTStats
23 from apt_p2p_conf import config
24 from PeerManager import PeerManager
25 from HTTPServer import TopLevel
26 from MirrorManager import MirrorManager
27 from CacheManager import CacheManager
28 from Hash import HashObject
29 from db import DB
30 from util import findMyIPAddr, compact
31
32 DHT_PIECES = 4
33 TORRENT_PIECES = 70
34
35 download_dir = 'cache'
36
37 class AptP2P:
38     """The main code object that does all of the work.
39     
40     Contains all of the sub-components that do all the low-level work, and
41     coordinates communication between them.
42     
43     @type dhtClass: L{interfaces.IDHT}
44     @ivar dhtClass: the DHT class to use
45     @type cache_dir: L{twisted.python.filepath.FilePath}
46     @ivar cache_dir: the directory to use for storing all files
47     @type db: L{db.DB}
48     @ivar db: the database to use for tracking files and hashes
49     @type dht: L{interfaces.IDHT}
50     @ivar dht: the DHT instance
51     @type http_server: L{HTTPServer.TopLevel}
52     @ivar http_server: the web server that will handle all requests from apt
53         and from other peers
54     @type peers: L{PeerManager.PeerManager}
55     @ivar peers: the manager of all downloads from mirrors and other peers
56     @type mirrors: L{MirrorManager.MirrorManager}
57     @ivar mirrors: the manager of downloaded information about mirrors which
58         can be queried to get hashes from file names
59     @type cache: L{CacheManager.CacheManager}
60     @ivar cache: the manager of all downloaded files
61     @type my_contact: C{string}
62     @ivar my_contact: the 6-byte compact peer representation of this peer's
63         download information (IP address and port)
64     """
65     
66     def __init__(self, dhtClass):
67         """Initialize all the sub-components.
68         
69         @type dhtClass: L{interfaces.IDHT}
70         @param dhtClass: the DHT class to use
71         """
72         log.msg('Initializing the main apt_p2p application')
73         self.dhtClass = dhtClass
74         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
75         if not self.cache_dir.child(download_dir).exists():
76             self.cache_dir.child(download_dir).makedirs()
77         self.db = DB(self.cache_dir.child('apt-p2p.db'))
78         self.dht = dhtClass()
79         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
80         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
81         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
82         self.getHTTPFactory = self.http_server.getHTTPFactory
83         self.peers = PeerManager(self.cache_dir, self.dht)
84         self.mirrors = MirrorManager(self.cache_dir)
85         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
86         self.my_contact = None
87
88     #{ DHT maintenance
89     def joinComplete(self, result):
90         """Complete the DHT join process and determine our download information.
91         
92         Called by the DHT when the join has been completed with information
93         on the external IP address and port of this peer.
94         """
95         my_addr = findMyIPAddr(result,
96                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
97                                config.getboolean('DEFAULT', 'LOCAL_OK'))
98         if not my_addr:
99             raise RuntimeError, "IP address for this machine could not be found"
100         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
101         self.cache.scanDirectories()
102         reactor.callLater(60, self.refreshFiles)
103
104     def joinError(self, failure):
105         """Joining the DHT has failed."""
106         log.msg("joining DHT failed miserably")
107         log.err(failure)
108         raise RuntimeError, "IP address for this machine could not be found"
109     
110     def refreshFiles(self):
111         """Refresh any files in the DHT that are about to expire."""
112         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
113         hashes = self.db.expiredHashes(expireAfter)
114         if len(hashes.keys()) > 0:
115             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
116         self._refreshFiles(None, hashes)
117         
118     def _refreshFiles(self, result, hashes):
119         if result is not None:
120             log.msg('Storage resulted in: %r' % result)
121
122         if hashes:
123             raw_hash = hashes.keys()[0]
124             self.db.refreshHash(raw_hash)
125             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
126             del hashes[raw_hash]
127             storeDefer = self.store(hash)
128             storeDefer.addBoth(self._refreshFiles, hashes)
129         else:
130             reactor.callLater(60, self.refreshFiles)
131     
132     def getStats(self):
133         """Retrieve and format the statistics for the program.
134         
135         @rtype: C{string}
136         @return: the formatted HTML page containing the statistics
137         """
138         out = '<html><body>\n\n'
139         if IDHTStats.implementedBy(self.dhtClass):
140             out += self.dht.getStats()
141         out += '\n</body></html>\n'
142         return out
143
144     #{ Main workflow
145     def check_freshness(self, req, url, modtime, resp):
146         """Send a HEAD to the mirror to check if the response from the cache is still valid.
147         
148         @type req: L{twisted.web2.http.Request}
149         @param req: the initial request sent to the HTTP server by apt
150         @param url: the URI of the actual mirror request
151         @type modtime: C{int}
152         @param modtime: the modified time of the cached file (seconds since epoch)
153         @type resp: L{twisted.web2.http.Response}
154         @param resp: the response from the cache to be sent to apt
155         @rtype: L{twisted.internet.defer.Deferred}
156         @return: a deferred that will be called back with the correct response
157         """
158         log.msg('Checking if %s is still fresh' % url)
159         d = self.peers.get('', url, method = "HEAD", modtime = modtime)
160         d.addCallback(self.check_freshness_done, req, url, resp)
161         return d
162     
163     def check_freshness_done(self, resp, req, url, orig_resp):
164         """Process the returned response from the mirror.
165         
166         @type resp: L{twisted.web2.http.Response}
167         @param resp: the response from the mirror to the HEAD request
168         @type req: L{twisted.web2.http.Request}
169         @param req: the initial request sent to the HTTP server by apt
170         @param url: the URI of the actual mirror request
171         @type orig_resp: L{twisted.web2.http.Response}
172         @param orig_resp: the response from the cache to be sent to apt
173         """
174         if resp.code == 304:
175             log.msg('Still fresh, returning: %s' % url)
176             return orig_resp
177         else:
178             log.msg('Stale, need to redownload: %s' % url)
179             return self.get_resp(req, url)
180     
181     def get_resp(self, req, url):
182         """Lookup a hash for the file in the local mirror info.
183         
184         Starts the process of getting a response to an uncached apt request.
185         
186         @type req: L{twisted.web2.http.Request}
187         @param req: the initial request sent to the HTTP server by apt
188         @param url: the URI of the actual mirror request
189         @rtype: L{twisted.internet.defer.Deferred}
190         @return: a deferred that will be called back with the response
191         """
192         d = defer.Deferred()
193         
194         log.msg('Trying to find hash for %s' % url)
195         findDefer = self.mirrors.findHash(unquote(url))
196         
197         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
198                                callbackArgs=(req, url, d), errbackArgs=(req, url, d))
199         findDefer.addErrback(log.err)
200         return d
201     
202     def findHash_error(self, failure, req, url, d):
203         """Process the error in hash lookup by returning an empty L{HashObject}."""
204         log.err(failure)
205         self.findHash_done(HashObject(), req, url, d)
206         
207     def findHash_done(self, hash, req, url, d):
208         """Use the returned hash to lookup  the file in the cache.
209         
210         If the hash was not found, the workflow skips down to download from
211         the mirror (L{lookupHash_done}).
212         
213         @type hash: L{Hash.HashObject}
214         @param hash: the hash object containing the expected hash for the file
215         """
216         if hash.expected() is None:
217             log.msg('Hash for %s was not found' % url)
218             self.lookupHash_done([], hash, url, d)
219         else:
220             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
221             
222             # Lookup hash in cache
223             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
224             self.getCachedFile(hash, req, url, d, locations)
225
226     def getCachedFile(self, hash, req, url, d, locations):
227         """Try to return the file from the cache, otherwise move on to a DHT lookup.
228         
229         @type locations: C{list} of C{dictionary}
230         @param locations: the files in the cache that match the hash,
231             the dictionary contains a key 'path' whose value is a
232             L{twisted.python.filepath.FilePath} object for the file.
233         """
234         if not locations:
235             log.msg('Failed to return file from cache: %s' % url)
236             self.lookupHash(hash, url, d)
237             return
238         
239         # Get the first possible location from the list
240         file = locations.pop(0)['path']
241         log.msg('Returning cached file: %s' % file.path)
242         
243         # Get it's response
244         resp = static.File(file.path).renderHTTP(req)
245         if isinstance(resp, defer.Deferred):
246             resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
247         else:
248             self._getCachedFile(resp, hash, req, url, d, locations)
249         
250     def _getCachedFile(self, resp, hash, req, url, d, locations):
251         """Check the returned response to be sure it is valid."""
252         if isinstance(resp, failure.Failure):
253             log.msg('Got error trying to get cached file')
254             log.err()
255             # Try the next possible location
256             self.getCachedFile(hash, req, url, d, locations)
257             return
258             
259         log.msg('Cached response: %r' % resp)
260         
261         if resp.code >= 200 and resp.code < 400:
262             d.callback(resp)
263         else:
264             # Try the next possible location
265             self.getCachedFile(hash, req, url, d, locations)
266
267     def lookupHash(self, hash, url, d):
268         """Lookup the hash in the DHT."""
269         log.msg('Looking up hash in DHT for file: %s' % url)
270         key = hash.expected()
271         lookupDefer = self.dht.getValue(key)
272         lookupDefer.addBoth(self.lookupHash_done, hash, url, d)
273
274     def lookupHash_done(self, values, hash, url, d):
275         """Start the download of the file.
276         
277         The download will be from peers if the DHT lookup succeeded, or
278         from the mirror otherwise.
279         
280         @type values: C{list} of C{dictionary}
281         @param values: the returned values from the DHT containing peer
282             download information
283         """
284         if not isinstance(values, list) or not values:
285             if not isinstance(values, list):
286                 log.msg('DHT lookup for %s failed with error %r' % (url, values))
287             else:
288                 log.msg('Peers for %s were not found' % url)
289             getDefer = self.peers.get(hash, url)
290             getDefer.addCallback(self.cache.save_file, hash, url)
291             getDefer.addErrback(self.cache.save_error, url)
292             getDefer.addCallbacks(d.callback, d.errback)
293         else:
294             log.msg('Found peers for %s: %r' % (url, values))
295             # Download from the found peers
296             getDefer = self.peers.get(hash, url, values)
297             getDefer.addCallback(self.check_response, hash, url)
298             getDefer.addCallback(self.cache.save_file, hash, url)
299             getDefer.addErrback(self.cache.save_error, url)
300             getDefer.addCallbacks(d.callback, d.errback)
301             
302     def check_response(self, response, hash, url):
303         """Check the response from peers, and download from the mirror if it is not."""
304         if response.code < 200 or response.code >= 300:
305             log.msg('Download from peers failed, going to direct download: %s' % url)
306             getDefer = self.peers.get(hash, url)
307             return getDefer
308         return response
309         
310     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
311         """Add a newly cached file to the mirror info and/or the DHT.
312         
313         If the file was downloaded, set url to the path it was downloaded for.
314         Doesn't add a file to the DHT unless a hash was found for it
315         (but does add it anyway if forceDHT is True).
316         
317         @type file_path: L{twisted.python.filepath.FilePath}
318         @param file_path: the location of the file in the local cache
319         @type hash: L{Hash.HashObject}
320         @param hash: the original (expected) hash object containing also the
321             hash of the downloaded file
322         @type new_hash: C{boolean}
323         @param new_hash: whether the has was new to this peer, and so should
324             be added to the DHT
325         @type url: C{string}
326         @param url: the URI of the location of the file in the mirror
327             (optional, defaults to not adding the file to the mirror info)
328         @type forceDHT: C{boolean}
329         @param forceDHT: whether to force addition of the file to the DHT
330             even if the hash was not found in a mirror
331             (optional, defaults to False)
332         """
333         if url:
334             self.mirrors.updatedFile(url, file_path)
335         
336         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
337             return self.store(hash)
338         return None
339             
340     def store(self, hash):
341         """Add a key/value pair for the file to the DHT.
342         
343         Sets the key and value from the hash information, and tries to add
344         it to the DHT.
345         """
346         key = hash.digest()
347         value = {'c': self.my_contact}
348         pieces = hash.pieceDigests()
349         
350         # Determine how to store any piece data
351         if len(pieces) <= 1:
352             pass
353         elif len(pieces) <= DHT_PIECES:
354             # Short enough to be stored with our peer contact info
355             value['t'] = {'t': ''.join(pieces)}
356         elif len(pieces) <= TORRENT_PIECES:
357             # Short enough to be stored in a separate key in the DHT
358             value['h'] = sha.new(''.join(pieces)).digest()
359         else:
360             # Too long, must be served up by our peer HTTP server
361             value['l'] = sha.new(''.join(pieces)).digest()
362
363         storeDefer = self.dht.storeValue(key, value)
364         storeDefer.addCallbacks(self.store_done, self.store_error,
365                                 callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
366         return storeDefer
367
368     def store_done(self, result, hash):
369         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
370         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
371         pieces = hash.pieceDigests()
372         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
373             # Add the piece data key and value to the DHT
374             key = sha.new(''.join(pieces)).digest()
375             value = {'t': ''.join(pieces)}
376
377             storeDefer = self.dht.storeValue(key, value)
378             storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
379                                     callbackArgs = (key, ), errbackArgs = (key, ))
380             return storeDefer
381         return result
382
383     def store_torrent_done(self, result, key):
384         """Adding the file to the DHT is complete, and so is the workflow."""
385         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
386         return result
387
388     def store_error(self, err, key):
389         """Adding to the DHT failed."""
390         log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))
391         return err
392