f88fd791a184fde37646501ea2fb941eb17e70fb
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
1
2 """The main program code.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 @var download_dir: the name of the directory to use for downloaded files
9
10 """
11
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 from urllib import unquote
15 import os, re, sha
16
17 from twisted.internet import defer, reactor
18 from twisted.web2 import server, http, http_headers, static
19 from twisted.python import log, failure
20 from twisted.python.filepath import FilePath
21
22 from interfaces import IDHT, IDHTStats
23 from apt_p2p_conf import config
24 from PeerManager import PeerManager
25 from HTTPServer import TopLevel
26 from MirrorManager import MirrorManager
27 from CacheManager import CacheManager
28 from Hash import HashObject
29 from db import DB
30 from stats import StatsLogger
31 from util import findMyIPAddr, compact
32
33 DHT_PIECES = 4
34 TORRENT_PIECES = 70
35
36 download_dir = 'cache'
37
38 class AptP2P:
39     """The main code object that does all of the work.
40     
41     Contains all of the sub-components that do all the low-level work, and
42     coordinates communication between them.
43     
44     @type dhtClass: L{interfaces.IDHT}
45     @ivar dhtClass: the DHT class to use
46     @type cache_dir: L{twisted.python.filepath.FilePath}
47     @ivar cache_dir: the directory to use for storing all files
48     @type db: L{db.DB}
49     @ivar db: the database to use for tracking files and hashes
50     @type dht: L{interfaces.IDHT}
51     @ivar dht: the DHT instance
52     @type stats: L{stats.StatsLogger}
53     @ivar stats: the statistics logger to record sent data to
54     @type http_server: L{HTTPServer.TopLevel}
55     @ivar http_server: the web server that will handle all requests from apt
56         and from other peers
57     @type peers: L{PeerManager.PeerManager}
58     @ivar peers: the manager of all downloads from mirrors and other peers
59     @type mirrors: L{MirrorManager.MirrorManager}
60     @ivar mirrors: the manager of downloaded information about mirrors which
61         can be queried to get hashes from file names
62     @type cache: L{CacheManager.CacheManager}
63     @ivar cache: the manager of all downloaded files
64     @type my_contact: C{string}
65     @ivar my_contact: the 6-byte compact peer representation of this peer's
66         download information (IP address and port)
67     """
68     
69     def __init__(self, dhtClass):
70         """Initialize all the sub-components.
71         
72         @type dhtClass: L{interfaces.IDHT}
73         @param dhtClass: the DHT class to use
74         """
75         log.msg('Initializing the main apt_p2p application')
76         self.dhtClass = dhtClass
77         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
78         if not self.cache_dir.child(download_dir).exists():
79             self.cache_dir.child(download_dir).makedirs()
80         self.db = DB(self.cache_dir.child('apt-p2p.db'))
81         self.dht = dhtClass()
82         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
83         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
84         self.stats = StatsLogger(self.db)
85         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
86         self.getHTTPFactory = self.http_server.getHTTPFactory
87         self.peers = PeerManager(self.cache_dir, self.dht, self.stats)
88         self.mirrors = MirrorManager(self.cache_dir)
89         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
90         self.my_contact = None
91         reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
92
93     #{ Maintenance
94     def shutdown(self):
95         self.stats.save()
96         self.db.close()
97         
98     def joinComplete(self, result):
99         """Complete the DHT join process and determine our download information.
100         
101         Called by the DHT when the join has been completed with information
102         on the external IP address and port of this peer.
103         """
104         my_addr = findMyIPAddr(result,
105                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
106                                config.getboolean('DEFAULT', 'LOCAL_OK'))
107         if not my_addr:
108             raise RuntimeError, "IP address for this machine could not be found"
109         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
110         self.cache.scanDirectories()
111         reactor.callLater(60, self.refreshFiles)
112
113     def joinError(self, failure):
114         """Joining the DHT has failed."""
115         log.msg("joining DHT failed miserably")
116         log.err(failure)
117         raise RuntimeError, "IP address for this machine could not be found"
118     
119     def refreshFiles(self):
120         """Refresh any files in the DHT that are about to expire."""
121         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
122         hashes = self.db.expiredHashes(expireAfter)
123         if len(hashes.keys()) > 0:
124             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
125         self._refreshFiles(None, hashes)
126         
127     def _refreshFiles(self, result, hashes):
128         if result is not None:
129             log.msg('Storage resulted in: %r' % result)
130
131         if hashes:
132             raw_hash = hashes.keys()[0]
133             self.db.refreshHash(raw_hash)
134             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
135             del hashes[raw_hash]
136             storeDefer = self.store(hash)
137             storeDefer.addBoth(self._refreshFiles, hashes)
138         else:
139             reactor.callLater(60, self.refreshFiles)
140     
141     def getStats(self):
142         """Retrieve and format the statistics for the program.
143         
144         @rtype: C{string}
145         @return: the formatted HTML page containing the statistics
146         """
147         out = '<html><body>\n\n'
148         out += self.stats.formatHTML(self.my_contact)
149         out += '\n\n'
150         if IDHTStats.implementedBy(self.dhtClass):
151             out += self.dht.getStats()
152         out += '\n</body></html>\n'
153         return out
154
155     #{ Main workflow
156     def check_freshness(self, req, url, modtime, resp):
157         """Send a HEAD to the mirror to check if the response from the cache is still valid.
158         
159         @type req: L{twisted.web2.http.Request}
160         @param req: the initial request sent to the HTTP server by apt
161         @param url: the URI of the actual mirror request
162         @type modtime: C{int}
163         @param modtime: the modified time of the cached file (seconds since epoch)
164         @type resp: L{twisted.web2.http.Response}
165         @param resp: the response from the cache to be sent to apt
166         @rtype: L{twisted.internet.defer.Deferred}
167         @return: a deferred that will be called back with the correct response
168         """
169         log.msg('Checking if %s is still fresh' % url)
170         d = self.peers.get('', url, method = "HEAD", modtime = modtime)
171         d.addCallbacks(self.check_freshness_done, self.check_freshness_error,
172                        callbackArgs = (req, url, resp), errbackArgs = (req, url))
173         return d
174     
175     def check_freshness_done(self, resp, req, url, orig_resp):
176         """Process the returned response from the mirror.
177         
178         @type resp: L{twisted.web2.http.Response}
179         @param resp: the response from the mirror to the HEAD request
180         @type req: L{twisted.web2.http.Request}
181         @param req: the initial request sent to the HTTP server by apt
182         @param url: the URI of the actual mirror request
183         @type orig_resp: L{twisted.web2.http.Response}
184         @param orig_resp: the response from the cache to be sent to apt
185         """
186         if resp.code == 304:
187             log.msg('Still fresh, returning: %s' % url)
188             return orig_resp
189         else:
190             log.msg('Stale, need to redownload: %s' % url)
191             return self.get_resp(req, url)
192     
193     def check_freshness_error(self, err, req, url):
194         """Mirror request failed, continue with download.
195         
196         @param err: the response from the mirror to the HEAD request
197         @type req: L{twisted.web2.http.Request}
198         @param req: the initial request sent to the HTTP server by apt
199         @param url: the URI of the actual mirror request
200         """
201         log.err(err)
202         return self.get_resp(req, url)
203     
204     def get_resp(self, req, url):
205         """Lookup a hash for the file in the local mirror info.
206         
207         Starts the process of getting a response to an uncached apt request.
208         
209         @type req: L{twisted.web2.http.Request}
210         @param req: the initial request sent to the HTTP server by apt
211         @param url: the URI of the actual mirror request
212         @rtype: L{twisted.internet.defer.Deferred}
213         @return: a deferred that will be called back with the response
214         """
215         d = defer.Deferred()
216         
217         log.msg('Trying to find hash for %s' % url)
218         findDefer = self.mirrors.findHash(unquote(url))
219         
220         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
221                                callbackArgs=(req, url, d), errbackArgs=(req, url, d))
222         findDefer.addErrback(log.err)
223         return d
224     
225     def findHash_error(self, failure, req, url, d):
226         """Process the error in hash lookup by returning an empty L{HashObject}."""
227         log.err(failure)
228         self.findHash_done(HashObject(), req, url, d)
229         
230     def findHash_done(self, hash, req, url, d):
231         """Use the returned hash to lookup  the file in the cache.
232         
233         If the hash was not found, the workflow skips down to download from
234         the mirror (L{lookupHash_done}).
235         
236         @type hash: L{Hash.HashObject}
237         @param hash: the hash object containing the expected hash for the file
238         """
239         if hash.expected() is None:
240             log.msg('Hash for %s was not found' % url)
241             self.lookupHash_done([], hash, url, d)
242         else:
243             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
244             
245             # Lookup hash in cache
246             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
247             self.getCachedFile(hash, req, url, d, locations)
248
249     def getCachedFile(self, hash, req, url, d, locations):
250         """Try to return the file from the cache, otherwise move on to a DHT lookup.
251         
252         @type locations: C{list} of C{dictionary}
253         @param locations: the files in the cache that match the hash,
254             the dictionary contains a key 'path' whose value is a
255             L{twisted.python.filepath.FilePath} object for the file.
256         """
257         if not locations:
258             log.msg('Failed to return file from cache: %s' % url)
259             self.lookupHash(hash, url, d)
260             return
261         
262         # Get the first possible location from the list
263         file = locations.pop(0)['path']
264         log.msg('Returning cached file: %s' % file.path)
265         
266         # Get it's response
267         resp = static.File(file.path).renderHTTP(req)
268         if isinstance(resp, defer.Deferred):
269             resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
270         else:
271             self._getCachedFile(resp, hash, req, url, d, locations)
272         
273     def _getCachedFile(self, resp, hash, req, url, d, locations):
274         """Check the returned response to be sure it is valid."""
275         if isinstance(resp, failure.Failure):
276             log.msg('Got error trying to get cached file')
277             log.err()
278             # Try the next possible location
279             self.getCachedFile(hash, req, url, d, locations)
280             return
281             
282         log.msg('Cached response: %r' % resp)
283         
284         if resp.code >= 200 and resp.code < 400:
285             d.callback(resp)
286         else:
287             # Try the next possible location
288             self.getCachedFile(hash, req, url, d, locations)
289
290     def lookupHash(self, hash, url, d):
291         """Lookup the hash in the DHT."""
292         log.msg('Looking up hash in DHT for file: %s' % url)
293         key = hash.expected()
294         lookupDefer = self.dht.getValue(key)
295         lookupDefer.addBoth(self.lookupHash_done, hash, url, d)
296
297     def lookupHash_done(self, values, hash, url, d):
298         """Start the download of the file.
299         
300         The download will be from peers if the DHT lookup succeeded, or
301         from the mirror otherwise.
302         
303         @type values: C{list} of C{dictionary}
304         @param values: the returned values from the DHT containing peer
305             download information
306         """
307         if not isinstance(values, list) or not values:
308             if not isinstance(values, list):
309                 log.msg('DHT lookup for %s failed with error %r' % (url, values))
310             else:
311                 log.msg('Peers for %s were not found' % url)
312             getDefer = self.peers.get(hash, url)
313             getDefer.addCallback(self.cache.save_file, hash, url)
314             getDefer.addErrback(self.cache.save_error, url)
315             getDefer.addCallbacks(d.callback, d.errback)
316         else:
317             log.msg('Found peers for %s: %r' % (url, values))
318             # Download from the found peers
319             getDefer = self.peers.get(hash, url, values)
320             getDefer.addCallback(self.check_response, hash, url)
321             getDefer.addCallback(self.cache.save_file, hash, url)
322             getDefer.addErrback(self.cache.save_error, url)
323             getDefer.addCallbacks(d.callback, d.errback)
324             
325     def check_response(self, response, hash, url):
326         """Check the response from peers, and download from the mirror if it is not."""
327         if response.code < 200 or response.code >= 300:
328             log.msg('Download from peers failed, going to direct download: %s' % url)
329             getDefer = self.peers.get(hash, url)
330             return getDefer
331         return response
332         
333     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
334         """Add a newly cached file to the mirror info and/or the DHT.
335         
336         If the file was downloaded, set url to the path it was downloaded for.
337         Doesn't add a file to the DHT unless a hash was found for it
338         (but does add it anyway if forceDHT is True).
339         
340         @type file_path: L{twisted.python.filepath.FilePath}
341         @param file_path: the location of the file in the local cache
342         @type hash: L{Hash.HashObject}
343         @param hash: the original (expected) hash object containing also the
344             hash of the downloaded file
345         @type new_hash: C{boolean}
346         @param new_hash: whether the has was new to this peer, and so should
347             be added to the DHT
348         @type url: C{string}
349         @param url: the URI of the location of the file in the mirror
350             (optional, defaults to not adding the file to the mirror info)
351         @type forceDHT: C{boolean}
352         @param forceDHT: whether to force addition of the file to the DHT
353             even if the hash was not found in a mirror
354             (optional, defaults to False)
355         """
356         if url:
357             self.mirrors.updatedFile(url, file_path)
358         
359         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
360             return self.store(hash)
361         return None
362             
363     def store(self, hash):
364         """Add a key/value pair for the file to the DHT.
365         
366         Sets the key and value from the hash information, and tries to add
367         it to the DHT.
368         """
369         key = hash.digest()
370         value = {'c': self.my_contact}
371         pieces = hash.pieceDigests()
372         
373         # Determine how to store any piece data
374         if len(pieces) <= 1:
375             pass
376         elif len(pieces) <= DHT_PIECES:
377             # Short enough to be stored with our peer contact info
378             value['t'] = {'t': ''.join(pieces)}
379         elif len(pieces) <= TORRENT_PIECES:
380             # Short enough to be stored in a separate key in the DHT
381             value['h'] = sha.new(''.join(pieces)).digest()
382         else:
383             # Too long, must be served up by our peer HTTP server
384             value['l'] = sha.new(''.join(pieces)).digest()
385
386         storeDefer = self.dht.storeValue(key, value)
387         storeDefer.addCallbacks(self.store_done, self.store_error,
388                                 callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
389         return storeDefer
390
391     def store_done(self, result, hash):
392         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
393         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
394         pieces = hash.pieceDigests()
395         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
396             # Add the piece data key and value to the DHT
397             key = sha.new(''.join(pieces)).digest()
398             value = {'t': ''.join(pieces)}
399
400             storeDefer = self.dht.storeValue(key, value)
401             storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
402                                     callbackArgs = (key, ), errbackArgs = (key, ))
403             return storeDefer
404         return result
405
406     def store_torrent_done(self, result, key):
407         """Adding the file to the DHT is complete, and so is the workflow."""
408         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
409         return result
410
411     def store_error(self, err, key):
412         """Adding to the DHT failed."""
413         log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))
414         return err
415