Make the upload limit a config option.
[quix0rs-apt-p2p.git] / apt_p2p / apt_p2p.py
1
2 """The main program code.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 @var download_dir: the name of the directory to use for downloaded files
9
10 """
11
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 import os, re, sha
15
16 from twisted.internet import defer, reactor
17 from twisted.web2 import server, http, http_headers, static
18 from twisted.python import log, failure
19 from twisted.python.filepath import FilePath
20
21 from interfaces import IDHT, IDHTStats
22 from apt_p2p_conf import config
23 from PeerManager import PeerManager
24 from HTTPServer import TopLevel
25 from MirrorManager import MirrorManager
26 from CacheManager import CacheManager
27 from Hash import HashObject
28 from db import DB
29 from util import findMyIPAddr, compact
30
31 DHT_PIECES = 4
32 TORRENT_PIECES = 70
33
34 download_dir = 'cache'
35
36 class AptP2P:
37     """The main code object that does all of the work.
38     
39     Contains all of the sub-components that do all the low-level work, and
40     coordinates communication between them.
41     
42     @type dhtClass: L{interfaces.IDHT}
43     @ivar dhtClass: the DHT class to use
44     @type cache_dir: L{twisted.python.filepath.FilePath}
45     @ivar cache_dir: the directory to use for storing all files
46     @type db: L{db.DB}
47     @ivar db: the database to use for tracking files and hashes
48     @type dht: L{interfaces.IDHT}
49     @ivar dht: the DHT instance
50     @type http_server: L{HTTPServer.TopLevel}
51     @ivar http_server: the web server that will handle all requests from apt
52         and from other peers
53     @type peers: L{PeerManager.PeerManager}
54     @ivar peers: the manager of all downloads from mirrors and other peers
55     @type mirrors: L{MirrorManager.MirrorManager}
56     @ivar mirrors: the manager of downloaded information about mirrors which
57         can be queried to get hashes from file names
58     @type cache: L{CacheManager.CacheManager}
59     @ivar cache: the manager of all downloaded files
60     @type my_contact: C{string}
61     @ivar my_contact: the 6-byte compact peer representation of this peer's
62         download information (IP address and port)
63     """
64     
65     def __init__(self, dhtClass):
66         """Initialize all the sub-components.
67         
68         @type dht: L{interfaces.IDHT}
69         @param dht: the DHT class to use
70         """
71         log.msg('Initializing the main apt_p2p application')
72         self.dhtClass = dhtClass
73         self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
74         if not self.cache_dir.child(download_dir).exists():
75             self.cache_dir.child(download_dir).makedirs()
76         self.db = DB(self.cache_dir.child('apt-p2p.db'))
77         self.dht = dhtClass()
78         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
79         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
80         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self,
81                                     config.getint('DEFAULT', 'UPLOAD_LIMIT'))
82         self.getHTTPFactory = self.http_server.getHTTPFactory
83         self.peers = PeerManager()
84         self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
85         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
86         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
87         self.my_contact = None
88
89     #{ DHT maintenance
90     def joinComplete(self, result):
91         """Complete the DHT join process and determine our download information.
92         
93         Called by the DHT when the join has been completed with information
94         on the external IP address and port of this peer.
95         """
96         my_addr = findMyIPAddr(result,
97                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
98                                config.getboolean('DEFAULT', 'LOCAL_OK'))
99         if not my_addr:
100             raise RuntimeError, "IP address for this machine could not be found"
101         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
102         self.cache.scanDirectories()
103         reactor.callLater(60, self.refreshFiles)
104
105     def joinError(self, failure):
106         """Joining the DHT has failed."""
107         log.msg("joining DHT failed miserably")
108         log.err(failure)
109         raise RuntimeError, "IP address for this machine could not be found"
110     
111     def refreshFiles(self):
112         """Refresh any files in the DHT that are about to expire."""
113         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
114         hashes = self.db.expiredHashes(expireAfter)
115         if len(hashes.keys()) > 0:
116             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
117         self._refreshFiles(None, hashes)
118         
119     def _refreshFiles(self, result, hashes):
120         if result is not None:
121             log.msg('Storage resulted in: %r' % result)
122
123         if hashes:
124             raw_hash = hashes.keys()[0]
125             self.db.refreshHash(raw_hash)
126             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
127             del hashes[raw_hash]
128             storeDefer = self.store(hash)
129             storeDefer.addBoth(self._refreshFiles, hashes)
130         else:
131             reactor.callLater(60, self.refreshFiles)
132     
133     def getStats(self):
134         """Retrieve and format the statistics for the program.
135         
136         @rtype: C{string}
137         @return: the formatted HTML page containing the statistics
138         """
139         out = '<html><body>\n\n'
140         if IDHTStats.implementedBy(self.dhtClass):
141             out += self.dht.getStats()
142         out += '\n</body></html>\n'
143         return out
144
145     #{ Main workflow
146     def check_freshness(self, req, url, modtime, resp):
147         """Send a HEAD to the mirror to check if the response from the cache is still valid.
148         
149         @type req: L{twisted.web2.http.Request}
150         @param req: the initial request sent to the HTTP server by apt
151         @param url: the URI of the actual mirror request
152         @type modtime: C{int}
153         @param modtime: the modified time of the cached file (seconds since epoch)
154         @type resp: L{twisted.web2.http.Response}
155         @param resp: the response from the cache to be sent to apt
156         @rtype: L{twisted.internet.defer.Deferred}
157         @return: a deferred that will be called back with the correct response
158         """
159         log.msg('Checking if %s is still fresh' % url)
160         d = self.peers.get('', url, method = "HEAD", modtime = modtime)
161         d.addCallback(self.check_freshness_done, req, url, resp)
162         return d
163     
164     def check_freshness_done(self, resp, req, url, orig_resp):
165         """Process the returned response from the mirror.
166         
167         @type resp: L{twisted.web2.http.Response}
168         @param resp: the response from the mirror to the HEAD request
169         @type req: L{twisted.web2.http.Request}
170         @param req: the initial request sent to the HTTP server by apt
171         @param url: the URI of the actual mirror request
172         @type orig_resp: L{twisted.web2.http.Response}
173         @param orig_resp: the response from the cache to be sent to apt
174         """
175         if resp.code == 304:
176             log.msg('Still fresh, returning: %s' % url)
177             return orig_resp
178         else:
179             log.msg('Stale, need to redownload: %s' % url)
180             return self.get_resp(req, url)
181     
182     def get_resp(self, req, url):
183         """Lookup a hash for the file in the local mirror info.
184         
185         Starts the process of getting a response to an uncached apt request.
186         
187         @type req: L{twisted.web2.http.Request}
188         @param req: the initial request sent to the HTTP server by apt
189         @param url: the URI of the actual mirror request
190         @rtype: L{twisted.internet.defer.Deferred}
191         @return: a deferred that will be called back with the response
192         """
193         d = defer.Deferred()
194         
195         log.msg('Trying to find hash for %s' % url)
196         findDefer = self.mirrors.findHash(url)
197         
198         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
199                                callbackArgs=(req, url, d), errbackArgs=(req, url, d))
200         findDefer.addErrback(log.err)
201         return d
202     
203     def findHash_error(self, failure, req, url, d):
204         """Process the error in hash lookup by returning an empty L{HashObject}."""
205         log.err(failure)
206         self.findHash_done(HashObject(), req, url, d)
207         
208     def findHash_done(self, hash, req, url, d):
209         """Use the returned hash to lookup  the file in the cache.
210         
211         If the hash was not found, the workflow skips down to download from
212         the mirror (L{lookupHash_done}).
213         
214         @type hash: L{Hash.HashObject}
215         @param hash: the hash object containing the expected hash for the file
216         """
217         if hash.expected() is None:
218             log.msg('Hash for %s was not found' % url)
219             self.lookupHash_done([], hash, url, d)
220         else:
221             log.msg('Found hash %s for %s' % (hash.hexexpected(), url))
222             
223             # Lookup hash in cache
224             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
225             self.getCachedFile(hash, req, url, d, locations)
226
227     def getCachedFile(self, hash, req, url, d, locations):
228         """Try to return the file from the cache, otherwise move on to a DHT lookup.
229         
230         @type locations: C{list} of C{dictionary}
231         @param locations: the files in the cache that match the hash,
232             the dictionary contains a key 'path' whose value is a
233             L{twisted.python.filepath.FilePath} object for the file.
234         """
235         if not locations:
236             log.msg('Failed to return file from cache: %s' % url)
237             self.lookupHash(hash, url, d)
238             return
239         
240         # Get the first possible location from the list
241         file = locations.pop(0)['path']
242         log.msg('Returning cached file: %s' % file.path)
243         
244         # Get it's response
245         resp = static.File(file.path).renderHTTP(req)
246         if isinstance(resp, defer.Deferred):
247             resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
248         else:
249             self._getCachedFile(resp, hash, req, url, d, locations)
250         
251     def _getCachedFile(self, resp, hash, req, url, d, locations):
252         """Check the returned response to be sure it is valid."""
253         if isinstance(resp, failure.Failure):
254             log.msg('Got error trying to get cached file')
255             log.err()
256             # Try the next possible location
257             self.getCachedFile(hash, req, url, d, locations)
258             return
259             
260         log.msg('Cached response: %r' % resp)
261         
262         if resp.code >= 200 and resp.code < 400:
263             d.callback(resp)
264         else:
265             # Try the next possible location
266             self.getCachedFile(hash, req, url, d, locations)
267
268     def lookupHash(self, hash, url, d):
269         """Lookup the hash in the DHT."""
270         log.msg('Looking up hash in DHT for file: %s' % url)
271         key = hash.expected()
272         lookupDefer = self.dht.getValue(key)
273         lookupDefer.addCallback(self.lookupHash_done, hash, url, d)
274
275     def lookupHash_done(self, values, hash, url, d):
276         """Start the download of the file.
277         
278         The download will be from peers if the DHT lookup succeeded, or
279         from the mirror otherwise.
280         
281         @type values: C{list} of C{dictionary}
282         @param values: the returned values from the DHT containing peer
283             download information
284         """
285         if not values:
286             log.msg('Peers for %s were not found' % url)
287             getDefer = self.peers.get(hash, url)
288             getDefer.addCallback(self.cache.save_file, hash, url)
289             getDefer.addErrback(self.cache.save_error, url)
290             getDefer.addCallbacks(d.callback, d.errback)
291         else:
292             log.msg('Found peers for %s: %r' % (url, values))
293             # Download from the found peers
294             getDefer = self.peers.get(hash, url, values)
295             getDefer.addCallback(self.check_response, hash, url)
296             getDefer.addCallback(self.cache.save_file, hash, url)
297             getDefer.addErrback(self.cache.save_error, url)
298             getDefer.addCallbacks(d.callback, d.errback)
299             
300     def check_response(self, response, hash, url):
301         """Check the response from peers, and download from the mirror if it is not."""
302         if response.code < 200 or response.code >= 300:
303             log.msg('Download from peers failed, going to direct download: %s' % url)
304             getDefer = self.peers.get(hash, url)
305             return getDefer
306         return response
307         
308     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
309         """Add a newly cached file to the mirror info and/or the DHT.
310         
311         If the file was downloaded, set url to the path it was downloaded for.
312         Doesn't add a file to the DHT unless a hash was found for it
313         (but does add it anyway if forceDHT is True).
314         
315         @type file_path: L{twisted.python.filepath.FilePath}
316         @param file_path: the location of the file in the local cache
317         @type hash: L{Hash.HashObject}
318         @param hash: the original (expected) hash object containing also the
319             hash of the downloaded file
320         @type new_hash: C{boolean}
321         @param new_hash: whether the has was new to this peer, and so should
322             be added to the DHT
323         @type url: C{string}
324         @param url: the URI of the location of the file in the mirror
325             (optional, defaults to not adding the file to the mirror info)
326         @type forceDHT: C{boolean}
327         @param forceDHT: whether to force addition of the file to the DHT
328             even if the hash was not found in a mirror
329             (optional, defaults to False)
330         """
331         if url:
332             self.mirrors.updatedFile(url, file_path)
333         
334         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
335             return self.store(hash)
336         return None
337             
338     def store(self, hash):
339         """Add a key/value pair for the file to the DHT.
340         
341         Sets the key and value from the hash information, and tries to add
342         it to the DHT.
343         """
344         key = hash.digest()
345         value = {'c': self.my_contact}
346         pieces = hash.pieceDigests()
347         
348         # Determine how to store any piece data
349         if len(pieces) <= 1:
350             pass
351         elif len(pieces) <= DHT_PIECES:
352             # Short enough to be stored with our peer contact info
353             value['t'] = {'t': ''.join(pieces)}
354         elif len(pieces) <= TORRENT_PIECES:
355             # Short enough to be stored in a separate key in the DHT
356             value['h'] = sha.new(''.join(pieces)).digest()
357         else:
358             # Too long, must be served up by our peer HTTP server
359             value['l'] = sha.new(''.join(pieces)).digest()
360
361         storeDefer = self.dht.storeValue(key, value)
362         storeDefer.addCallback(self.store_done, hash)
363         return storeDefer
364
365     def store_done(self, result, hash):
366         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
367         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
368         pieces = hash.pieceDigests()
369         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
370             # Add the piece data key and value to the DHT
371             key = sha.new(''.join(pieces)).digest()
372             value = {'t': ''.join(pieces)}
373
374             storeDefer = self.dht.storeValue(key, value)
375             storeDefer.addCallback(self.store_torrent_done, key)
376             return storeDefer
377         return result
378
379     def store_torrent_done(self, result, key):
380         """Adding the file to the DHT is complete, and so is the workflow."""
381         log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))
382         return result
383