2 """The main program code.
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
8 @var download_dir: the name of the directory to use for downloaded files
12 from binascii import b2a_hex
13 from urlparse import urlunparse
14 from urllib import unquote
17 from twisted.internet import defer, reactor
18 from twisted.web2 import server, http, http_headers, static
19 from twisted.python import log, failure
20 from twisted.python.filepath import FilePath
22 from interfaces import IDHT, IDHTStats
23 from apt_p2p_conf import config
24 from PeerManager import PeerManager
25 from HTTPServer import TopLevel
26 from MirrorManager import MirrorManager
27 from CacheManager import CacheManager
28 from Hash import HashObject
30 from util import findMyIPAddr, compact
# Name of the sub-directory of the configured CACHE_DIR that holds the
# downloaded files; it is created at start-up if it does not already exist.
download_dir = 'cache'
38 """The main code object that does all of the work.
40 Contains all of the sub-components that do all the low-level work, and
41 coordinates communication between them.
43 @type dhtClass: L{interfaces.IDHT}
44 @ivar dhtClass: the DHT class to use
45 @type cache_dir: L{twisted.python.filepath.FilePath}
46 @ivar cache_dir: the directory to use for storing all files
48 @ivar db: the database to use for tracking files and hashes
49 @type dht: L{interfaces.IDHT}
50 @ivar dht: the DHT instance
51 @type http_server: L{HTTPServer.TopLevel}
52 @ivar http_server: the web server that will handle all requests from apt
54 @type peers: L{PeerManager.PeerManager}
55 @ivar peers: the manager of all downloads from mirrors and other peers
56 @type mirrors: L{MirrorManager.MirrorManager}
57 @ivar mirrors: the manager of downloaded information about mirrors which
58 can be queried to get hashes from file names
59 @type cache: L{CacheManager.CacheManager}
60 @ivar cache: the manager of all downloaded files
61 @type my_contact: C{string}
62 @ivar my_contact: the 6-byte compact peer representation of this peer's
63 download information (IP address and port)
def __init__(self, dhtClass):
    """Initialize all the sub-components.

    The DHT join is started last, once every attribute that the join
    callbacks (L{joinComplete}, L{joinError}) rely on has been created.

    @type dhtClass: L{interfaces.IDHT}
    @param dhtClass: the DHT class to use
    """
    log.msg('Initializing the main apt_p2p application')
    self.dhtClass = dhtClass
    self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
    if not self.cache_dir.child(download_dir).exists():
        self.cache_dir.child(download_dir).makedirs()
    self.db = DB(self.cache_dir.child('apt-p2p.db'))
    # NOTE(review): this instantiation was missing from the reviewed listing
    # even though self.dht is used below; a no-argument constructor is
    # assumed -- confirm against the IDHT implementations.
    self.dht = dhtClass()
    self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
    self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self,
                                config.getint('DEFAULT', 'UPLOAD_LIMIT'))
    self.getHTTPFactory = self.http_server.getHTTPFactory
    self.peers = PeerManager(self.cache_dir, self.dht)
    self.mirrors = MirrorManager(self.cache_dir,
                                 config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
    other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
    self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
    self.my_contact = None
    # Start the join only now: if the join Deferred has already fired, the
    # callback runs immediately, and joinComplete needs self.cache and must
    # not have its self.my_contact overwritten by the line above afterwards.
    self.dht.join().addCallbacks(self.joinComplete, self.joinError)
def joinComplete(self, result):
    """Complete the DHT join process and determine our download information.

    Called by the DHT when the join has been completed with information
    on the external IP address and port of this peer.

    @param result: the result of the DHT join, from which (together with the
        configured DHT port) L{util.findMyIPAddr} determines our address
    @raise RuntimeError: if no IP address for this machine could be found
    """
    my_addr = findMyIPAddr(result,
                           config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
                           config.getboolean('DEFAULT', 'LOCAL_OK'))
    if not my_addr:
        raise RuntimeError("IP address for this machine could not be found")
    # Our compact contact info uses the HTTP download port, not the DHT port.
    self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
    self.cache.scanDirectories()
    reactor.callLater(60, self.refreshFiles)
def joinError(self, failure):
    """Joining the DHT has failed.

    Logs the failure (including its traceback) and raises an error that
    describes the join failure.

    @type failure: L{twisted.python.failure.Failure}
    @param failure: the failure reported by the DHT join
    @raise RuntimeError: always
    """
    log.msg("joining DHT failed miserably")
    log.err(failure)
    raise RuntimeError("Joining the DHT failed: %s" % failure.getErrorMessage())
def refreshFiles(self):
    """Refresh any files in the DHT that are about to expire.

    Asks the database for the hashes reported as expired for the
    KEY_REFRESH interval and hands them to L{_refreshFiles}, which stores
    them again one at a time.
    """
    expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
    hashes = self.db.expiredHashes(expireAfter)
    if hashes:
        log.msg('Refreshing the keys of %d DHT values' % len(hashes))
    # Call _refreshFiles even when nothing has expired: it is what schedules
    # the next refresh, so skipping it would stop refreshing for good.
    self._refreshFiles(None, hashes)
def _refreshFiles(self, result, hashes):
    """Store the remaining expired hashes in the DHT again, one at a time.

    Acts as the callback of each store, so the hashes are processed
    sequentially; once none remain, L{refreshFiles} is scheduled to run
    again in 60 seconds.

    @param result: the result (or failure) of the previous store, or
        C{None} on the first call
    @type hashes: C{dictionary}
    @param hashes: the hashes still to refresh, keyed by raw hash, each
        value having a 'pieces' entry; processed entries are removed
    """
    if result is not None:
        log.msg('Storage resulted in: %r' % result)

    if not hashes:
        reactor.callLater(60, self.refreshFiles)
        return

    # Remove the entry being processed so each hash is stored exactly once.
    raw_hash, info = hashes.popitem()
    self.db.refreshHash(raw_hash)
    hash_obj = HashObject(raw_hash, pieces=info['pieces'])
    storeDefer = self.store(hash_obj)
    storeDefer.addBoth(self._refreshFiles, hashes)
def getStats(self):
    """Retrieve and format the statistics for the program.

    @rtype: C{string}
    @return: the formatted HTML page containing the statistics
    """
    # NOTE(review): the def line was missing from the reviewed listing; the
    # name getStats is reconstructed -- confirm it matches the callers.
    out = '<html><body>\n\n'
    # Only DHT classes implementing IDHTStats can report statistics.
    if IDHTStats.implementedBy(self.dhtClass):
        out += self.dht.getStats()
    out += '\n</body></html>\n'
    return out
def check_freshness(self, req, url, modtime, resp):
    """Send a HEAD to the mirror to check if the response from the cache is still valid.

    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @type modtime: C{int}
    @param modtime: the modified time of the cached file (seconds since epoch)
    @type resp: L{twisted.web2.http.Response}
    @param resp: the response from the cache to be sent to apt
    @rtype: L{twisted.internet.defer.Deferred}
    @return: a deferred that will be called back with the correct response
    """
    log.msg('Checking if %s is still fresh' % url)
    d = self.peers.get('', url, method="HEAD", modtime=modtime)
    d.addCallback(self.check_freshness_done, req, url, resp)
    return d
def check_freshness_done(self, resp, req, url, orig_resp):
    """Process the returned response from the mirror.

    @type resp: L{twisted.web2.http.Response}
    @param resp: the response from the mirror to the HEAD request
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @type orig_resp: L{twisted.web2.http.Response}
    @param orig_resp: the response from the cache to be sent to apt
    @return: C{orig_resp} if the cached copy is still fresh, otherwise the
        deferred from L{get_resp} for a fresh download
    """
    # NOTE(review): the condition line was missing from the reviewed listing;
    # a 304 (Not Modified) reply to the HEAD sent with the cached modtime is
    # taken to mean the cached copy is still fresh -- confirm.
    if resp.code == 304:
        log.msg('Still fresh, returning: %s' % url)
        return orig_resp

    log.msg('Stale, need to redownload: %s' % url)
    return self.get_resp(req, url)
def get_resp(self, req, url):
    """Lookup a hash for the file in the local mirror info.

    Starts the process of getting a response to an uncached apt request.

    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @rtype: L{twisted.internet.defer.Deferred}
    @return: a deferred that will be called back with the response
    """
    # The deferred handed back to the caller; it is threaded through the
    # whole lookup/download workflow and fired at its end.
    d = defer.Deferred()

    log.msg('Trying to find hash for %s' % url)
    findDefer = self.mirrors.findHash(unquote(url))

    findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                           callbackArgs=(req, url, d), errbackArgs=(req, url, d))
    findDefer.addErrback(log.err)
    return d
def findHash_error(self, failure, req, url, d):
    """Process the error in hash lookup by returning an empty L{HashObject}.

    The failure is logged, then the workflow continues as if no hash had
    been found for the file.

    @param failure: the failure from the mirror hash lookup
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @type d: L{twisted.internet.defer.Deferred}
    @param d: the deferred to call back with the final response
    """
    log.err(failure)
    self.findHash_done(HashObject(), req, url, d)
def findHash_done(self, hash, req, url, d):
    """Use the returned hash to lookup the file in the cache.

    If the hash was not found, the workflow skips down to download from
    the mirror (L{lookupHash_done}).

    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @type d: L{twisted.internet.defer.Deferred}
    @param d: the deferred to call back with the final response
    """
    if hash.expected() is None:
        log.msg('Hash for %s was not found' % url)
        # Without a hash there is nothing to look up in the cache or the DHT,
        # so go straight to the mirror (an empty list of peers).
        self.lookupHash_done([], hash, url, d)
        return

    log.msg('Found hash %s for %s' % (hash.hexexpected(), url))

    # Lookup hash in cache
    locations = self.db.lookupHash(hash.expected(), filesOnly=True)
    self.getCachedFile(hash, req, url, d, locations)
def getCachedFile(self, hash, req, url, d, locations):
    """Try to return the file from the cache, otherwise move on to a DHT lookup.

    Each call consumes the first entry of C{locations}; L{_getCachedFile}
    calls back into this method to try the next one if it fails.

    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @type d: L{twisted.internet.defer.Deferred}
    @param d: the deferred to call back with the final response
    @type locations: C{list} of C{dictionary}
    @param locations: the files in the cache that match the hash,
        the dictionary contains a key 'path' whose value is a
        L{twisted.python.filepath.FilePath} object for the file.
    """
    if not locations:
        log.msg('Failed to return file from cache: %s' % url)
        self.lookupHash(hash, url, d)
        return

    # Get the first possible location from the list
    cached_file = locations.pop(0)['path']
    log.msg('Returning cached file: %s' % cached_file.path)

    # Get its response; rendering may complete now or later via a deferred
    resp = static.File(cached_file.path).renderHTTP(req)
    if isinstance(resp, defer.Deferred):
        resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
    else:
        self._getCachedFile(resp, hash, req, url, d, locations)
def _getCachedFile(self, resp, hash, req, url, d, locations):
    """Check the returned response to be sure it is valid.

    A valid (2xx or 3xx) response fires C{d}. On a failure or any other
    status, the next cached location is tried via L{getCachedFile}.

    @param resp: the rendered response for the cached file, or a
        L{twisted.python.failure.Failure} if rendering failed
    @param locations: the remaining cached locations to try
    """
    if isinstance(resp, failure.Failure):
        log.msg('Got error trying to get cached file')
        log.err(resp)
        # Try the next possible location
        self.getCachedFile(hash, req, url, d, locations)
        return

    log.msg('Cached response: %r' % resp)

    # Make sure it's a valid response
    if 200 <= resp.code < 400:
        d.callback(resp)
    else:
        # Try the next possible location
        self.getCachedFile(hash, req, url, d, locations)
def lookupHash(self, hash, url, d):
    """Lookup the hash in the DHT.

    The values found are passed on to L{lookupHash_done}. If the lookup
    itself fails, the failure is logged and the download falls back to the
    mirror (as if no peers were found), so that C{d} still fires.

    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @param url: the URI of the actual mirror request
    @type d: L{twisted.internet.defer.Deferred}
    @param d: the deferred to call back with the final response
    """
    log.msg('Looking up hash in DHT for file: %s' % url)

    def lookup_failed(err):
        # Without this errback a failed lookup would never reach d, leaving
        # the apt request waiting forever.
        log.err(err)
        self.lookupHash_done([], hash, url, d)

    key = hash.expected()
    lookupDefer = self.dht.getValue(key)
    lookupDefer.addCallbacks(self.lookupHash_done, lookup_failed,
                             callbackArgs=(hash, url, d))
    # Log anything that goes wrong while starting the download itself.
    lookupDefer.addErrback(log.err)
def lookupHash_done(self, values, hash, url, d):
    """Start the download of the file.

    The download will be from peers if the DHT lookup succeeded, or
    from the mirror otherwise.

    @type values: C{list} of C{dictionary}
    @param values: the returned values from the DHT containing peer
        download information (an empty list when there are no peers)
    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @param url: the URI of the actual mirror request
    @type d: L{twisted.internet.defer.Deferred}
    @param d: the deferred to call back with the final response
    """
    if not values:
        log.msg('Peers for %s were not found' % url)
        getDefer = self.peers.get(hash, url)
    else:
        log.msg('Found peers for %s: %r' % (url, values))
        # Download from the found peers, falling back to the mirror if the
        # peers do not return a successful response
        getDefer = self.peers.get(hash, url, values)
        getDefer.addCallback(self.check_response, hash, url)

    # Whichever source was used, cache the result and pass it on to apt.
    getDefer.addCallback(self.cache.save_file, hash, url)
    getDefer.addErrback(self.cache.save_error, url)
    getDefer.addCallbacks(d.callback, d.errback)
def check_response(self, response, hash, url):
    """Check the response from peers, and download from the mirror if it failed.

    @param response: the response from the peer download
    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @param url: the URI of the actual mirror request
    @return: C{response} itself if its code is 2xx, otherwise the deferred
        for a direct download from the mirror
    """
    if 200 <= response.code < 300:
        return response

    log.msg('Download from peers failed, going to direct download: %s' % url)
    return self.peers.get(hash, url)
def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
    """Add a newly cached file to the mirror info and/or the DHT.

    If the file was downloaded, set url to the path it was downloaded for.
    Doesn't add a file to the DHT unless a hash was found for it
    (but does add it anyway if forceDHT is True).

    @type file_path: L{twisted.python.filepath.FilePath}
    @param file_path: the location of the file in the local cache
    @type hash: L{Hash.HashObject}
    @param hash: the original (expected) hash object containing also the
        hash of the downloaded file
    @type new_hash: C{boolean}
    @param new_hash: whether the hash was new to this peer, and so should
        be added to the DHT
    @type url: C{string}
    @param url: the URI of the location of the file in the mirror
        (optional, defaults to not adding the file to the mirror info)
    @type forceDHT: C{boolean}
    @param forceDHT: whether to force addition of the file to the DHT
        even if the hash was not found in a mirror
        (optional, defaults to False)
    @return: the deferred from L{store} if the file is being added to the
        DHT, otherwise C{None}
    """
    if url:
        self.mirrors.updatedFile(url, file_path)

    # Only advertise once our contact info is known (set after the DHT join).
    if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
        return self.store(hash)
    return None
def store(self, hash):
    """Add a key/value pair for the file to the DHT.

    Sets the key and value from the hash information, and tries to add
    them to the DHT. Piece data is stored inline with our contact info,
    referenced by its own hash for a separate DHT entry, or referenced for
    download from our HTTP server, depending on the number of pieces.

    @type hash: L{Hash.HashObject}
    @param hash: the hash object of the file to add
    @rtype: L{twisted.internet.defer.Deferred}
    @return: the deferred for the DHT store, chained to L{store_done}
    """
    # NOTE(review): the key line was missing from the reviewed listing; the
    # file's own digest is assumed (store_done logs hash.hexdigest()).
    key = hash.digest()
    value = {'c': self.my_contact}
    pieces = hash.pieceDigests()

    # Determine how to store any piece data
    if len(pieces) <= 1:
        # NOTE(review): reconstructed branch (only the following elif was
        # visible); with at most one piece, no piece data is stored.
        pass
    elif len(pieces) <= DHT_PIECES:
        # Short enough to be stored with our peer contact info
        value['t'] = {'t': ''.join(pieces)}
    elif len(pieces) <= TORRENT_PIECES:
        # Short enough to be stored in a separate key in the DHT
        value['h'] = sha.new(''.join(pieces)).digest()
    else:
        # Too long, must be served up by our peer HTTP server
        value['l'] = sha.new(''.join(pieces)).digest()

    storeDefer = self.dht.storeValue(key, value)
    storeDefer.addCallback(self.store_done, hash)
    return storeDefer
def store_done(self, result, hash):
    """Add a key/value pair for the pieces of the file to the DHT (if necessary).

    The separate piece-data entry is only needed when the file has more
    than DHT_PIECES but at most TORRENT_PIECES pieces.

    @param result: the result of storing the file's own key/value pair
    @type hash: L{Hash.HashObject}
    @param hash: the hash object of the stored file
    @return: the deferred for the piece-data store when one is made,
        otherwise C{result} unchanged
    """
    log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
    pieces = hash.pieceDigests()
    if DHT_PIECES < len(pieces) <= TORRENT_PIECES:
        # Add the piece data key and value to the DHT
        piece_data = ''.join(pieces)
        key = sha.new(piece_data).digest()
        value = {'t': piece_data}

        storeDefer = self.dht.storeValue(key, value)
        storeDefer.addCallback(self.store_torrent_done, key)
        return storeDefer
    return result
380 def store_torrent_done(self, result, key):
381 """Adding the file to the DHT is complete, and so is the workflow."""
382 log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))