import sha
from binascii import b2a_hex
from urlparse import urlunparse

from twisted.internet import defer, reactor
from twisted.web2 import server, http, http_headers, static
from twisted.python import log, failure
from twisted.python.filepath import FilePath

from apt_dht_conf import config
from PeerManager import PeerManager
from HTTPServer import TopLevel
from MirrorManager import MirrorManager
from CacheManager import CacheManager
from Hash import HashObject
from util import findMyIPAddr, compact
23 download_dir = 'cache'
26 def __init__(self, dht):
27 log.msg('Initializing the main apt_dht application')
28 self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
29 if not self.cache_dir.child(download_dir).exists():
30 self.cache_dir.child(download_dir).makedirs()
31 self.db = DB(self.cache_dir.child('apt-dht.db'))
33 self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
34 self.dht.join().addCallbacks(self.joinComplete, self.joinError)
35 self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
36 self.getHTTPFactory = self.http_server.getHTTPFactory
37 self.peers = PeerManager()
38 self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
39 other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
40 self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
41 self.my_contact = None
43 def joinComplete(self, result):
44 my_addr = findMyIPAddr(result,
45 config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
46 config.getboolean('DEFAULT', 'LOCAL_OK'))
48 raise RuntimeError, "IP address for this machine could not be found"
49 self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
50 self.cache.scanDirectories()
51 reactor.callLater(60, self.refreshFiles)
53 def joinError(self, failure):
54 log.msg("joining DHT failed miserably")
56 raise RuntimeError, "IP address for this machine could not be found"
58 def refreshFiles(self):
59 """Refresh any files in the DHT that are about to expire."""
60 expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
61 hashes = self.db.expiredFiles(expireAfter)
62 if len(hashes.keys()) > 0:
63 log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
64 for raw_hash in hashes:
65 self.db.refreshHash(raw_hash)
66 hash = HashObject(raw_hash)
67 key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
68 value = {'c': self.my_contact}
69 storeDefer = self.dht.storeValue(key, value)
70 storeDefer.addCallback(self.store_done, hash)
71 reactor.callLater(60, self.refreshFiles)
73 def check_freshness(self, req, path, modtime, resp):
74 log.msg('Checking if %s is still fresh' % path)
75 d = self.peers.get('', path, method = "HEAD", modtime = modtime)
76 d.addCallback(self.check_freshness_done, req, path, resp)
79 def check_freshness_done(self, resp, req, path, orig_resp):
81 log.msg('Still fresh, returning: %s' % path)
84 log.msg('Stale, need to redownload: %s' % path)
85 return self.get_resp(req, path)
87 def get_resp(self, req, path):
90 log.msg('Trying to find hash for %s' % path)
91 findDefer = self.mirrors.findHash(path)
93 findDefer.addCallbacks(self.findHash_done, self.findHash_error,
94 callbackArgs=(req, path, d), errbackArgs=(req, path, d))
95 findDefer.addErrback(log.err)
98 def findHash_error(self, failure, req, path, d):
100 self.findHash_done(HashObject(), req, path, d)
102 def findHash_done(self, hash, req, path, d):
103 if hash.expected() is None:
104 log.msg('Hash for %s was not found' % path)
105 self.lookupHash_done([], hash, path, d)
107 log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
109 # Lookup hash in cache
110 locations = self.db.lookupHash(hash.expected())
111 self.getCachedFile(hash, req, path, d, locations)
113 def getCachedFile(self, hash, req, path, d, locations):
115 log.msg('Failed to return file from cache: %s' % path)
116 self.lookupHash(hash, path, d)
119 # Get the first possible location from the list
120 file = locations.pop(0)['path']
121 log.msg('Returning cached file: %s' % file.path)
124 resp = static.File(file.path).renderHTTP(req)
125 if isinstance(resp, defer.Deferred):
126 resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
128 self._getCachedFile(resp, hash, req, path, d, locations)
130 def _getCachedFile(self, resp, hash, req, path, d, locations):
131 if isinstance(resp, failure.Failure):
132 log.msg('Got error trying to get cached file')
134 # Try the next possible location
135 self.getCachedFile(hash, req, path, d, locations)
138 log.msg('Cached response: %r' % resp)
140 if resp.code >= 200 and resp.code < 400:
143 # Try the next possible location
144 self.getCachedFile(hash, req, path, d, locations)
146 def lookupHash(self, hash, path, d):
147 log.msg('Looking up hash in DHT for file: %s' % path)
148 key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
149 lookupDefer = self.dht.getValue(key)
150 lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
152 def lookupHash_done(self, values, hash, path, d):
154 log.msg('Peers for %s were not found' % path)
155 getDefer = self.peers.get(hash, path)
156 getDefer.addCallback(self.cache.save_file, hash, path)
157 getDefer.addErrback(self.cache.save_error, path)
158 getDefer.addCallbacks(d.callback, d.errback)
160 log.msg('Found peers for %s: %r' % (path, values))
161 # Download from the found peers
162 getDefer = self.peers.get(hash, path, values)
163 getDefer.addCallback(self.check_response, hash, path)
164 getDefer.addCallback(self.cache.save_file, hash, path)
165 getDefer.addErrback(self.cache.save_error, path)
166 getDefer.addCallbacks(d.callback, d.errback)
168 def check_response(self, response, hash, path):
169 if response.code < 200 or response.code >= 300:
170 log.msg('Download from peers failed, going to direct download: %s' % path)
171 getDefer = self.peers.get(hash, path)
175 def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
176 """Add a newly cached file to the DHT.
178 If the file was downloaded, set url to the path it was downloaded for.
179 Don't add a file to the DHT unless a hash was found for it
180 (but do add it anyway if forceDHT is True).
183 self.mirrors.updatedFile(url, file_path)
185 if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
186 key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
187 value = {'c': self.my_contact}
188 pieces = hash.pieceDigests()
191 elif len(pieces) <= DHT_PIECES:
192 value['t'] = {'t': ''.join(pieces)}
193 elif len(pieces) <= TORRENT_PIECES:
194 s = sha.new().update(''.join(pieces))
195 value['h'] = s.digest()
197 s = sha.new().update(''.join(pieces))
198 value['l'] = s.digest()
199 storeDefer = self.dht.storeValue(key, value)
200 storeDefer.addCallback(self.store_done, hash)
204 def store_done(self, result, hash):
205 log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
206 pieces = hash.pieceDigests()
207 if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
208 s = sha.new().update(''.join(pieces))
210 value = {'t': ''.join(pieces)}
211 storeDefer = self.dht.storeValue(key, value)
212 storeDefer.addCallback(self.store_torrent_done, key)
216 def store_torrent_done(self, result, key):
217 log.msg('Added torrent string %s to the DHT: %r' % (b2ahex(key), result))