More strict use of errbacks when using deferreds.
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
1
2 """The main program code.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 @var download_dir: the name of the directory to use for downloaded files
9
10 """
11
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 from urllib import unquote
15 import os, re, sha
16
17 from twisted.internet import defer, reactor
18 from twisted.web2 import server, http, http_headers, static
19 from twisted.python import log, failure
20 from twisted.python.filepath import FilePath
21
22 from interfaces import IDHT, IDHTStats
23 from apt_p2p_conf import config
24 from PeerManager import PeerManager
25 from HTTPServer import TopLevel
26 from MirrorManager import MirrorManager
27 from CacheManager import CacheManager
28 from Hash import HashObject
29 from db import DB
30 from util import findMyIPAddr, compact
31
32 DHT_PIECES = 4
33 TORRENT_PIECES = 70
34
35 download_dir = 'cache'
36
37 class AptP2P:
38     """The main code object that does all of the work.
39     
40     Contains all of the sub-components that do all the low-level work, and
41     coordinates communication between them.
42     
43     @type dhtClass: L{interfaces.IDHT}
44     @ivar dhtClass: the DHT class to use
45     @type cache_dir: L{twisted.python.filepath.FilePath}
46     @ivar cache_dir: the directory to use for storing all files
47     @type db: L{db.DB}
48     @ivar db: the database to use for tracking files and hashes
49     @type dht: L{interfaces.IDHT}
50     @ivar dht: the DHT instance
51     @type http_server: L{HTTPServer.TopLevel}
52     @ivar http_server: the web server that will handle all requests from apt
53         and from other peers
54     @type peers: L{PeerManager.PeerManager}
55     @ivar peers: the manager of all downloads from mirrors and other peers
56     @type mirrors: L{MirrorManager.MirrorManager}
57     @ivar mirrors: the manager of downloaded information about mirrors which
58         can be queried to get hashes from file names
59     @type cache: L{CacheManager.CacheManager}
60     @ivar cache: the manager of all downloaded files
61     @type my_contact: C{string}
62     @ivar my_contact: the 6-byte compact peer representation of this peer's
63         download information (IP address and port)
64     """
65     
66     def __init__(self, dhtClass):
67         """Initialize all the sub-components.
68         
69         @type dhtClass: L{interfaces.IDHT}
70         @param dhtClass: the DHT class to use
71         """
72         log.msg('Initializing the main apt_p2p application')
73         self.dhtClass = dhtClass
74         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
75         if not self.cache_dir.child(download_dir).exists():
76             self.cache_dir.child(download_dir).makedirs()
77         self.db = DB(self.cache_dir.child('apt-p2p.db'))
78         self.dht = dhtClass()
79         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
80         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
81         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
82         self.getHTTPFactory = self.http_server.getHTTPFactory
83         self.peers = PeerManager(self.cache_dir, self.dht)
84         self.mirrors = MirrorManager(self.cache_dir)
85         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
86         self.my_contact = None
87
88     #{ DHT maintenance
89     def joinComplete(self, result):
90         """Complete the DHT join process and determine our download information.
91         
92         Called by the DHT when the join has been completed with information
93         on the external IP address and port of this peer.
94         """
95         my_addr = findMyIPAddr(result,
96                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
97                                config.getboolean('DEFAULT', 'LOCAL_OK'))
98         if not my_addr:
99             raise RuntimeError, "IP address for this machine could not be found"
100         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
101         self.cache.scanDirectories()
102         reactor.callLater(60, self.refreshFiles)
103
104     def joinError(self, failure):
105         """Joining the DHT has failed."""
106         log.msg("joining DHT failed miserably")
107         log.err(failure)
108         raise RuntimeError, "IP address for this machine could not be found"
109     
110     def refreshFiles(self):
111         """Refresh any files in the DHT that are about to expire."""
112         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
113         hashes = self.db.expiredHashes(expireAfter)
114         if len(hashes.keys()) > 0:
115             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
116         self._refreshFiles(None, hashes)
117         
118     def _refreshFiles(self, result, hashes):
119         if result is not None:
120             log.msg('Storage resulted in: %r' % result)
121
122         if hashes:
123             raw_hash = hashes.keys()[0]
124             self.db.refreshHash(raw_hash)
125             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
126             del hashes[raw_hash]
127             storeDefer = self.store(hash)
128             storeDefer.addBoth(self._refreshFiles, hashes)
129         else:
130             reactor.callLater(60, self.refreshFiles)
131     
132     def getStats(self):
133         """Retrieve and format the statistics for the program.
134         
135         @rtype: C{string}
136         @return: the formatted HTML page containing the statistics
137         """
138         out = '<html><body>\n\n'
139         if IDHTStats.implementedBy(self.dhtClass):
140             out += self.dht.getStats()
141         out += '\n</body></html>\n'
142         return out
143
144     #{ Main workflow
145     def check_freshness(self, req, url, modtime, resp):
146         """Send a HEAD to the mirror to check if the response from the cache is still valid.
147         
148         @type req: L{twisted.web2.http.Request}
149         @param req: the initial request sent to the HTTP server by apt
150         @param url: the URI of the actual mirror request
151         @type modtime: C{int}
152         @param modtime: the modified time of the cached file (seconds since epoch)
153         @type resp: L{twisted.web2.http.Response}
154         @param resp: the response from the cache to be sent to apt
155         @rtype: L{twisted.internet.defer.Deferred}
156         @return: a deferred that will be called back with the correct response
157         """
158         log.msg('Checking if %s is still fresh' % url)
159         d = self.peers.get('', url, method = "HEAD", modtime = modtime)
160         d.addCallbacks(self.check_freshness_done, self.check_freshness_error,
161                        callbackArgs = (req, url, resp), errbackArgs = (req, url))
162         return d
163     
164     def check_freshness_done(self, resp, req, url, orig_resp):
165         """Process the returned response from the mirror.
166         
167         @type resp: L{twisted.web2.http.Response}
168         @param resp: the response from the mirror to the HEAD request
169         @type req: L{twisted.web2.http.Request}
170         @param req: the initial request sent to the HTTP server by apt
171         @param url: the URI of the actual mirror request
172         @type orig_resp: L{twisted.web2.http.Response}
173         @param orig_resp: the response from the cache to be sent to apt
174         """
175         if resp.code == 304:
176             log.msg('Still fresh, returning: %s' % url)
177             return orig_resp
178         else:
179             log.msg('Stale, need to redownload: %s' % url)
180             return self.get_resp(req, url)
181     
182     def check_freshness_error(self, err, req, url):
183         """Mirror request failed, continue with download.
184         
185         @param err: the response from the mirror to the HEAD request
186         @type req: L{twisted.web2.http.Request}
187         @param req: the initial request sent to the HTTP server by apt
188         @param url: the URI of the actual mirror request
189         """
190         log.err(err)
191         return self.get_resp(req, url)
192     
193     def get_resp(self, req, url):
194         """Lookup a hash for the file in the local mirror info.
195         
196         Starts the process of getting a response to an uncached apt request.
197         
198         @type req: L{twisted.web2.http.Request}
199         @param req: the initial request sent to the HTTP server by apt
200         @param url: the URI of the actual mirror request
201         @rtype: L{twisted.internet.defer.Deferred}
202         @return: a deferred that will be called back with the response
203         """
204         d = defer.Deferred()
205         
206         log.msg('Trying to find hash for %s' % url)
207         findDefer = self.mirrors.findHash(unquote(url))
208         
209         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
210                                callbackArgs=(req, url, d), errbackArgs=(req, url, d))
211         findDefer.addErrback(log.err)
212         return d
213     
214     def findHash_error(self, failure, req, url, d):
215         """Process the error in hash lookup by returning an empty L{HashObject}."""
216         log.err(failure)
217         self.findHash_done(HashObject(), req, url, d)
218         
219     def findHash_done(self, hash, req, url, d):
220         """Use the returned hash to lookup  the file in the cache.
221         
222         If the hash was not found, the workflow skips down to download from
223         the mirror (L{lookupHash_done}).
224         
225         @type hash: L{Hash.HashObject}
226         @param hash: the hash object containing the expected hash for the file
227         """
228         if hash.expected() is None:
229             log.msg('Hash for %s was not found' % url)
230             self.lookupHash_done([], hash, url, d)
231         else:
232             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
233             
234             # Lookup hash in cache
235             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
236             self.getCachedFile(hash, req, url, d, locations)
237
238     def getCachedFile(self, hash, req, url, d, locations):
239         """Try to return the file from the cache, otherwise move on to a DHT lookup.
240         
241         @type locations: C{list} of C{dictionary}
242         @param locations: the files in the cache that match the hash,
243             the dictionary contains a key 'path' whose value is a
244             L{twisted.python.filepath.FilePath} object for the file.
245         """
246         if not locations:
247             log.msg('Failed to return file from cache: %s' % url)
248             self.lookupHash(hash, url, d)
249             return
250         
251         # Get the first possible location from the list
252         file = locations.pop(0)['path']
253         log.msg('Returning cached file: %s' % file.path)
254         
255         # Get it's response
256         resp = static.File(file.path).renderHTTP(req)
257         if isinstance(resp, defer.Deferred):
258             resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
259         else:
260             self._getCachedFile(resp, hash, req, url, d, locations)
261         
262     def _getCachedFile(self, resp, hash, req, url, d, locations):
263         """Check the returned response to be sure it is valid."""
264         if isinstance(resp, failure.Failure):
265             log.msg('Got error trying to get cached file')
266             log.err()
267             # Try the next possible location
268             self.getCachedFile(hash, req, url, d, locations)
269             return
270             
271         log.msg('Cached response: %r' % resp)
272         
273         if resp.code >= 200 and resp.code < 400:
274             d.callback(resp)
275         else:
276             # Try the next possible location
277             self.getCachedFile(hash, req, url, d, locations)
278
279     def lookupHash(self, hash, url, d):
280         """Lookup the hash in the DHT."""
281         log.msg('Looking up hash in DHT for file: %s' % url)
282         key = hash.expected()
283         lookupDefer = self.dht.getValue(key)
284         lookupDefer.addBoth(self.lookupHash_done, hash, url, d)
285
286     def lookupHash_done(self, values, hash, url, d):
287         """Start the download of the file.
288         
289         The download will be from peers if the DHT lookup succeeded, or
290         from the mirror otherwise.
291         
292         @type values: C{list} of C{dictionary}
293         @param values: the returned values from the DHT containing peer
294             download information
295         """
296         if not isinstance(values, list) or not values:
297             if not isinstance(values, list):
298                 log.msg('DHT lookup for %s failed with error %r' % (url, values))
299             else:
300                 log.msg('Peers for %s were not found' % url)
301             getDefer = self.peers.get(hash, url)
302             getDefer.addCallback(self.cache.save_file, hash, url)
303             getDefer.addErrback(self.cache.save_error, url)
304             getDefer.addCallbacks(d.callback, d.errback)
305         else:
306             log.msg('Found peers for %s: %r' % (url, values))
307             # Download from the found peers
308             getDefer = self.peers.get(hash, url, values)
309             getDefer.addCallback(self.check_response, hash, url)
310             getDefer.addCallback(self.cache.save_file, hash, url)
311             getDefer.addErrback(self.cache.save_error, url)
312             getDefer.addCallbacks(d.callback, d.errback)
313             
314     def check_response(self, response, hash, url):
315         """Check the response from peers, and download from the mirror if it is not."""
316         if response.code < 200 or response.code >= 300:
317             log.msg('Download from peers failed, going to direct download: %s' % url)
318             getDefer = self.peers.get(hash, url)
319             return getDefer
320         return response
321         
322     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
323         """Add a newly cached file to the mirror info and/or the DHT.
324         
325         If the file was downloaded, set url to the path it was downloaded for.
326         Doesn't add a file to the DHT unless a hash was found for it
327         (but does add it anyway if forceDHT is True).
328         
329         @type file_path: L{twisted.python.filepath.FilePath}
330         @param file_path: the location of the file in the local cache
331         @type hash: L{Hash.HashObject}
332         @param hash: the original (expected) hash object containing also the
333             hash of the downloaded file
334         @type new_hash: C{boolean}
335         @param new_hash: whether the has was new to this peer, and so should
336             be added to the DHT
337         @type url: C{string}
338         @param url: the URI of the location of the file in the mirror
339             (optional, defaults to not adding the file to the mirror info)
340         @type forceDHT: C{boolean}
341         @param forceDHT: whether to force addition of the file to the DHT
342             even if the hash was not found in a mirror
343             (optional, defaults to False)
344         """
345         if url:
346             self.mirrors.updatedFile(url, file_path)
347         
348         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
349             return self.store(hash)
350         return None
351             
352     def store(self, hash):
353         """Add a key/value pair for the file to the DHT.
354         
355         Sets the key and value from the hash information, and tries to add
356         it to the DHT.
357         """
358         key = hash.digest()
359         value = {'c': self.my_contact}
360         pieces = hash.pieceDigests()
361         
362         # Determine how to store any piece data
363         if len(pieces) <= 1:
364             pass
365         elif len(pieces) <= DHT_PIECES:
366             # Short enough to be stored with our peer contact info
367             value['t'] = {'t': ''.join(pieces)}
368         elif len(pieces) <= TORRENT_PIECES:
369             # Short enough to be stored in a separate key in the DHT
370             value['h'] = sha.new(''.join(pieces)).digest()
371         else:
372             # Too long, must be served up by our peer HTTP server
373             value['l'] = sha.new(''.join(pieces)).digest()
374
375         storeDefer = self.dht.storeValue(key, value)
376         storeDefer.addCallbacks(self.store_done, self.store_error,
377                                 callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
378         return storeDefer
379
380     def store_done(self, result, hash):
381         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
382         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
383         pieces = hash.pieceDigests()
384         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
385             # Add the piece data key and value to the DHT
386             key = sha.new(''.join(pieces)).digest()
387             value = {'t': ''.join(pieces)}
388
389             storeDefer = self.dht.storeValue(key, value)
390             storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
391                                     callbackArgs = (key, ), errbackArgs = (key, ))
392             return storeDefer
393         return result
394
395     def store_torrent_done(self, result, key):
396         """Adding the file to the DHT is complete, and so is the workflow."""
397         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
398         return result
399
400     def store_error(self, err, key):
401         """Adding to the DHT failed."""
402         log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))
403         return err
404