"""The main program code.

@var DHT_PIECES: the maximum number of pieces to store with our contact info
    in the DHT
@var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
    in the DHT
@var download_dir: the name of the directory to use for downloaded files
"""
12 from binascii import b2a_hex
13 from urlparse import urlunparse
16 from twisted.internet import defer, reactor
17 from twisted.web2 import server, http, http_headers, static
18 from twisted.python import log, failure
19 from twisted.python.filepath import FilePath
21 from interfaces import IDHT, IDHTStats
22 from apt_p2p_conf import config
23 from PeerManager import PeerManager
24 from HTTPServer import TopLevel
25 from MirrorManager import MirrorManager
26 from CacheManager import CacheManager
27 from Hash import HashObject
29 from util import findMyIPAddr, compact
# Name of the sub-directory of the configured CACHE_DIR that holds downloaded files.
download_dir = 'cache'
"""The main code object that does all of the work.

Contains all of the sub-components that do all the low-level work, and
coordinates communication between them.

@type dhtClass: L{interfaces.IDHT}
@ivar dhtClass: the DHT class to use
@type cache_dir: L{twisted.python.filepath.FilePath}
@ivar cache_dir: the directory to use for storing all files
@ivar db: the database to use for tracking files and hashes
@type dht: L{interfaces.IDHT}
@ivar dht: the DHT instance
@type http_server: L{HTTPServer.TopLevel}
@ivar http_server: the web server that will handle all requests from apt
@type peers: L{PeerManager.PeerManager}
@ivar peers: the manager of all downloads from mirrors and other peers
@type mirrors: L{MirrorManager.MirrorManager}
@ivar mirrors: the manager of downloaded information about mirrors which
    can be queried to get hashes from file names
@type cache: L{CacheManager.CacheManager}
@ivar cache: the manager of all downloaded files
@type my_contact: C{string}
@ivar my_contact: the 6-byte compact peer representation of this peer's
    download information (IP address and port)
"""
def __init__(self, dhtClass):
    """Initialize all the sub-components.

    Creates the download directory if it does not exist, opens the
    database, instantiates the DHT and starts joining it, and then builds
    the HTTP server and the peer, mirror and cache managers.

    @type dhtClass: L{interfaces.IDHT}
    @param dhtClass: the DHT class to use
    """
    log.msg('Initializing the main apt_p2p application')
    self.dhtClass = dhtClass
    self.cache_dir = FilePath(config.get('DEFAULT', 'CACHE_DIR'))
    if not self.cache_dir.child(download_dir).exists():
        self.cache_dir.child(download_dir).makedirs()
    self.db = DB(self.cache_dir.child('apt-p2p.db'))

    # The DHT has to be instantiated before it can be configured and
    # joined; the join result arrives later in joinComplete/joinError.
    self.dht = dhtClass()
    self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
    self.dht.join().addCallbacks(self.joinComplete, self.joinError)

    self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self,
                                config.getint('DEFAULT', 'UPLOAD_LIMIT'))
    self.getHTTPFactory = self.http_server.getHTTPFactory
    self.peers = PeerManager(self.cache_dir, self.dht)
    self.mirrors = MirrorManager(self.cache_dir,
                                 config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
    other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
    self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)

    # Unknown until the DHT join completes (see joinComplete).
    self.my_contact = None
def joinComplete(self, result):
    """Complete the DHT join process and determine our download information.

    Called by the DHT when the join has been completed with information
    on the external IP address and port of this peer.

    @param result: the join result, passed on to L{util.findMyIPAddr}
    @raise RuntimeError: if no IP address could be determined
    """
    my_addr = findMyIPAddr(result,
                           config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
                           config.getboolean('DEFAULT', 'LOCAL_OK'))
    # Only give up when no address was found; otherwise the rest of the
    # start-up (contact info, directory scan, refresh timer) must run.
    if not my_addr:
        raise RuntimeError("IP address for this machine could not be found")

    self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
    self.cache.scanDirectories()
    reactor.callLater(60, self.refreshFiles)
def joinError(self, failure):
    """Joining the DHT has failed.

    @param failure: the error reported by the DHT join
    @raise RuntimeError: always
    """
    log.msg("joining DHT failed miserably")
    # Record the underlying error before it is replaced by the
    # RuntimeError below; otherwise the real cause is lost.
    log.err(failure)
    raise RuntimeError("IP address for this machine could not be found")
def refreshFiles(self):
    """Republish any DHT entries that are about to expire.

    Reads the KEY_REFRESH setting, asks the database for the expired
    hashes and hands them to L{_refreshFiles} for processing.
    """
    stale = self.db.expiredHashes(config.gettime('DEFAULT', 'KEY_REFRESH'))
    stale_count = len(stale.keys())
    if stale_count > 0:
        log.msg('Refreshing the keys of %d DHT values' % stale_count)
    self._refreshFiles(None, stale)
def _refreshFiles(self, result, hashes):
    """Store the expired hashes in the DHT again, one at a time.

    Each call stores a single hash and re-registers itself on the
    resulting deferred, so the stores happen sequentially. Once no hashes
    remain, the next periodic L{refreshFiles} run is scheduled.

    @param result: the outcome of the previous store, or None on the
        first call
    @param hashes: the remaining hashes to refresh, keyed by raw hash;
        each value contains a 'pieces' entry
    """
    if result is not None:
        log.msg('Storage resulted in: %r' % result)

    if hashes:
        raw_hash = hashes.keys()[0]
        self.db.refreshHash(raw_hash)
        hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
        # Drop the entry so the next round moves on to another hash
        # instead of storing this one forever.
        del hashes[raw_hash]
        storeDefer = self.store(hash)
        storeDefer.addBoth(self._refreshFiles, hashes)
    else:
        # Nothing left to refresh, check again in a minute.
        reactor.callLater(60, self.refreshFiles)
# NOTE(review): the method name was restored as getStats to mirror the DHT's
# own getStats() -- confirm against the callers.
def getStats(self):
    """Retrieve and format the statistics for the program.

    @rtype: C{string}
    @return: the formatted HTML page containing the statistics
    """
    out = '<html><body>\n\n'
    # Only DHT implementations providing IDHTStats have statistics to show.
    if IDHTStats.implementedBy(self.dhtClass):
        out += self.dht.getStats()
    out += '\n</body></html>\n'
    return out
def check_freshness(self, req, url, modtime, resp):
    """Send a HEAD to the mirror to check if the response from the cache is still valid.

    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @type modtime: C{int}
    @param modtime: the modified time of the cached file (seconds since epoch)
    @type resp: L{twisted.web2.http.Response}
    @param resp: the response from the cache to be sent to apt
    @rtype: L{twisted.internet.defer.Deferred}
    @return: a deferred that will be called back with the correct response
    """
    log.msg('Checking if %s is still fresh' % url)
    d = self.peers.get('', url, method = "HEAD", modtime = modtime)
    d.addCallback(self.check_freshness_done, req, url, resp)
    # The caller waits on this deferred for the final response.
    return d
def check_freshness_done(self, resp, req, url, orig_resp):
    """Process the returned response from the mirror.

    @type resp: L{twisted.web2.http.Response}
    @param resp: the response from the mirror to the HEAD request
    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @type orig_resp: L{twisted.web2.http.Response}
    @param orig_resp: the response from the cache to be sent to apt
    @return: the cached response if it is still fresh, otherwise the
        deferred returned by L{get_resp}
    """
    # NOTE(review): 304 (Not Modified) is assumed to be how the mirror
    # reports an unchanged file for the HEAD request -- confirm against
    # PeerManager.get().
    if resp.code == 304:
        log.msg('Still fresh, returning: %s' % url)
        return orig_resp

    log.msg('Stale, need to redownload: %s' % url)
    return self.get_resp(req, url)
def get_resp(self, req, url):
    """Lookup a hash for the file in the local mirror info.

    Starts the process of getting a response to an uncached apt request.

    @type req: L{twisted.web2.http.Request}
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @rtype: L{twisted.internet.defer.Deferred}
    @return: a deferred that will be called back with the response
    """
    # This deferred is threaded through the whole workflow and fired
    # by whichever step finally produces the response.
    d = defer.Deferred()

    log.msg('Trying to find hash for %s' % url)
    findDefer = self.mirrors.findHash(url)

    findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                           callbackArgs=(req, url, d), errbackArgs=(req, url, d))
    # Catch anything raised by the two handlers above.
    findDefer.addErrback(log.err)
    return d
def findHash_error(self, failure, req, url, d):
    """Process the error in hash lookup by returning an empty L{HashObject}.

    @param failure: the error from the hash lookup
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @param d: the deferred to eventually call back with the response
    """
    # Keep a record of the lookup error; the workflow carries on as if
    # no hash had been found.
    log.err(failure)
    self.findHash_done(HashObject(), req, url, d)
def findHash_done(self, hash, req, url, d):
    """Use the returned hash to lookup the file in the cache.

    If the hash was not found, the workflow skips down to download from
    the mirror (L{lookupHash_done}).

    @type hash: L{Hash.HashObject}
    @param hash: the hash object containing the expected hash for the file
    @param req: the initial request sent to the HTTP server by apt
    @param url: the URI of the actual mirror request
    @param d: the deferred to eventually call back with the response
    """
    if hash.expected() is None:
        log.msg('Hash for %s was not found' % url)
        # An empty list of values sends the request to the mirror.
        self.lookupHash_done([], hash, url, d)
    else:
        log.msg('Found hash %s for %s' % (hash.hexexpected(), url))

        # Lookup hash in cache
        locations = self.db.lookupHash(hash.expected(), filesOnly = True)
        self.getCachedFile(hash, req, url, d, locations)
def getCachedFile(self, hash, req, url, d, locations):
    """Try to return the file from the cache, otherwise move on to a DHT lookup.

    @type locations: C{list} of C{dictionary}
    @param locations: the files in the cache that match the hash,
        the dictionary contains a key 'path' whose value is a
        L{twisted.python.filepath.FilePath} object for the file.
        The list is consumed as locations are tried.
    """
    if not locations:
        # Every cached copy has been tried (or there were none).
        log.msg('Failed to return file from cache: %s' % url)
        self.lookupHash(hash, url, d)
        return

    # Get the first possible location from the list
    cached = locations.pop(0)['path']
    log.msg('Returning cached file: %s' % cached.path)

    # renderHTTP may answer immediately or with a deferred; either way
    # the result is validated by _getCachedFile.
    resp = static.File(cached.path).renderHTTP(req)
    if isinstance(resp, defer.Deferred):
        resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
    else:
        self._getCachedFile(resp, hash, req, url, d, locations)
def _getCachedFile(self, resp, hash, req, url, d, locations):
    """Check the returned response to be sure it is valid.

    A usable response (status 200-399) is delivered through C{d}; an
    error or any other status moves on to the next cached location.

    @param resp: the response from rendering the cached file, or a
        L{twisted.python.failure.Failure} if rendering failed
    """
    if isinstance(resp, failure.Failure):
        log.msg('Got error trying to get cached file')
        log.err(resp)
        # Try the next possible location
        self.getCachedFile(hash, req, url, d, locations)
        return

    log.msg('Cached response: %r' % resp)

    if 200 <= resp.code < 400:
        d.callback(resp)
    else:
        # Try the next possible location
        self.getCachedFile(hash, req, url, d, locations)
def lookupHash(self, hash, url, d):
    """Ask the DHT for the values stored under the file's expected hash.

    The values found are handed to L{lookupHash_done} together with the
    hash, the URL and the deferred C{d}.
    """
    log.msg('Looking up hash in DHT for file: %s' % url)
    pending = self.dht.getValue(hash.expected())
    pending.addCallback(self.lookupHash_done, hash, url, d)
def lookupHash_done(self, values, hash, url, d):
    """Start the download of the file.

    The download will be from peers if the DHT lookup succeeded, or
    from the mirror otherwise.

    @type values: C{list} of C{dictionary}
    @param values: the returned values from the DHT containing peer
        download information
    @param hash: the hash object for the file
    @param url: the URI of the actual mirror request
    @param d: the deferred to call back with the final response
    """
    if not values:
        log.msg('Peers for %s were not found' % url)
        getDefer = self.peers.get(hash, url)
    else:
        log.msg('Found peers for %s: %r' % (url, values))
        # Download from the found peers
        getDefer = self.peers.get(hash, url, values)
        # A failed peer download falls back to the mirror.
        getDefer.addCallback(self.check_response, hash, url)

    # Both kinds of download are saved to the cache and reported
    # through the workflow's deferred in the same way.
    getDefer.addCallback(self.cache.save_file, hash, url)
    getDefer.addErrback(self.cache.save_error, url)
    getDefer.addCallbacks(d.callback, d.errback)
def check_response(self, response, hash, url):
    """Check the response from peers, and download from the mirror if it failed.

    @param response: the response to the download from peers
    @param hash: the hash object for the file
    @param url: the URI of the actual mirror request
    @return: the unchanged response if its status is 2xx, otherwise a
        deferred for a direct download of the file
    """
    if response.code < 200 or response.code >= 300:
        log.msg('Download from peers failed, going to direct download: %s' % url)
        # Returning the new deferred makes the callback chain wait for
        # the direct download and continue with its response.
        return self.peers.get(hash, url)
    return response
def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
    """Add a newly cached file to the mirror info and/or the DHT.

    If the file was downloaded, set url to the path it was downloaded for.
    Doesn't add a file to the DHT unless a hash was found for it
    (but does add it anyway if forceDHT is True).

    @type file_path: L{twisted.python.filepath.FilePath}
    @param file_path: the location of the file in the local cache
    @type hash: L{Hash.HashObject}
    @param hash: the original (expected) hash object containing also the
        hash of the downloaded file
    @type new_hash: C{boolean}
    @param new_hash: whether the hash was new to this peer, and so should
        be added to the DHT
    @param url: the URI of the location of the file in the mirror
        (optional, defaults to not adding the file to the mirror info)
    @type forceDHT: C{boolean}
    @param forceDHT: whether to force addition of the file to the DHT
        even if the hash was not found in a mirror
        (optional, defaults to False)
    @return: the deferred from L{store} if the file was added to the
        DHT, otherwise None
    """
    if url:
        self.mirrors.updatedFile(url, file_path)

    # Nothing can be published before our contact info is known.
    if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
        return self.store(hash)
    return None
def store(self, hash):
    """Add a key/value pair for the file to the DHT.

    Sets the key and value from the hash information, and tries to add
    it to the DHT.

    @type hash: L{Hash.HashObject}
    @param hash: the hash object for the file to add
    @return: the deferred from the DHT store, with L{store_done} added
        as a callback
    """
    # NOTE(review): the key is assumed to be the file's own digest,
    # matching the hexdigest() that store_done logs -- confirm.
    key = hash.digest()
    value = {'c': self.my_contact}
    pieces = hash.pieceDigests()

    # Determine how to store any piece data
    if len(pieces) <= 1:
        # NOTE(review): assumed that zero or one piece needs no extra
        # piece data alongside the contact info -- confirm.
        pass
    elif len(pieces) <= DHT_PIECES:
        # Short enough to be stored with our peer contact info
        value['t'] = {'t': ''.join(pieces)}
    elif len(pieces) <= TORRENT_PIECES:
        # Short enough to be stored in a separate key in the DHT
        value['h'] = sha.new(''.join(pieces)).digest()
    else:
        # Too long, must be served up by our peer HTTP server
        value['l'] = sha.new(''.join(pieces)).digest()

    storeDefer = self.dht.storeValue(key, value)
    storeDefer.addCallback(self.store_done, hash)
    # Callers (e.g. _refreshFiles) chain further work onto this deferred.
    return storeDefer
def store_done(self, result, hash):
    """Add a key/value pair for the pieces of the file to the DHT (if necessary).

    @param result: the result of storing the file's key in the DHT
    @type hash: L{Hash.HashObject}
    @param hash: the hash object for the file that was stored
    @return: the deferred for the additional piece store if one was
        needed, otherwise the unchanged result
    """
    log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
    pieces = hash.pieceDigests()
    if DHT_PIECES < len(pieces) <= TORRENT_PIECES:
        # Add the piece data key and value to the DHT
        piece_string = ''.join(pieces)
        key = sha.new(piece_string).digest()
        value = {'t': piece_string}

        storeDefer = self.dht.storeValue(key, value)
        storeDefer.addCallback(self.store_torrent_done, key)
        # Make the outer callback chain wait for the piece store too.
        return storeDefer
    return result
def store_torrent_done(self, result, key):
    """Log that the piece string was added to the DHT, ending the workflow.

    @param result: the result of storing the piece string in the DHT
    @param key: the raw DHT key the piece string was stored under
    """
    hex_key = b2a_hex(key)
    log.msg('Added torrent string %s to the DHT: %r' % (hex_key, result))