2 from binascii import b2a_hex
3 from urlparse import urlunparse
6 from twisted.internet import defer, reactor
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
18 from util import findMyIPAddr, compact
# Name of the child directory of the configured cache_dir that __init__
# creates when missing and hands to the HTTP server and the cache manager.
23 download_dir = 'cache'
def __init__(self, dht):
    """Set up the cache directory, database, DHT and helper managers.

    dht is the DHT object to use. It is configured from the section named
    by the DEFAULT/DHT option and asked to join; joinComplete/joinError
    are attached to the Deferred returned by join().
    """
    log.msg('Initializing the main apt_dht application')
    self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
    download_path = self.cache_dir.child(download_dir)
    if not download_path.exists():
        download_path.makedirs()
    self.db = DB(self.cache_dir.child('apt-dht.db'))
    # Keep the DHT on the instance: every later use goes through self.dht.
    self.dht = dht
    self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
    self.dht.join().addCallbacks(self.joinComplete, self.joinError)
    # Each consumer below gets its own FilePath object, as before.
    self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
    self.getHTTPFactory = self.http_server.getHTTPFactory
    self.peers = PeerManager()
    self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
    other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
    self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
    # Filled in by joinComplete(); new_cached_file() skips the DHT while
    # this is still None.
    self.my_contact = None
def joinComplete(self, result):
    """Record this peer's contact info after the DHT join succeeds.

    result is the value the DHT join fired with; it is handed to
    findMyIPAddr() together with the DHT port and the LOCAL_OK setting.
    On success the compact contact is stored, the cache directories are
    scanned and the first key refresh is scheduled for 60 seconds later.

    Raises RuntimeError when no address could be determined.
    """
    my_addr = findMyIPAddr(result,
                           config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
                           config.getboolean('DEFAULT', 'LOCAL_OK'))
    # Fail only when no address came back; an unconditional raise would
    # make everything below unreachable.
    if not my_addr:
        raise RuntimeError("IP address for this machine could not be found")
    self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
    self.cache.scanDirectories()
    reactor.callLater(60, self.refreshFiles)
def joinError(self, failure):
    """Log a failed DHT join and abort with a RuntimeError.

    failure is the error the join Deferred fired with. It is written to
    the log so the cause is not lost when the RuntimeError replaces it.
    """
    log.msg("joining DHT failed miserably")
    log.err(failure)
    # The old message was copied from the IP address check and did not
    # describe this error.
    raise RuntimeError("Joining the DHT failed")
def refreshFiles(self):
    """Re-store the DHT entries that the database reports as expired.

    The KEY_REFRESH setting is passed to the database's expiredHashes(),
    and whatever it returns is handed to _refreshFiles() to be processed.
    """
    max_age = config.gettime('DEFAULT', 'KEY_REFRESH')
    expiring = self.db.expiredHashes(max_age)
    num_expiring = len(expiring.keys())
    if num_expiring > 0:
        log.msg('Refreshing the keys of %d DHT values' % num_expiring)
    # Always called, even with nothing to do: _refreshFiles() is the one
    # that schedules the next run.
    self._refreshFiles(None, expiring)
def _refreshFiles(self, result, hashes):
    """Store the expired hashes in the DHT, one per call.

    Called first by refreshFiles() with result set to None, and then again
    (through addBoth) with the outcome of each store. hashes maps a raw
    hash to a record with a 'pieces' entry; it is consumed as entries are
    processed. Once it is empty the next refresh is scheduled.
    """
    if result is not None:
        log.msg('Storage resulted in: %r' % result)

    if not hashes:
        # Nothing left to store: look for expired keys again in a minute.
        reactor.callLater(60, self.refreshFiles)
        return

    raw_hash = hashes.keys()[0]
    self.db.refreshHash(raw_hash)
    hash_obj = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
    # Drop the entry so the next call moves on instead of storing the
    # same hash forever.
    del hashes[raw_hash]
    storeDefer = self.store(hash_obj)
    storeDefer.addBoth(self._refreshFiles, hashes)
def check_freshness(self, req, path, modtime, resp):
    """Ask for the file with a HEAD request to see if resp is still current.

    Returns the Deferred of the HEAD request, with check_freshness_done
    attached, so the caller can wait for the response that callback
    returns.
    """
    log.msg('Checking if %s is still fresh' % path)
    d = self.peers.get('', path, method = "HEAD", modtime = modtime)
    d.addCallback(self.check_freshness_done, req, path, resp)
    # Without this the value returned by check_freshness_done is lost.
    return d
def check_freshness_done(self, resp, req, path, orig_resp):
    """Choose between the existing response and a fresh download.

    resp is the response to the HEAD request made by check_freshness();
    orig_resp is the response that was being checked. Returns orig_resp
    when the file is unchanged, otherwise the result of get_resp().
    """
    # NOTE(review): assumes peers.get() turns modtime into a conditional
    # request, so that 304 (Not Modified) means the copy is still good.
    # Confirm against PeerManager.
    if resp.code == 304:
        log.msg('Still fresh, returning: %s' % path)
        return orig_resp

    log.msg('Stale, need to redownload: %s' % path)
    return self.get_resp(req, path)
def get_resp(self, req, path):
    """Start finding a response for the requested path.

    Returns a new Deferred. It is threaded through the hash, cache and
    peer lookups as their d argument, and those steps fire it.
    """
    # Create the Deferred up front; both callbacks below receive it.
    d = defer.Deferred()

    log.msg('Trying to find hash for %s' % path)
    findDefer = self.mirrors.findHash(path)

    findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                           callbackArgs=(req, path, d), errbackArgs=(req, path, d))
    findDefer.addErrback(log.err)
    return d
def findHash_error(self, failure, req, path, d):
    """Log a failed hash lookup and carry on as if no hash was found."""
    # Record the cause before falling back to an empty HashObject.
    log.err(failure)
    self.findHash_done(HashObject(), req, path, d)
def findHash_done(self, hash, req, path, d):
    """Continue the request once the hash lookup has finished.

    With no expected hash the request goes straight to lookupHash_done()
    with an empty peer list. Otherwise the database is asked for cached
    copies and getCachedFile() tries them.
    """
    if hash.expected() is None:
        log.msg('Hash for %s was not found' % path)
        self.lookupHash_done([], hash, path, d)
        # Stop here so the cache lookup below never runs without a hash.
        return

    log.msg('Found hash %s for %s' % (hash.hexexpected(), path))

    # Lookup hash in cache
    locations = self.db.lookupHash(hash.expected(), filesOnly = True)
    self.getCachedFile(hash, req, path, d, locations)
def getCachedFile(self, hash, req, path, d, locations):
    """Try to answer the request from the next cached copy of the file.

    locations is the list of database entries still to be tried; each call
    removes the first one. When the list is empty the request moves on to
    lookupHash().
    """
    if not locations:
        log.msg('Failed to return file from cache: %s' % path)
        self.lookupHash(hash, path, d)
        return

    # Get the first possible location from the list
    cached_file = locations.pop(0)['path']
    log.msg('Returning cached file: %s' % cached_file.path)

    resp = static.File(cached_file.path).renderHTTP(req)
    if isinstance(resp, defer.Deferred):
        # addBoth, so a failure also reaches _getCachedFile and the next
        # location gets tried.
        resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
    else:
        self._getCachedFile(resp, hash, req, path, d, locations)
def _getCachedFile(self, resp, hash, req, path, d, locations):
    """Deliver a cached response, or move on to the next location.

    resp is either the response from rendering the cached file or a
    Failure. A response with a code from 200 to 399 is delivered by firing
    d; anything else makes getCachedFile() try the remaining locations.
    """
    if isinstance(resp, failure.Failure):
        log.msg('Got error trying to get cached file')
        log.err(resp)
        # Try the next possible location
        self.getCachedFile(hash, req, path, d, locations)
        # A Failure has no response code, so stop here.
        return

    log.msg('Cached response: %r' % resp)

    if resp.code >= 200 and resp.code < 400:
        d.callback(resp)
    else:
        # Try the next possible location
        self.getCachedFile(hash, req, path, d, locations)
def lookupHash(self, hash, path, d):
    """Ask the DHT for the values stored under the file's hash.

    The key is the expected hash normalized to the HASH_LENGTH configured
    for the DHT. The values found are passed to lookupHash_done().
    """
    log.msg('Looking up hash in DHT for file: %s' % path)
    dht_section = config.get('DEFAULT', 'DHT')
    hash_bits = config.getint(dht_section, 'HASH_LENGTH')
    dht_key = hash.normexpected(bits = hash_bits)
    self.dht.getValue(dht_key).addCallback(self.lookupHash_done, hash, path, d)
def lookupHash_done(self, values, hash, path, d):
    """Start the download, from peers if the DHT returned any.

    values is the list of values found in the DHT for the file (empty when
    none were found, or when no hash is known). The download result is
    saved through the cache manager and then used to fire d.
    """
    if not values:
        log.msg('Peers for %s were not found' % path)
        getDefer = self.peers.get(hash, path)
    else:
        log.msg('Found peers for %s: %r' % (path, values))
        # Download from the found peers
        getDefer = self.peers.get(hash, path, values)
        getDefer.addCallback(self.check_response, hash, path)

    # Exactly one download is started, so d is fired only once.
    getDefer.addCallback(self.cache.save_file, hash, path)
    getDefer.addErrback(self.cache.save_error, path)
    getDefer.addCallbacks(d.callback, d.errback)
def check_response(self, response, hash, path):
    """Fall back to a direct download when the peer download failed.

    Used as a callback ahead of the cache manager's save_file, so it has
    to return something for that callback to receive: the unchanged
    response when its code is 2xx, otherwise the Deferred of a new
    download made without any peers.
    """
    if response.code < 200 or response.code >= 300:
        log.msg('Download from peers failed, going to direct download: %s' % path)
        getDefer = self.peers.get(hash, path)
        return getDefer
    return response
def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
    """Add a newly cached file to the appropriate places.

    If the file was downloaded, set url to the path it was downloaded for.
    Doesn't add a file to the DHT unless a hash was found for it
    (but does add it anyway if forceDHT is True).

    Returns the result of store() when the file is added to the DHT,
    otherwise None.
    """
    # Only downloaded files have a url to report to the mirror manager.
    if url:
        self.mirrors.updatedFile(url, file_path)

    if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
        return self.store(hash)
    return None
def store(self, hash):
    """Add a file to the DHT.

    The stored value always carries this peer's contact under 'c'.
    Depending on how many piece digests the hash has, it also carries the
    joined piece string ('t') or a SHA1 digest of that string ('h' or
    'l').

    Returns the Deferred from the DHT's storeValue(), with store_done
    attached; callers add their own callbacks to it.
    """
    key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
    value = {'c': self.my_contact}
    pieces = hash.pieceDigests()
    num_pieces = len(pieces)
    # NOTE(review): the first branch of this chain was not visible during
    # review. Presumably a file with at most one piece stores no piece
    # data at all; confirm the threshold.
    if num_pieces > 1:
        piece_string = ''.join(pieces)
        if num_pieces <= DHT_PIECES:
            value['t'] = {'t': piece_string}
        else:
            # Hash through the constructor: update() returns None, so
            # chaining .digest() onto it cannot work.
            piece_hash = sha.new(piece_string).digest()
            if num_pieces <= TORRENT_PIECES:
                value['h'] = piece_hash
            else:
                value['l'] = piece_hash
    storeDefer = self.dht.storeValue(key, value)
    storeDefer.addCallback(self.store_done, hash)
    return storeDefer
def store_done(self, result, hash):
    """Log a finished store and add the piece string when it is needed.

    When the number of pieces is above DHT_PIECES but not above
    TORRENT_PIECES, the joined piece string is stored in the DHT under the
    SHA1 digest of that string, the same digest store() publishes as 'h'.

    Returns the Deferred of that second store when one is made, otherwise
    the unchanged result.
    """
    log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
    pieces = hash.pieceDigests()
    if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
        piece_string = ''.join(pieces)
        # Hash through the constructor: update() returns None.
        key = sha.new(piece_string).digest()
        value = {'t': piece_string}
        storeDefer = self.dht.storeValue(key, value)
        storeDefer.addCallback(self.store_torrent_done, key)
        # NOTE(review): chaining the second store and passing result on
        # are presumed from normal callback use; confirm with callers.
        return storeDefer
    return result
def store_torrent_done(self, result, key):
    """Log that the torrent string was stored in the DHT under key."""
    # b2a_hex is the name imported from binascii.
    log.msg('Added torrent string %s to the DHT: %r' % (b2a_hex(key), result))