Add statistics reporting to the main program (untested).
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
1
2 """The main program code.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 @var download_dir: the name of the directory to use for downloaded files
9
10 """
11
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 from urllib import unquote
15 import os, re, sha
16
17 from twisted.internet import defer, reactor
18 from twisted.web2 import server, http, http_headers, static
19 from twisted.python import log, failure
20 from twisted.python.filepath import FilePath
21
22 from interfaces import IDHT, IDHTStats
23 from apt_p2p_conf import config
24 from PeerManager import PeerManager
25 from HTTPServer import TopLevel
26 from MirrorManager import MirrorManager
27 from CacheManager import CacheManager
28 from Hash import HashObject
29 from db import DB
30 from stats import StatsLogger
31 from util import findMyIPAddr, compact
32
33 DHT_PIECES = 4
34 TORRENT_PIECES = 70
35
36 download_dir = 'cache'
37
38 class AptP2P:
39     """The main code object that does all of the work.
40     
41     Contains all of the sub-components that do all the low-level work, and
42     coordinates communication between them.
43     
44     @type dhtClass: L{interfaces.IDHT}
45     @ivar dhtClass: the DHT class to use
46     @type cache_dir: L{twisted.python.filepath.FilePath}
47     @ivar cache_dir: the directory to use for storing all files
48     @type db: L{db.DB}
49     @ivar db: the database to use for tracking files and hashes
50     @type dht: L{interfaces.IDHT}
51     @ivar dht: the DHT instance
52     @type stats: L{stats.StatsLogger}
53     @ivar stats: the statistics logger to record sent data to
54     @type http_server: L{HTTPServer.TopLevel}
55     @ivar http_server: the web server that will handle all requests from apt
56         and from other peers
57     @type peers: L{PeerManager.PeerManager}
58     @ivar peers: the manager of all downloads from mirrors and other peers
59     @type mirrors: L{MirrorManager.MirrorManager}
60     @ivar mirrors: the manager of downloaded information about mirrors which
61         can be queried to get hashes from file names
62     @type cache: L{CacheManager.CacheManager}
63     @ivar cache: the manager of all downloaded files
64     @type my_contact: C{string}
65     @ivar my_contact: the 6-byte compact peer representation of this peer's
66         download information (IP address and port)
67     """
68     
69     def __init__(self, dhtClass):
70         """Initialize all the sub-components.
71         
72         @type dhtClass: L{interfaces.IDHT}
73         @param dhtClass: the DHT class to use
74         """
75         log.msg('Initializing the main apt_p2p application')
76         self.dhtClass = dhtClass
77         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
78         if not self.cache_dir.child(download_dir).exists():
79             self.cache_dir.child(download_dir).makedirs()
80         self.db = DB(self.cache_dir.child('apt-p2p.db'))
81         self.dht = dhtClass()
82         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
83         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
84         self.stats = StatsLogger(self.db)
85         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
86         self.getHTTPFactory = self.http_server.getHTTPFactory
87         self.peers = PeerManager(self.cache_dir, self.dht, self.stats)
88         self.mirrors = MirrorManager(self.cache_dir)
89         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, self)
90         self.my_contact = None
91
92     #{ DHT maintenance
93     def joinComplete(self, result):
94         """Complete the DHT join process and determine our download information.
95         
96         Called by the DHT when the join has been completed with information
97         on the external IP address and port of this peer.
98         """
99         my_addr = findMyIPAddr(result,
100                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
101                                config.getboolean('DEFAULT', 'LOCAL_OK'))
102         if not my_addr:
103             raise RuntimeError, "IP address for this machine could not be found"
104         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
105         self.cache.scanDirectories()
106         reactor.callLater(60, self.refreshFiles)
107
108     def joinError(self, failure):
109         """Joining the DHT has failed."""
110         log.msg("joining DHT failed miserably")
111         log.err(failure)
112         raise RuntimeError, "IP address for this machine could not be found"
113     
114     def refreshFiles(self):
115         """Refresh any files in the DHT that are about to expire."""
116         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
117         hashes = self.db.expiredHashes(expireAfter)
118         if len(hashes.keys()) > 0:
119             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
120         self._refreshFiles(None, hashes)
121         
122     def _refreshFiles(self, result, hashes):
123         if result is not None:
124             log.msg('Storage resulted in: %r' % result)
125
126         if hashes:
127             raw_hash = hashes.keys()[0]
128             self.db.refreshHash(raw_hash)
129             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
130             del hashes[raw_hash]
131             storeDefer = self.store(hash)
132             storeDefer.addBoth(self._refreshFiles, hashes)
133         else:
134             reactor.callLater(60, self.refreshFiles)
135     
136     def getStats(self):
137         """Retrieve and format the statistics for the program.
138         
139         @rtype: C{string}
140         @return: the formatted HTML page containing the statistics
141         """
142         out = '<html><body>\n\n'
143         out += self.stats.formatHTML(self.my_contact)
144         out += '\n\n'
145         if IDHTStats.implementedBy(self.dhtClass):
146             out += self.dht.getStats()
147         out += '\n</body></html>\n'
148         return out
149
150     #{ Main workflow
151     def check_freshness(self, req, url, modtime, resp):
152         """Send a HEAD to the mirror to check if the response from the cache is still valid.
153         
154         @type req: L{twisted.web2.http.Request}
155         @param req: the initial request sent to the HTTP server by apt
156         @param url: the URI of the actual mirror request
157         @type modtime: C{int}
158         @param modtime: the modified time of the cached file (seconds since epoch)
159         @type resp: L{twisted.web2.http.Response}
160         @param resp: the response from the cache to be sent to apt
161         @rtype: L{twisted.internet.defer.Deferred}
162         @return: a deferred that will be called back with the correct response
163         """
164         log.msg('Checking if %s is still fresh' % url)
165         d = self.peers.get('', url, method = "HEAD", modtime = modtime)
166         d.addCallbacks(self.check_freshness_done, self.check_freshness_error,
167                        callbackArgs = (req, url, resp), errbackArgs = (req, url))
168         return d
169     
170     def check_freshness_done(self, resp, req, url, orig_resp):
171         """Process the returned response from the mirror.
172         
173         @type resp: L{twisted.web2.http.Response}
174         @param resp: the response from the mirror to the HEAD request
175         @type req: L{twisted.web2.http.Request}
176         @param req: the initial request sent to the HTTP server by apt
177         @param url: the URI of the actual mirror request
178         @type orig_resp: L{twisted.web2.http.Response}
179         @param orig_resp: the response from the cache to be sent to apt
180         """
181         if resp.code == 304:
182             log.msg('Still fresh, returning: %s' % url)
183             return orig_resp
184         else:
185             log.msg('Stale, need to redownload: %s' % url)
186             return self.get_resp(req, url)
187     
188     def check_freshness_error(self, err, req, url):
189         """Mirror request failed, continue with download.
190         
191         @param err: the response from the mirror to the HEAD request
192         @type req: L{twisted.web2.http.Request}
193         @param req: the initial request sent to the HTTP server by apt
194         @param url: the URI of the actual mirror request
195         """
196         log.err(err)
197         return self.get_resp(req, url)
198     
199     def get_resp(self, req, url):
200         """Lookup a hash for the file in the local mirror info.
201         
202         Starts the process of getting a response to an uncached apt request.
203         
204         @type req: L{twisted.web2.http.Request}
205         @param req: the initial request sent to the HTTP server by apt
206         @param url: the URI of the actual mirror request
207         @rtype: L{twisted.internet.defer.Deferred}
208         @return: a deferred that will be called back with the response
209         """
210         d = defer.Deferred()
211         
212         log.msg('Trying to find hash for %s' % url)
213         findDefer = self.mirrors.findHash(unquote(url))
214         
215         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
216                                callbackArgs=(req, url, d), errbackArgs=(req, url, d))
217         findDefer.addErrback(log.err)
218         return d
219     
220     def findHash_error(self, failure, req, url, d):
221         """Process the error in hash lookup by returning an empty L{HashObject}."""
222         log.err(failure)
223         self.findHash_done(HashObject(), req, url, d)
224         
225     def findHash_done(self, hash, req, url, d):
226         """Use the returned hash to lookup  the file in the cache.
227         
228         If the hash was not found, the workflow skips down to download from
229         the mirror (L{lookupHash_done}).
230         
231         @type hash: L{Hash.HashObject}
232         @param hash: the hash object containing the expected hash for the file
233         """
234         if hash.expected() is None:
235             log.msg('Hash for %s was not found' % url)
236             self.lookupHash_done([], hash, url, d)
237         else:
238             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
239             
240             # Lookup hash in cache
241             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
242             self.getCachedFile(hash, req, url, d, locations)
243
244     def getCachedFile(self, hash, req, url, d, locations):
245         """Try to return the file from the cache, otherwise move on to a DHT lookup.
246         
247         @type locations: C{list} of C{dictionary}
248         @param locations: the files in the cache that match the hash,
249             the dictionary contains a key 'path' whose value is a
250             L{twisted.python.filepath.FilePath} object for the file.
251         """
252         if not locations:
253             log.msg('Failed to return file from cache: %s' % url)
254             self.lookupHash(hash, url, d)
255             return
256         
257         # Get the first possible location from the list
258         file = locations.pop(0)['path']
259         log.msg('Returning cached file: %s' % file.path)
260         
261         # Get it's response
262         resp = static.File(file.path).renderHTTP(req)
263         if isinstance(resp, defer.Deferred):
264             resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
265         else:
266             self._getCachedFile(resp, hash, req, url, d, locations)
267         
268     def _getCachedFile(self, resp, hash, req, url, d, locations):
269         """Check the returned response to be sure it is valid."""
270         if isinstance(resp, failure.Failure):
271             log.msg('Got error trying to get cached file')
272             log.err()
273             # Try the next possible location
274             self.getCachedFile(hash, req, url, d, locations)
275             return
276             
277         log.msg('Cached response: %r' % resp)
278         
279         if resp.code >= 200 and resp.code < 400:
280             d.callback(resp)
281         else:
282             # Try the next possible location
283             self.getCachedFile(hash, req, url, d, locations)
284
285     def lookupHash(self, hash, url, d):
286         """Lookup the hash in the DHT."""
287         log.msg('Looking up hash in DHT for file: %s' % url)
288         key = hash.expected()
289         lookupDefer = self.dht.getValue(key)
290         lookupDefer.addBoth(self.lookupHash_done, hash, url, d)
291
292     def lookupHash_done(self, values, hash, url, d):
293         """Start the download of the file.
294         
295         The download will be from peers if the DHT lookup succeeded, or
296         from the mirror otherwise.
297         
298         @type values: C{list} of C{dictionary}
299         @param values: the returned values from the DHT containing peer
300             download information
301         """
302         if not isinstance(values, list) or not values:
303             if not isinstance(values, list):
304                 log.msg('DHT lookup for %s failed with error %r' % (url, values))
305             else:
306                 log.msg('Peers for %s were not found' % url)
307             getDefer = self.peers.get(hash, url)
308             getDefer.addCallback(self.cache.save_file, hash, url)
309             getDefer.addErrback(self.cache.save_error, url)
310             getDefer.addCallbacks(d.callback, d.errback)
311         else:
312             log.msg('Found peers for %s: %r' % (url, values))
313             # Download from the found peers
314             getDefer = self.peers.get(hash, url, values)
315             getDefer.addCallback(self.check_response, hash, url)
316             getDefer.addCallback(self.cache.save_file, hash, url)
317             getDefer.addErrback(self.cache.save_error, url)
318             getDefer.addCallbacks(d.callback, d.errback)
319             
320     def check_response(self, response, hash, url):
321         """Check the response from peers, and download from the mirror if it is not."""
322         if response.code < 200 or response.code >= 300:
323             log.msg('Download from peers failed, going to direct download: %s' % url)
324             getDefer = self.peers.get(hash, url)
325             return getDefer
326         return response
327         
328     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
329         """Add a newly cached file to the mirror info and/or the DHT.
330         
331         If the file was downloaded, set url to the path it was downloaded for.
332         Doesn't add a file to the DHT unless a hash was found for it
333         (but does add it anyway if forceDHT is True).
334         
335         @type file_path: L{twisted.python.filepath.FilePath}
336         @param file_path: the location of the file in the local cache
337         @type hash: L{Hash.HashObject}
338         @param hash: the original (expected) hash object containing also the
339             hash of the downloaded file
340         @type new_hash: C{boolean}
341         @param new_hash: whether the has was new to this peer, and so should
342             be added to the DHT
343         @type url: C{string}
344         @param url: the URI of the location of the file in the mirror
345             (optional, defaults to not adding the file to the mirror info)
346         @type forceDHT: C{boolean}
347         @param forceDHT: whether to force addition of the file to the DHT
348             even if the hash was not found in a mirror
349             (optional, defaults to False)
350         """
351         if url:
352             self.mirrors.updatedFile(url, file_path)
353         
354         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
355             return self.store(hash)
356         return None
357             
358     def store(self, hash):
359         """Add a key/value pair for the file to the DHT.
360         
361         Sets the key and value from the hash information, and tries to add
362         it to the DHT.
363         """
364         key = hash.digest()
365         value = {'c': self.my_contact}
366         pieces = hash.pieceDigests()
367         
368         # Determine how to store any piece data
369         if len(pieces) <= 1:
370             pass
371         elif len(pieces) <= DHT_PIECES:
372             # Short enough to be stored with our peer contact info
373             value['t'] = {'t': ''.join(pieces)}
374         elif len(pieces) <= TORRENT_PIECES:
375             # Short enough to be stored in a separate key in the DHT
376             value['h'] = sha.new(''.join(pieces)).digest()
377         else:
378             # Too long, must be served up by our peer HTTP server
379             value['l'] = sha.new(''.join(pieces)).digest()
380
381         storeDefer = self.dht.storeValue(key, value)
382         storeDefer.addCallbacks(self.store_done, self.store_error,
383                                 callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
384         return storeDefer
385
386     def store_done(self, result, hash):
387         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
388         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
389         pieces = hash.pieceDigests()
390         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
391             # Add the piece data key and value to the DHT
392             key = sha.new(''.join(pieces)).digest()
393             value = {'t': ''.join(pieces)}
394
395             storeDefer = self.dht.storeValue(key, value)
396             storeDefer.addCallbacks(self.store_torrent_done, self.store_error,
397                                     callbackArgs = (key, ), errbackArgs = (key, ))
398             return storeDefer
399         return result
400
401     def store_torrent_done(self, result, key):
402         """Adding the file to the DHT is complete, and so is the workflow."""
403         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
404         return result
405
406     def store_error(self, err, key):
407         """Adding to the DHT failed."""
408         log.msg('An error occurred adding %s to the DHT: %r' % (b2a_hex(key), err))
409         return err
410