Fix minor bugs in the use of sha module and misspelled b2a_hex.
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
1
2 """The main program code.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 @var download_dir: the name of the directory to use for downloaded files
9
10 """
11
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 import os, re, sha
15
16 from twisted.internet import defer, reactor
17 from twisted.web2 import server, http, http_headers, static
18 from twisted.python import log, failure
19 from twisted.python.filepath import FilePath
20
21 from apt_p2p_conf import config
22 from PeerManager import PeerManager
23 from HTTPServer import TopLevel
24 from MirrorManager import MirrorManager
25 from CacheManager import CacheManager
26 from Hash import HashObject
27 from db import DB
28 from util import findMyIPAddr, compact
29
30 DHT_PIECES = 4
31 TORRENT_PIECES = 70
32
33 download_dir = 'cache'
34
35 class AptP2P:
36     """The main code object that does all of the work.
37     
38     Contains all of the sub-components that do all the low-level work, and
39     coordinates communication between them.
40     
41     @type cache_dir: L{twisted.python.filepath.FilePath}
42     @ivar cache_dir: the directory to use for storing all files
43     @type db: L{db.DB}
44     @ivar db: the database to use for tracking files and hashes
45     @type dht: L{interfaces.IDHT}
46     @ivar dht: the DHT instance to use
47     @type http_server: L{HTTPServer.TopLevel}
48     @ivar http_server: the web server that will handle all requests from apt
49         and from other peers
50     @type peers: L{PeerManager.PeerManager}
51     @ivar peers: the manager of all downloads from mirrors and other peers
52     @type mirrors: L{MirrorManager.MirrorManager}
53     @ivar mirrors: the manager of downloaded information about mirrors which
54         can be queried to get hashes from file names
55     @type cache: L{CacheManager.CacheManager}
56     @ivar cache: the manager of all downloaded files
57     @type my_contact: C{string}
58     @ivar my_contact: the 6-byte compact peer representation of this peer's
59         download information (IP address and port)
60     """
61     
62     def __init__(self, dht):
63         """Initialize all the sub-components.
64         
65         @type dht: L{interfaces.IDHT}
66         @param dht: the DHT instance to use
67         """
68         log.msg('Initializing the main apt_p2p application')
69         self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
70         if not self.cache_dir.child(download_dir).exists():
71             self.cache_dir.child(download_dir).makedirs()
72         self.db = DB(self.cache_dir.child('apt-p2p.db'))
73         self.dht = dht
74         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
75         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
76         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
77         self.getHTTPFactory = self.http_server.getHTTPFactory
78         self.peers = PeerManager()
79         self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
80         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
81         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
82         self.my_contact = None
83
84     #{ DHT maintenance
85     def joinComplete(self, result):
86         """Complete the DHT join process and determine our download information.
87         
88         Called by the DHT when the join has been completed with information
89         on the external IP address and port of this peer.
90         """
91         my_addr = findMyIPAddr(result,
92                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
93                                config.getboolean('DEFAULT', 'LOCAL_OK'))
94         if not my_addr:
95             raise RuntimeError, "IP address for this machine could not be found"
96         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
97         self.cache.scanDirectories()
98         reactor.callLater(60, self.refreshFiles)
99
100     def joinError(self, failure):
101         """Joining the DHT has failed."""
102         log.msg("joining DHT failed miserably")
103         log.err(failure)
104         raise RuntimeError, "IP address for this machine could not be found"
105     
106     def refreshFiles(self):
107         """Refresh any files in the DHT that are about to expire."""
108         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
109         hashes = self.db.expiredHashes(expireAfter)
110         if len(hashes.keys()) > 0:
111             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
112         self._refreshFiles(None, hashes)
113         
114     def _refreshFiles(self, result, hashes):
115         if result is not None:
116             log.msg('Storage resulted in: %r' % result)
117
118         if hashes:
119             raw_hash = hashes.keys()[0]
120             self.db.refreshHash(raw_hash)
121             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
122             del hashes[raw_hash]
123             storeDefer = self.store(hash)
124             storeDefer.addBoth(self._refreshFiles, hashes)
125         else:
126             reactor.callLater(60, self.refreshFiles)
127
128     #{ Main workflow
129     def check_freshness(self, req, url, modtime, resp):
130         """Send a HEAD to the mirror to check if the response from the cache is still valid.
131         
132         @type req: L{twisted.web2.http.Request}
133         @param req: the initial request sent to the HTTP server by apt
134         @param url: the URI of the actual mirror request
135         @type modtime: C{int}
136         @param modtime: the modified time of the cached file (seconds since epoch)
137         @type resp: L{twisted.web2.http.Response}
138         @param resp: the response from the cache to be sent to apt
139         @rtype: L{twisted.internet.defer.Deferred}
140         @return: a deferred that will be called back with the correct response
141         """
142         log.msg('Checking if %s is still fresh' % url)
143         d = self.peers.get('', url, method = "HEAD", modtime = modtime)
144         d.addCallback(self.check_freshness_done, req, url, resp)
145         return d
146     
147     def check_freshness_done(self, resp, req, url, orig_resp):
148         """Process the returned response from the mirror.
149         
150         @type resp: L{twisted.web2.http.Response}
151         @param resp: the response from the mirror to the HEAD request
152         @type req: L{twisted.web2.http.Request}
153         @param req: the initial request sent to the HTTP server by apt
154         @param url: the URI of the actual mirror request
155         @type orig_resp: L{twisted.web2.http.Response}
156         @param orig_resp: the response from the cache to be sent to apt
157         """
158         if resp.code == 304:
159             log.msg('Still fresh, returning: %s' % url)
160             return orig_resp
161         else:
162             log.msg('Stale, need to redownload: %s' % url)
163             return self.get_resp(req, url)
164     
165     def get_resp(self, req, url):
166         """Lookup a hash for the file in the local mirror info.
167         
168         Starts the process of getting a response to an uncached apt request.
169         
170         @type req: L{twisted.web2.http.Request}
171         @param req: the initial request sent to the HTTP server by apt
172         @param url: the URI of the actual mirror request
173         @rtype: L{twisted.internet.defer.Deferred}
174         @return: a deferred that will be called back with the response
175         """
176         d = defer.Deferred()
177         
178         log.msg('Trying to find hash for %s' % url)
179         findDefer = self.mirrors.findHash(url)
180         
181         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
182                                callbackArgs=(req, url, d), errbackArgs=(req, url, d))
183         findDefer.addErrback(log.err)
184         return d
185     
186     def findHash_error(self, failure, req, url, d):
187         """Process the error in hash lookup by returning an empty L{HashObject}."""
188         log.err(failure)
189         self.findHash_done(HashObject(), req, url, d)
190         
191     def findHash_done(self, hash, req, url, d):
192         """Use the returned hash to lookup  the file in the cache.
193         
194         If the hash was not found, the workflow skips down to download from
195         the mirror (L{lookupHash_done}).
196         
197         @type hash: L{Hash.HashObject}
198         @param hash: the hash object containing the expected hash for the file
199         """
200         if hash.expected() is None:
201             log.msg('Hash for %s was not found' % url)
202             self.lookupHash_done([], hash, url, d)
203         else:
204             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
205             
206             # Lookup hash in cache
207             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
208             self.getCachedFile(hash, req, url, d, locations)
209
210     def getCachedFile(self, hash, req, url, d, locations):
211         """Try to return the file from the cache, otherwise move on to a DHT lookup.
212         
213         @type locations: C{list} of C{dictionary}
214         @param locations: the files in the cache that match the hash,
215             the dictionary contains a key 'path' whose value is a
216             L{twisted.python.filepath.FilePath} object for the file.
217         """
218         if not locations:
219             log.msg('Failed to return file from cache: %s' % url)
220             self.lookupHash(hash, url, d)
221             return
222         
223         # Get the first possible location from the list
224         file = locations.pop(0)['path']
225         log.msg('Returning cached file: %s' % file.path)
226         
227         # Get it's response
228         resp = static.File(file.path).renderHTTP(req)
229         if isinstance(resp, defer.Deferred):
230             resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
231         else:
232             self._getCachedFile(resp, hash, req, url, d, locations)
233         
234     def _getCachedFile(self, resp, hash, req, url, d, locations):
235         """Check the returned response to be sure it is valid."""
236         if isinstance(resp, failure.Failure):
237             log.msg('Got error trying to get cached file')
238             log.err()
239             # Try the next possible location
240             self.getCachedFile(hash, req, url, d, locations)
241             return
242             
243         log.msg('Cached response: %r' % resp)
244         
245         if resp.code >= 200 and resp.code < 400:
246             d.callback(resp)
247         else:
248             # Try the next possible location
249             self.getCachedFile(hash, req, url, d, locations)
250
251     def lookupHash(self, hash, url, d):
252         """Lookup the hash in the DHT."""
253         log.msg('Looking up hash in DHT for file: %s' % url)
254         key = hash.expected()
255         lookupDefer = self.dht.getValue(key)
256         lookupDefer.addCallback(self.lookupHash_done, hash, url, d)
257
258     def lookupHash_done(self, values, hash, url, d):
259         """Start the download of the file.
260         
261         The download will be from peers if the DHT lookup succeeded, or
262         from the mirror otherwise.
263         
264         @type values: C{list} of C{dictionary}
265         @param values: the returned values from the DHT containing peer
266             download information
267         """
268         if not values:
269             log.msg('Peers for %s were not found' % url)
270             getDefer = self.peers.get(hash, url)
271             getDefer.addCallback(self.cache.save_file, hash, url)
272             getDefer.addErrback(self.cache.save_error, url)
273             getDefer.addCallbacks(d.callback, d.errback)
274         else:
275             log.msg('Found peers for %s: %r' % (url, values))
276             # Download from the found peers
277             getDefer = self.peers.get(hash, url, values)
278             getDefer.addCallback(self.check_response, hash, url)
279             getDefer.addCallback(self.cache.save_file, hash, url)
280             getDefer.addErrback(self.cache.save_error, url)
281             getDefer.addCallbacks(d.callback, d.errback)
282             
283     def check_response(self, response, hash, url):
284         """Check the response from peers, and download from the mirror if it is not."""
285         if response.code < 200 or response.code >= 300:
286             log.msg('Download from peers failed, going to direct download: %s' % url)
287             getDefer = self.peers.get(hash, url)
288             return getDefer
289         return response
290         
291     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
292         """Add a newly cached file to the mirror info and/or the DHT.
293         
294         If the file was downloaded, set url to the path it was downloaded for.
295         Doesn't add a file to the DHT unless a hash was found for it
296         (but does add it anyway if forceDHT is True).
297         
298         @type file_path: L{twisted.python.filepath.FilePath}
299         @param file_path: the location of the file in the local cache
300         @type hash: L{Hash.HashObject}
301         @param hash: the original (expected) hash object containing also the
302             hash of the downloaded file
303         @type new_hash: C{boolean}
304         @param new_hash: whether the has was new to this peer, and so should
305             be added to the DHT
306         @type url: C{string}
307         @param url: the URI of the location of the file in the mirror
308             (optional, defaults to not adding the file to the mirror info)
309         @type forceDHT: C{boolean}
310         @param forceDHT: whether to force addition of the file to the DHT
311             even if the hash was not found in a mirror
312             (optional, defaults to False)
313         """
314         if url:
315             self.mirrors.updatedFile(url, file_path)
316         
317         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
318             return self.store(hash)
319         return None
320             
321     def store(self, hash):
322         """Add a key/value pair for the file to the DHT.
323         
324         Sets the key and value from the hash information, and tries to add
325         it to the DHT.
326         """
327         key = hash.digest()
328         value = {'c': self.my_contact}
329         pieces = hash.pieceDigests()
330         
331         # Determine how to store any piece data
332         if len(pieces) <= 1:
333             pass
334         elif len(pieces) <= DHT_PIECES:
335             # Short enough to be stored with our peer contact info
336             value['t'] = {'t': ''.join(pieces)}
337         elif len(pieces) <= TORRENT_PIECES:
338             # Short enough to be stored in a separate key in the DHT
339             value['h'] = sha.new(''.join(pieces)).digest()
340         else:
341             # Too long, must be served up by our peer HTTP server
342             value['l'] = sha.new(''.join(pieces)).digest()
343
344         storeDefer = self.dht.storeValue(key, value)
345         storeDefer.addCallback(self.store_done, hash)
346         return storeDefer
347
348     def store_done(self, result, hash):
349         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
350         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
351         pieces = hash.pieceDigests()
352         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
353             # Add the piece data key and value to the DHT
354             key = sha.new(''.join(pieces)).digest()
355             value = {'t': ''.join(pieces)}
356
357             storeDefer = self.dht.storeValue(key, value)
358             storeDefer.addCallback(self.store_torrent_done, key)
359             return storeDefer
360         return result
361
362     def store_torrent_done(self, result, key):
363         """Adding the file to the DHT is complete, and so is the workflow."""
364         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
365         return result
366