1f04a87da3b97feb1d9e03a0b0ebaf516a16f67b
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
1
2 """The main program code.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 @var download_dir: the name of the directory to use for downloaded files
9
10 """
11
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 from urllib import unquote
15 import os, re, sha
16
17 from twisted.internet import defer, reactor
18 from twisted.web2 import server, http, http_headers, static
19 from twisted.python import log, failure
20 from twisted.python.filepath import FilePath
21
22 from interfaces import IDHT, IDHTStats
23 from apt_p2p_conf import config
24 from PeerManager import PeerManager
25 from HTTPServer import TopLevel
26 from MirrorManager import MirrorManager
27 from CacheManager import CacheManager
28 from Hash import HashObject
29 from db import DB
30 from util import findMyIPAddr, compact
31
32 DHT_PIECES = 4
33 TORRENT_PIECES = 70
34
35 download_dir = 'cache'
36
37 class AptP2P:
38     """The main code object that does all of the work.
39     
40     Contains all of the sub-components that do all the low-level work, and
41     coordinates communication between them.
42     
43     @type dhtClass: L{interfaces.IDHT}
44     @ivar dhtClass: the DHT class to use
45     @type cache_dir: L{twisted.python.filepath.FilePath}
46     @ivar cache_dir: the directory to use for storing all files
47     @type db: L{db.DB}
48     @ivar db: the database to use for tracking files and hashes
49     @type dht: L{interfaces.IDHT}
50     @ivar dht: the DHT instance
51     @type http_server: L{HTTPServer.TopLevel}
52     @ivar http_server: the web server that will handle all requests from apt
53         and from other peers
54     @type peers: L{PeerManager.PeerManager}
55     @ivar peers: the manager of all downloads from mirrors and other peers
56     @type mirrors: L{MirrorManager.MirrorManager}
57     @ivar mirrors: the manager of downloaded information about mirrors which
58         can be queried to get hashes from file names
59     @type cache: L{CacheManager.CacheManager}
60     @ivar cache: the manager of all downloaded files
61     @type my_contact: C{string}
62     @ivar my_contact: the 6-byte compact peer representation of this peer's
63         download information (IP address and port)
64     """
65     
66     def __init__(self, dhtClass):
67         """Initialize all the sub-components.
68         
69         @type dhtClass: L{interfaces.IDHT}
70         @param dhtClass: the DHT class to use
71         """
72         log.msg('Initializing the main apt_p2p application')
73         self.dhtClass = dhtClass
74         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
75         if not self.cache_dir.child(download_dir).exists():
76             self.cache_dir.child(download_dir).makedirs()
77         self.db = DB(self.cache_dir.child('apt-p2p.db'))
78         self.dht = dhtClass()
79         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
80         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
81         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self,
82                                     config.getint('DEFAULT', 'UPLOAD_LIMIT'))
83         self.getHTTPFactory = self.http_server.getHTTPFactory
84         self.peers = PeerManager(self.cache_dir, self.dht)
85         self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
86         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
87         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
88         self.my_contact = None
89
90     #{ DHT maintenance
91     def joinComplete(self, result):
92         """Complete the DHT join process and determine our download information.
93         
94         Called by the DHT when the join has been completed with information
95         on the external IP address and port of this peer.
96         """
97         my_addr = findMyIPAddr(result,
98                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
99                                config.getboolean('DEFAULT', 'LOCAL_OK'))
100         if not my_addr:
101             raise RuntimeError, "IP address for this machine could not be found"
102         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
103         self.cache.scanDirectories()
104         reactor.callLater(60, self.refreshFiles)
105
106     def joinError(self, failure):
107         """Joining the DHT has failed."""
108         log.msg("joining DHT failed miserably")
109         log.err(failure)
110         raise RuntimeError, "IP address for this machine could not be found"
111     
112     def refreshFiles(self):
113         """Refresh any files in the DHT that are about to expire."""
114         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
115         hashes = self.db.expiredHashes(expireAfter)
116         if len(hashes.keys()) > 0:
117             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
118         self._refreshFiles(None, hashes)
119         
120     def _refreshFiles(self, result, hashes):
121         if result is not None:
122             log.msg('Storage resulted in: %r' % result)
123
124         if hashes:
125             raw_hash = hashes.keys()[0]
126             self.db.refreshHash(raw_hash)
127             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
128             del hashes[raw_hash]
129             storeDefer = self.store(hash)
130             storeDefer.addBoth(self._refreshFiles, hashes)
131         else:
132             reactor.callLater(60, self.refreshFiles)
133     
134     def getStats(self):
135         """Retrieve and format the statistics for the program.
136         
137         @rtype: C{string}
138         @return: the formatted HTML page containing the statistics
139         """
140         out = '<html><body>\n\n'
141         if IDHTStats.implementedBy(self.dhtClass):
142             out += self.dht.getStats()
143         out += '\n</body></html>\n'
144         return out
145
146     #{ Main workflow
147     def check_freshness(self, req, url, modtime, resp):
148         """Send a HEAD to the mirror to check if the response from the cache is still valid.
149         
150         @type req: L{twisted.web2.http.Request}
151         @param req: the initial request sent to the HTTP server by apt
152         @param url: the URI of the actual mirror request
153         @type modtime: C{int}
154         @param modtime: the modified time of the cached file (seconds since epoch)
155         @type resp: L{twisted.web2.http.Response}
156         @param resp: the response from the cache to be sent to apt
157         @rtype: L{twisted.internet.defer.Deferred}
158         @return: a deferred that will be called back with the correct response
159         """
160         log.msg('Checking if %s is still fresh' % url)
161         d = self.peers.get('', url, method = "HEAD", modtime = modtime)
162         d.addCallback(self.check_freshness_done, req, url, resp)
163         return d
164     
165     def check_freshness_done(self, resp, req, url, orig_resp):
166         """Process the returned response from the mirror.
167         
168         @type resp: L{twisted.web2.http.Response}
169         @param resp: the response from the mirror to the HEAD request
170         @type req: L{twisted.web2.http.Request}
171         @param req: the initial request sent to the HTTP server by apt
172         @param url: the URI of the actual mirror request
173         @type orig_resp: L{twisted.web2.http.Response}
174         @param orig_resp: the response from the cache to be sent to apt
175         """
176         if resp.code == 304:
177             log.msg('Still fresh, returning: %s' % url)
178             return orig_resp
179         else:
180             log.msg('Stale, need to redownload: %s' % url)
181             return self.get_resp(req, url)
182     
183     def get_resp(self, req, url):
184         """Lookup a hash for the file in the local mirror info.
185         
186         Starts the process of getting a response to an uncached apt request.
187         
188         @type req: L{twisted.web2.http.Request}
189         @param req: the initial request sent to the HTTP server by apt
190         @param url: the URI of the actual mirror request
191         @rtype: L{twisted.internet.defer.Deferred}
192         @return: a deferred that will be called back with the response
193         """
194         d = defer.Deferred()
195         
196         log.msg('Trying to find hash for %s' % url)
197         findDefer = self.mirrors.findHash(unquote(url))
198         
199         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
200                                callbackArgs=(req, url, d), errbackArgs=(req, url, d))
201         findDefer.addErrback(log.err)
202         return d
203     
204     def findHash_error(self, failure, req, url, d):
205         """Process the error in hash lookup by returning an empty L{HashObject}."""
206         log.err(failure)
207         self.findHash_done(HashObject(), req, url, d)
208         
209     def findHash_done(self, hash, req, url, d):
210         """Use the returned hash to lookup  the file in the cache.
211         
212         If the hash was not found, the workflow skips down to download from
213         the mirror (L{lookupHash_done}).
214         
215         @type hash: L{Hash.HashObject}
216         @param hash: the hash object containing the expected hash for the file
217         """
218         if hash.expected() is None:
219             log.msg('Hash for %s was not found' % url)
220             self.lookupHash_done([], hash, url, d)
221         else:
222             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
223             
224             # Lookup hash in cache
225             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
226             self.getCachedFile(hash, req, url, d, locations)
227
228     def getCachedFile(self, hash, req, url, d, locations):
229         """Try to return the file from the cache, otherwise move on to a DHT lookup.
230         
231         @type locations: C{list} of C{dictionary}
232         @param locations: the files in the cache that match the hash,
233             the dictionary contains a key 'path' whose value is a
234             L{twisted.python.filepath.FilePath} object for the file.
235         """
236         if not locations:
237             log.msg('Failed to return file from cache: %s' % url)
238             self.lookupHash(hash, url, d)
239             return
240         
241         # Get the first possible location from the list
242         file = locations.pop(0)['path']
243         log.msg('Returning cached file: %s' % file.path)
244         
245         # Get it's response
246         resp = static.File(file.path).renderHTTP(req)
247         if isinstance(resp, defer.Deferred):
248             resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
249         else:
250             self._getCachedFile(resp, hash, req, url, d, locations)
251         
252     def _getCachedFile(self, resp, hash, req, url, d, locations):
253         """Check the returned response to be sure it is valid."""
254         if isinstance(resp, failure.Failure):
255             log.msg('Got error trying to get cached file')
256             log.err()
257             # Try the next possible location
258             self.getCachedFile(hash, req, url, d, locations)
259             return
260             
261         log.msg('Cached response: %r' % resp)
262         
263         if resp.code >= 200 and resp.code < 400:
264             d.callback(resp)
265         else:
266             # Try the next possible location
267             self.getCachedFile(hash, req, url, d, locations)
268
269     def lookupHash(self, hash, url, d):
270         """Lookup the hash in the DHT."""
271         log.msg('Looking up hash in DHT for file: %s' % url)
272         key = hash.expected()
273         lookupDefer = self.dht.getValue(key)
274         lookupDefer.addCallback(self.lookupHash_done, hash, url, d)
275
276     def lookupHash_done(self, values, hash, url, d):
277         """Start the download of the file.
278         
279         The download will be from peers if the DHT lookup succeeded, or
280         from the mirror otherwise.
281         
282         @type values: C{list} of C{dictionary}
283         @param values: the returned values from the DHT containing peer
284             download information
285         """
286         if not values:
287             log.msg('Peers for %s were not found' % url)
288             getDefer = self.peers.get(hash, url)
289             getDefer.addCallback(self.cache.save_file, hash, url)
290             getDefer.addErrback(self.cache.save_error, url)
291             getDefer.addCallbacks(d.callback, d.errback)
292         else:
293             log.msg('Found peers for %s: %r' % (url, values))
294             # Download from the found peers
295             getDefer = self.peers.get(hash, url, values)
296             getDefer.addCallback(self.check_response, hash, url)
297             getDefer.addCallback(self.cache.save_file, hash, url)
298             getDefer.addErrback(self.cache.save_error, url)
299             getDefer.addCallbacks(d.callback, d.errback)
300             
301     def check_response(self, response, hash, url):
302         """Check the response from peers, and download from the mirror if it is not."""
303         if response.code < 200 or response.code >= 300:
304             log.msg('Download from peers failed, going to direct download: %s' % url)
305             getDefer = self.peers.get(hash, url)
306             return getDefer
307         return response
308         
309     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
310         """Add a newly cached file to the mirror info and/or the DHT.
311         
312         If the file was downloaded, set url to the path it was downloaded for.
313         Doesn't add a file to the DHT unless a hash was found for it
314         (but does add it anyway if forceDHT is True).
315         
316         @type file_path: L{twisted.python.filepath.FilePath}
317         @param file_path: the location of the file in the local cache
318         @type hash: L{Hash.HashObject}
319         @param hash: the original (expected) hash object containing also the
320             hash of the downloaded file
321         @type new_hash: C{boolean}
322         @param new_hash: whether the has was new to this peer, and so should
323             be added to the DHT
324         @type url: C{string}
325         @param url: the URI of the location of the file in the mirror
326             (optional, defaults to not adding the file to the mirror info)
327         @type forceDHT: C{boolean}
328         @param forceDHT: whether to force addition of the file to the DHT
329             even if the hash was not found in a mirror
330             (optional, defaults to False)
331         """
332         if url:
333             self.mirrors.updatedFile(url, file_path)
334         
335         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
336             return self.store(hash)
337         return None
338             
339     def store(self, hash):
340         """Add a key/value pair for the file to the DHT.
341         
342         Sets the key and value from the hash information, and tries to add
343         it to the DHT.
344         """
345         key = hash.digest()
346         value = {'c': self.my_contact}
347         pieces = hash.pieceDigests()
348         
349         # Determine how to store any piece data
350         if len(pieces) <= 1:
351             pass
352         elif len(pieces) <= DHT_PIECES:
353             # Short enough to be stored with our peer contact info
354             value['t'] = {'t': ''.join(pieces)}
355         elif len(pieces) <= TORRENT_PIECES:
356             # Short enough to be stored in a separate key in the DHT
357             value['h'] = sha.new(''.join(pieces)).digest()
358         else:
359             # Too long, must be served up by our peer HTTP server
360             value['l'] = sha.new(''.join(pieces)).digest()
361
362         storeDefer = self.dht.storeValue(key, value)
363         storeDefer.addCallback(self.store_done, hash)
364         return storeDefer
365
366     def store_done(self, result, hash):
367         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
368         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
369         pieces = hash.pieceDigests()
370         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
371             # Add the piece data key and value to the DHT
372             key = sha.new(''.join(pieces)).digest()
373             value = {'t': ''.join(pieces)}
374
375             storeDefer = self.dht.storeValue(key, value)
376             storeDefer.addCallback(self.store_torrent_done, key)
377             return storeDefer
378         return result
379
380     def store_torrent_done(self, result, key):
381         """Adding the file to the DHT is complete, and so is the workflow."""
382         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
383         return result
384