"""The main program code.

@var DHT_PIECES: the maximum number of pieces to store with our contact info
@var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
@var download_dir: the name of the directory to use for downloaded files

"""
12 from binascii import b2a_hex
13 from urlparse import urlunparse
16 from twisted.internet import defer, reactor
17 from twisted.web2 import server, http, http_headers, static
18 from twisted.python import log, failure
19 from twisted.python.filepath import FilePath
21 from apt_dht_conf import config
22 from PeerManager import PeerManager
23 from HTTPServer import TopLevel
24 from MirrorManager import MirrorManager
25 from CacheManager import CacheManager
26 from Hash import HashObject
28 from util import findMyIPAddr, compact
# Subdirectory of cache_dir in which downloaded files are stored
download_dir = 'cache'
    """The main code object that does all of the work.

    Contains all of the sub-components that do all the low-level work, and
    coordinates communication between them.

    @type cache_dir: L{twisted.python.filepath.FilePath}
    @ivar cache_dir: the directory to use for storing all files
    @ivar db: the database to use for tracking files and hashes
    @type dht: L{interfaces.IDHT}
    @ivar dht: the DHT instance to use
    @type http_server: L{HTTPServer.TopLevel}
    @ivar http_server: the web server that will handle all requests from apt
    @type peers: L{PeerManager.PeerManager}
    @ivar peers: the manager of all downloads from mirrors and other peers
    @type mirrors: L{MirrorManager.MirrorManager}
    @ivar mirrors: the manager of downloaded information about mirrors which
        can be queried to get hashes from file names
    @type cache: L{CacheManager.CacheManager}
    @ivar cache: the manager of all downloaded files
    @type my_contact: C{string}
    @ivar my_contact: the 6-byte compact peer representation of this peer's
        download information (IP address and port)
    """
    def __init__(self, dht):
        """Initialize all the sub-components.

        @type dht: L{interfaces.IDHT}
        @param dht: the DHT instance to use
        """
        log.msg('Initializing the main apt_dht application')
        # Root directory for all cached and stored files
        self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
        if not self.cache_dir.child(download_dir).exists():
            self.cache_dir.child(download_dir).makedirs()
        # Database tracking known files and their hashes
        self.db = DB(self.cache_dir.child('apt-dht.db'))
        # NOTE(review): self.dht is used below but the dht parameter is never
        # assigned to it in the visible code — presumably "self.dht = dht"
        # belongs here; confirm against the full source.
        self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
        # Join the DHT asynchronously; callbacks fire on completion/failure
        self.dht.join().addCallbacks(self.joinComplete, self.joinError)
        self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
        self.getHTTPFactory = self.http_server.getHTTPFactory
        self.peers = PeerManager()
        self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
        other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
        self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
        # Unknown until the DHT join completes (set in joinComplete)
        self.my_contact = None
85 def joinComplete(self, result):
86 """Complete the DHT join process and determine our download information.
88 Called by the DHT when the join has been completed with information
89 on the external IP address and port of this peer.
91 my_addr = findMyIPAddr(result,
92 config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
93 config.getboolean('DEFAULT', 'LOCAL_OK'))
95 raise RuntimeError, "IP address for this machine could not be found"
96 self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
97 self.cache.scanDirectories()
98 reactor.callLater(60, self.refreshFiles)
100 def joinError(self, failure):
101 """Joining the DHT has failed."""
102 log.msg("joining DHT failed miserably")
104 raise RuntimeError, "IP address for this machine could not be found"
    def refreshFiles(self):
        """Refresh any files in the DHT that are about to expire."""
        expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
        # Hashes whose DHT entries will expire within expireAfter seconds
        hashes = self.db.expiredHashes(expireAfter)
        if len(hashes.keys()) > 0:
            log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
        # Kick off the asynchronous, one-at-a-time refresh chain
        # (see _refreshFiles, which reschedules this method when done)
        self._refreshFiles(None, hashes)
    def _refreshFiles(self, result, hashes):
        """Refresh the next expiring hash and chain on to the remaining ones.

        @param result: the result of the previous DHT store
            (C{None} on the first call)
        @type hashes: C{dictionary}
        @param hashes: the remaining expired hashes, keyed by raw hash, each
            value containing at least a 'pieces' entry
        """
        if result is not None:
            log.msg('Storage resulted in: %r' % result)
        # NOTE(review): the code below indexes hashes.keys()[0] without first
        # checking that hashes is non-empty, and nothing removes the entry
        # before chaining the callback — a guard ("if hashes:") and a
        # "del hashes[raw_hash]" appear to be missing; confirm against the
        # full source.
        raw_hash = hashes.keys()[0]
        # Mark the hash as refreshed in the database
        self.db.refreshHash(raw_hash)
        hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
        # Re-store the value in the DHT, then continue with the rest
        storeDefer = self.store(hash)
        storeDefer.addBoth(self._refreshFiles, hashes)
        # Schedule the next full refresh scan
        reactor.callLater(60, self.refreshFiles)
    def check_freshness(self, req, url, modtime, resp):
        """Send a HEAD to the mirror to check if the response from the cache is still valid.

        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        @type modtime: C{int}
        @param modtime: the modified time of the cached file (seconds since epoch)
        @type resp: L{twisted.web2.http.Response}
        @param resp: the response from the cache to be sent to apt
        @rtype: L{twisted.internet.defer.Deferred}
        @return: a deferred that will be called back with the correct response
        """
        log.msg('Checking if %s is still fresh' % url)
        # Conditional HEAD to the mirror; the result tells us whether the
        # cached copy can still be served
        d = self.peers.get('', url, method = "HEAD", modtime = modtime)
        d.addCallback(self.check_freshness_done, req, url, resp)
        # NOTE(review): the docstring promises a deferred but no "return d"
        # is visible here — confirm against the full source.
    def check_freshness_done(self, resp, req, url, orig_resp):
        """Process the returned response from the mirror.

        @type resp: L{twisted.web2.http.Response}
        @param resp: the response from the mirror to the HEAD request
        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        @type orig_resp: L{twisted.web2.http.Response}
        @param orig_resp: the response from the cache to be sent to apt
        """
        # NOTE(review): the branch condition choosing between the fresh and
        # stale paths below (presumably a Not-Modified check on resp.code,
        # returning orig_resp when fresh) is not visible here — confirm
        # against the full source.
        log.msg('Still fresh, returning: %s' % url)
        log.msg('Stale, need to redownload: %s' % url)
        # Restart the full lookup/download workflow for the stale file
        return self.get_resp(req, url)
    def get_resp(self, req, url):
        """Lookup a hash for the file in the local mirror info.

        Starts the process of getting a response to an uncached apt request.

        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        @rtype: L{twisted.internet.defer.Deferred}
        @return: a deferred that will be called back with the response
        """
        log.msg('Trying to find hash for %s' % url)
        # Look for the file's expected hash in the downloaded mirror metadata
        findDefer = self.mirrors.findHash(url)

        findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                               callbackArgs=(req, url, d), errbackArgs=(req, url, d))
        # Log any error that findHash_error did not absorb
        findDefer.addErrback(log.err)
        # NOTE(review): "d" is used above but its creation (presumably
        # "d = defer.Deferred()") and the final "return d" are not visible
        # here — confirm against the full source.
186 def findHash_error(self, failure, req, url, d):
187 """Process the error in hash lookup by returning an empty L{HashObject}."""
189 self.findHash_done(HashObject(), req, url, d)
    def findHash_done(self, hash, req, url, d):
        """Use the returned hash to lookup the file in the cache.

        If the hash was not found, the workflow skips down to download from
        the mirror (L{lookupHash_done}).

        @type hash: L{Hash.HashObject}
        @param hash: the hash object containing the expected hash for the file
        """
        if hash.expected() is None:
            # No hash known: go straight to the DHT-miss (mirror) path
            log.msg('Hash for %s was not found' % url)
            self.lookupHash_done([], hash, url, d)
        # NOTE(review): an "else:" for the found-hash path appears to be
        # missing here — as written the code below also runs when the hash
        # was not found; confirm against the full source.
        log.msg('Found hash %s for %s' % (hash.hexexpected(), url))

        # Lookup hash in cache
        locations = self.db.lookupHash(hash.expected(), filesOnly = True)
        self.getCachedFile(hash, req, url, d, locations)
    def getCachedFile(self, hash, req, url, d, locations):
        """Try to return the file from the cache, otherwise move on to a DHT lookup.

        @type hash: L{Hash.HashObject}
        @param hash: the hash object for the requested file
        @type req: L{twisted.web2.http.Request}
        @param req: the initial request sent to the HTTP server by apt
        @param url: the URI of the actual mirror request
        @type d: L{twisted.internet.defer.Deferred}
        @param d: the deferred that will eventually fire with the response
        @type locations: C{list} of C{dictionary}
        @param locations: the files in the cache that match the hash,
            the dictionary contains a key 'path' whose value is a
            L{twisted.python.filepath.FilePath} object for the file.
        """
        # NOTE(review): an "if not locations:" guard (with a return after the
        # DHT fallback below) appears to be missing before this failure
        # path — confirm against the full source.
        log.msg('Failed to return file from cache: %s' % url)
        # No cached copy: look for peers holding the hash in the DHT
        self.lookupHash(hash, url, d)

        # Get the first possible location from the list
        file = locations.pop(0)['path']
        log.msg('Returning cached file: %s' % file.path)

        # Serve the file straight from disk via the static resource
        resp = static.File(file.path).renderHTTP(req)
        if isinstance(resp, defer.Deferred):
            resp.addBoth(self._getCachedFile, hash, req, url, d, locations)
        # NOTE(review): an "else:" appears to be missing before the direct
        # (non-deferred) call below — confirm against the full source.
        self._getCachedFile(resp, hash, req, url, d, locations)
    def _getCachedFile(self, resp, hash, req, url, d, locations):
        """Check the returned response to be sure it is valid."""
        if isinstance(resp, failure.Failure):
            log.msg('Got error trying to get cached file')
            # Try the next possible location
            self.getCachedFile(hash, req, url, d, locations)
        # NOTE(review): a "return" after the retry above appears to be
        # missing — confirm against the full source.
        log.msg('Cached response: %r' % resp)

        if resp.code >= 200 and resp.code < 400:
            # NOTE(review): the success action (presumably "d.callback(resp)")
            # and an "else:" around the retry below are not visible here —
            # confirm against the full source.
            # Try the next possible location
            self.getCachedFile(hash, req, url, d, locations)
251 def lookupHash(self, hash, url, d):
252 """Lookup the hash in the DHT."""
253 log.msg('Looking up hash in DHT for file: %s' % url)
254 key = hash.expected()
255 lookupDefer = self.dht.getValue(key)
256 lookupDefer.addCallback(self.lookupHash_done, hash, url, d)
    def lookupHash_done(self, values, hash, url, d):
        """Start the download of the file.

        The download will be from peers if the DHT lookup succeeded, or
        from the mirror otherwise.

        @type values: C{list} of C{dictionary}
        @param values: the returned values from the DHT containing peer
            download information
        """
        # NOTE(review): the branch condition separating the two paths below
        # (presumably "if not values:" ... "else:") is not visible here —
        # confirm against the full source.
        log.msg('Peers for %s were not found' % url)
        # Download directly from the mirror, caching the result
        getDefer = self.peers.get(hash, url)
        getDefer.addCallback(self.cache.save_file, hash, url)
        getDefer.addErrback(self.cache.save_error, url)
        getDefer.addCallbacks(d.callback, d.errback)

        log.msg('Found peers for %s: %r' % (url, values))
        # Download from the found peers
        getDefer = self.peers.get(hash, url, values)
        # Verify the peer download succeeded; falls back to the mirror if not
        getDefer.addCallback(self.check_response, hash, url)
        getDefer.addCallback(self.cache.save_file, hash, url)
        getDefer.addErrback(self.cache.save_error, url)
        getDefer.addCallbacks(d.callback, d.errback)
    def check_response(self, response, hash, url):
        """Check the response from peers, and download from the mirror if it is not."""
        # Anything outside 2xx means the peer download did not succeed
        if response.code < 200 or response.code >= 300:
            log.msg('Download from peers failed, going to direct download: %s' % url)
            getDefer = self.peers.get(hash, url)
            # NOTE(review): "return getDefer" here and a trailing
            # "return response" for the success case appear to be missing —
            # confirm against the full source.
    def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
        """Add a newly cached file to the mirror info and/or the DHT.

        If the file was downloaded, set url to the path it was downloaded for.
        Doesn't add a file to the DHT unless a hash was found for it
        (but does add it anyway if forceDHT is True).

        @type file_path: L{twisted.python.filepath.FilePath}
        @param file_path: the location of the file in the local cache
        @type hash: L{Hash.HashObject}
        @param hash: the original (expected) hash object containing also the
            hash of the downloaded file
        @type new_hash: C{boolean}
        @param new_hash: whether the hash was new to this peer, and so should
            be stored in the DHT
        @param url: the URI of the location of the file in the mirror
            (optional, defaults to not adding the file to the mirror info)
        @type forceDHT: C{boolean}
        @param forceDHT: whether to force addition of the file to the DHT
            even if the hash was not found in a mirror
            (optional, defaults to False)
        """
        # NOTE(review): url defaults to None yet is passed unguarded to
        # updatedFile — an "if url:" check appears to be missing; confirm
        # against the full source.
        self.mirrors.updatedFile(url, file_path)

        # Only store once our own contact info is known, and only hashes that
        # are new and expected (or when explicitly forced)
        if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
            return self.store(hash)
321 def store(self, hash):
322 """Add a key/value pair for the file to the DHT.
324 Sets the key and value from the hash information, and tries to add
328 value = {'c': self.my_contact}
329 pieces = hash.pieceDigests()
331 # Determine how to store any piece data
334 elif len(pieces) <= DHT_PIECES:
335 # Short enough to be stored with our peer contact info
336 value['t'] = {'t': ''.join(pieces)}
337 elif len(pieces) <= TORRENT_PIECES:
338 # Short enough to be stored in a separate key in the DHT
339 s = sha.new().update(''.join(pieces))
340 value['h'] = s.digest()
342 # Too long, must be served up by our peer HTTP server
343 s = sha.new().update(''.join(pieces))
344 value['l'] = s.digest()
346 storeDefer = self.dht.storeValue(key, value)
347 storeDefer.addCallback(self.store_done, hash)
350 def store_done(self, result, hash):
351 """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
352 log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
353 pieces = hash.pieceDigests()
354 if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
355 # Add the piece data key and value to the DHT
356 s = sha.new().update(''.join(pieces))
358 value = {'t': ''.join(pieces)}
360 storeDefer = self.dht.storeValue(key, value)
361 storeDefer.addCallback(self.store_torrent_done, key)
365 def store_torrent_done(self, result, key):
366 """Adding the file to the DHT is complete, and so is the workflow."""
367 log.msg('Added torrent string %s to the DHT: %r' % (b2ahex(key), result))