4200f6299b78377593ce17cd6ad7b6973de7578e
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
1
2 """The main program code.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 @var download_dir: the name of the directory to use for downloaded files
9 @var peer_dir: the name of the directory to use for peer downloads
10 """
11
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 from urllib import unquote
15 import os, re, sha
16
17 from twisted.internet import defer, reactor
18 from twisted.web2 import server, http, http_headers, static
19 from twisted.python import log, failure
20 from twisted.python.filepath import FilePath
21
22 from interfaces import IDHT, IDHTStats
23 from apt_p2p_conf import config
24 from PeerManager import PeerManager
25 from HTTPServer import TopLevel
26 from MirrorManager import MirrorManager
27 from CacheManager import CacheManager
28 from Hash import HashObject
29 from db import DB
30 from stats import StatsLogger
31 from util import findMyIPAddr, compact
32
33 DHT_PIECES = 4
34 TORRENT_PIECES = 70
35
36 download_dir = 'cache'
37 peer_dir = 'peers'
38
39 class AptP2P:
40     """The main code object that does all of the work.
41     
42     Contains all of the sub-components that do all the low-level work, and
43     coordinates communication between them.
44     
45     @type dhtClass: L{interfaces.IDHT}
46     @ivar dhtClass: the DHT class to use
47     @type cache_dir: L{twisted.python.filepath.FilePath}
48     @ivar cache_dir: the directory to use for storing all files
49     @type db: L{db.DB}
50     @ivar db: the database to use for tracking files and hashes
51     @type dht: L{interfaces.IDHT}
52     @ivar dht: the DHT instance
53     @type stats: L{stats.StatsLogger}
54     @ivar stats: the statistics logger to record sent data to
55     @type http_server: L{HTTPServer.TopLevel}
56     @ivar http_server: the web server that will handle all requests from apt
57         and from other peers
58     @type peers: L{PeerManager.PeerManager}
59     @ivar peers: the manager of all downloads from mirrors and other peers
60     @type mirrors: L{MirrorManager.MirrorManager}
61     @ivar mirrors: the manager of downloaded information about mirrors which
62         can be queried to get hashes from file names
63     @type cache: L{CacheManager.CacheManager}
64     @ivar cache: the manager of all downloaded files
65     @type my_contact: C{string}
66     @ivar my_contact: the 6-byte compact peer representation of this peer's
67         download information (IP address and port)
68     """
69     
70     def __init__(self, dhtClass):
71         """Initialize all the sub-components.
72         
73         @type dhtClass: L{interfaces.IDHT}
74         @param dhtClass: the DHT class to use
75         """
76         log.msg('Initializing the main apt_p2p application')
77         self.dhtClass = dhtClass
78         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
79         if not self.cache_dir.child(download_dir).exists():
80             self.cache_dir.child(download_dir).makedirs()
81         if not self.cache_dir.child(peer_dir).exists():
82             self.cache_dir.child(peer_dir).makedirs()
83         self.db = DB(self.cache_dir.child('apt-p2p.db'))
84         self.dht = dhtClass()
85         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
86         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
87         self.stats = StatsLogger(self.db)
88         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
89         self.getHTTPFactory = self.http_server.getHTTPFactory
90         self.peers = PeerManager(self.cache_dir.child(peer_dir), self.dht, self.stats)
91         self.mirrors = MirrorManager(self.cache_dir)
92         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
93         self.my_contact = None
94         reactor.addSystemEventTrigger('before', 'shutdown', self.shutdown)
95
96     #{ Maintenance
97     def shutdown(self):
98         self.stats.save()
99         self.db.close()
100         
101     def joinComplete(self, result):
102         """Complete the DHT join process and determine our download information.
103         
104         Called by the DHT when the join has been completed with information
105         on the external IP address and port of this peer.
106         """
107         my_addr = findMyIPAddr(result,
108                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
109                                config.getboolean('DEFAULT', 'LOCAL_OK'))
110         if not my_addr:
111             raise RuntimeError, "IP address for this machine could not be found"
112         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
113         self.cache.scanDirectories()
114         reactor.callLater(60, self.refreshFiles)
115
116     def joinError(self, failure):
117         """Joining the DHT has failed."""
118         log.msg("joining DHT failed miserably")
119         log.err(failure)
120         raise RuntimeError, "IP address for this machine could not be found"
121     
122     def refreshFiles(self):
123         """Refresh any files in the DHT that are about to expire."""
124         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
125         hashes = self.db.expiredHashes(expireAfter)
126         if len(hashes.keys()) > 0:
127             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
128         self._refreshFiles(None, hashes)
129         
130     def _refreshFiles(self, result, hashes):
131         if result is not None:
132             log.msg('Storage resulted in: %r' % result)
133
134         if hashes:
135             raw_hash = hashes.keys()[0]
136             self.db.refreshHash(raw_hash)
137             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
138             del hashes[raw_hash]
139             storeDefer = self.store(hash)
140             storeDefer.addBoth(self._refreshFiles, hashes)
141         else:
142             reactor.callLater(60, self.refreshFiles)
143     
144     def getStats(self):
145         """Retrieve and format the statistics for the program.
146         
147         @rtype: C{string}
148         @return: the formatted HTML page containing the statistics
149         """
150         out = '<html><body>\n\n'
151         out += self.stats.formatHTML(self.my_contact)
152         out += '\n\n'
153         if IDHTStats.implementedBy(self.dhtClass):
154             out += self.dht.getStats()
155         out += '\n</body></html>\n'
156         return out
157
158     #{ Main workflow
159     def check_freshness(self, req, url, modtime, resp):
160         """Send a HEAD to the mirror to check if the response from the cache is still valid.
161         
162         @type req: L{twisted.web2.http.Request}
163         @param req: the initial request sent to the HTTP server by apt
164         @param url: the URI of the actual mirror request
165         @type modtime: C{int}
166         @param modtime: the modified time of the cached file (seconds since epoch)
167         @type resp: L{twisted.web2.http.Response}
168         @param resp: the response from the cache to be sent to apt
169         @rtype: L{twisted.internet.defer.Deferred}
170         @return: a deferred that will be called back with the correct response
171         """
172         log.msg('Checking if %s is still fresh' % url)
173         d = self.peers.get('', url, method = "HEAD", modtime = modtime)
174         d.addCallbacks(self.check_freshness_done, self.check_freshness_error,
175                        callbackArgs = (req, url, resp), errbackArgs = (req, url))
176         return d
177     
178     def check_freshness_done(self, resp, req, url, orig_resp):
179         """Process the returned response from the mirror.
180         
181         @type resp: L{twisted.web2.http.Response}
182         @param resp: the response from the mirror to the HEAD request
183         @type req: L{twisted.web2.http.Request}
184         @param req: the initial request sent to the HTTP server by apt
185         @param url: the URI of the actual mirror request
186         @type orig_resp: L{twisted.web2.http.Response}
187         @param orig_resp: the response from the cache to be sent to apt
188         """
189         if resp.code == 304:
190             log.msg('Still fresh, returning: %s' % url)
191             return orig_resp
192         else:
193             log.msg('Stale, need to redownload: %s' % url)
194             return self.get_resp(req, url)
195     
196     def check_freshness_error(self, err, req, url):
197         """Mirror request failed, continue with download.
198         
199         @param err: the response from the mirror to the HEAD request
200         @type req: L{twisted.web2.http.Request}
201         @param req: the initial request sent to the HTTP server by apt
202         @param url: the URI of the actual mirror request
203         """
204         log.err(err)
205         return self.get_resp(req, url)
206     
207     def get_resp(self, req, url):
208         """Lookup a hash for the file in the local mirror info.
209         
210         Starts the process of getting a response to an uncached apt request.
211         
212         @type req: L{twisted.web2.http.Request}
213         @param req: the initial request sent to the HTTP server by apt
214         @param url: the URI of the actual mirror request
215         @rtype: L{twisted.internet.defer.Deferred}
216         @return: a deferred that will be called back with the response
217         """
218         d = defer.Deferred()
219         
220         log.msg('Trying to find hash for %s' % url)
221         findDefer = self.mirrors.findHash(unquote(url))
222         
223         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
224                                callbackArgs=(req, url, d), errbackArgs=(req, url, d))
225         findDefer.addErrback(log.err)
226         return d
227     
228     def findHash_error(self, failure, req, url, d):
229         """Process the error in hash lookup by returning an empty L{HashObject}."""
230         log.err(failure)
231         self.findHash_done(HashObject(), req, url, d)
232         
233     def findHash_done(self, hash, req, url, d):
234         """Use the returned hash to lookup  the file in the cache.
235         
236         If the hash was not found, the workflow skips down to download from
237         the mirror (L{lookupHash_done}).
238         
239         @type hash: L{Hash.HashObject}
240         @param hash: the hash object containing the expected hash for the file
241         """
242         if hash.expected() is None:
243             log.msg('Hash for %s was not found' % url)
244             self.lookupHash_done([], hash, url, d)
245         else:
246             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
247             
248             # Lookup hash in cache
249             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
250             self.getCachedFile(hash, req, url, d, locations)
251
252     def getCachedFile(self, hash, req, url, d, locations):
253         """Try to return the file from the cache, otherwise move on to a DHT lookup.
254         
255         @type locations: C{list} of C{dictionary}
256         @param locations: the files in the cache that match the hash,
257             the dictionary contains a key 'path' whose value is a
258             L{twisted.python.filepath.FilePath} object for the file.
259         """
260         if not locations:
261             log.msg('Failed to return file from cache: %s' % url)
262             self.lookupHash(hash, url, d)
263             return
264         
265         # Get the first possible location from the list
266         file = locations.pop(0)['path']
267         log.msg('Returning cached file: %s' % file.path)
268         
269         # Get it's response
270         resp = static.File(file.path).renderHTTP(req)
271         if isinstance(resp, defer.Deferred):
272             resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
273         else:
274             self._getCachedFile(resp, hash, req, url, d, locations)
275         
276     def _getCachedFile(self, resp, hash, req, url, d, locations):
277         """Check the returned response to be sure it is valid."""
278         if isinstance(resp, failure.Failure):
279             log.msg('Got error trying to get cached file')
280             log.err()
281             # Try the next possible location
282             self.getCachedFile(hash, req, url, d, locations)
283             return
284             
285         log.msg('Cached response: %r' % resp)
286         
287         if resp.code >= 200 and resp.code < 400:
288             d.callback(resp)
289         else:
290             # Try the next possible location
291             self.getCachedFile(hash, req, url, d, locations)
292
293     def lookupHash(self, hash, url, d):
294         """Lookup the hash in the DHT."""
295         log.msg('Looking up hash in DHT for file: %s' % url)
296         key = hash.expected()
297         lookupDefer = self.dht.getValue(key)
298         lookupDefer.addBoth(self.lookupHash_done, hash, url, d)
299
300     def lookupHash_done(self, values, hash, url, d):
301         """Start the download of the file.
302         
303         The download will be from peers if the DHT lookup succeeded, or
304         from the mirror otherwise.
305         
306         @type values: C{list} of C{dictionary}
307         @param values: the returned values from the DHT containing peer
308             download information
309         """
310         if not isinstance(values, list) or not values:
311             if not isinstance(values, list):
312                 log.msg('DHT lookup for %s failed with error %r' % (url, values))
313             else:
314                 log.msg('Peers for %s were not found' % url)
315             getDefer = self.peers.get(hash, url)
316             getDefer.addCallback(self.cache.save_file, hash, url)
317             getDefer.addErrback(self.cache.save_error, url)
318             getDefer.addCallbacks(d.callback, d.errback)
319         else:
320             log.msg('Found peers for %s: %r' % (url, values))
321             # Download from the found peers
322             getDefer = self.peers.get(hash, url, values)
323             getDefer.addCallback(self.check_response, hash, url)
324             getDefer.addCallback(self.cache.save_file, hash, url)
325             getDefer.addErrback(self.cache.save_error, url)
326             getDefer.addCallbacks(d.callback, d.errback)
327             
328     def check_response(self, response, hash, url):
329         """Check the response from peers, and download from the mirror if it is not."""
330         if response.code < 200 or response.code >= 300:
331             log.msg('Download from peers failed, going to direct download: %s' % url)
332             getDefer = self.peers.get(hash, url)
333             return getDefer
334         return response
335         
336     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
337         """Add a newly cached file to the mirror info and/or the DHT.
338         
339         If the file was downloaded, set url to the path it was downloaded for.
340         Doesn't add a file to the DHT unless a hash was found for it
341         (but does add it anyway if forceDHT is True).
342         
343         @type file_path: L{twisted.python.filepath.FilePath}
344         @param file_path: the location of the file in the local cache
345         @type hash: L{Hash.HashObject}
346         @param hash: the original (expected) hash object containing also the
347             hash of the downloaded file
348         @type new_hash: C{boolean}
349         @param new_hash: whether the has was new to this peer, and so should
350             be added to the DHT
351         @type url: C{string}
352         @param url: the URI of the location of the file in the mirror
353             (optional, defaults to not adding the file to the mirror info)
354         @type forceDHT: C{boolean}
355         @param forceDHT: whether to force addition of the file to the DHT
356             even if the hash was not found in a mirror
357             (optional, defaults to False)
358         """
359         if url:
360             self.mirrors.updatedFile(url, file_path)
361         
362         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
363             return self.store(hash)
364         return None
365             
366     def store(self, hash):
367         """Add a key/value pair for the file to the DHT.
368         
369         Sets the key and value from the hash information, and tries to add
370         it to the DHT.
371         """
372         key = hash.digest()
373         value = {'c': self.my_contact}
374         pieces = hash.pieceDigests()
375         
376         # Determine how to store any piece data
377         if len(pieces) <= 1:
378             pass
379         elif len(pieces) <= DHT_PIECES:
380             # Short enough to be stored with our peer contact info
381             value['t'] = {'t': ''.join(pieces)}
382         elif len(pieces) <= TORRENT_PIECES:
383             # Short enough to be stored in a separate key in the DHT
384             value['h'] = sha.new(''.join(pieces)).digest()
385         else:
386             # Too long, must be served up by our peer HTTP server
387             value['l'] = sha.new(''.join(pieces)).digest()
388
389         storeDefer = self.dht.storeValue(key, value)
390         storeDefer.addCallbacks(self.store_done, self.store_error,
391                                 callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
392         return storeDefer
393
394     def store_done(self, result, hash):
395         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
396         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
397         pieces = hash.pieceDigests()
398         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
399             # Add the piece data key and value to the DHT
400             key = sha.new(''.join(pieces)).digest()
401             value = {'t': ''.join(pieces)}
402
403             storeDefer = self.dht.storeValue(key, value)
404             storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
405                                     callbackArgs = (key, ), errbackArgs = (key, ))
406             return storeDefer
407         return result
408
409     def store_torrent_done(self, result, key):
410         """Adding the file to the DHT is complete, and so is the workflow."""
411         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
412         return result
413
414     def store_error(self, err, key):
415         """Adding to the DHT failed."""
416         log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))
417         return err
418