2 from binascii import b2a_hex
3 from urlparse import urlunparse
6 from twisted.internet import defer, reactor
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
18 from util import findMyIPAddr, compact
# Name of the sub-directory of the configured cache directory that serves as
# the download cache (it is handed to TopLevel and CacheManager below).
download_dir = "cache"
def __init__(self, dht):
    """Initialize the main apt_dht application around the given DHT.

    Creates the download cache directory if it does not exist, opens the
    database, builds the HTTP server, peer, mirror and cache managers, and
    finally configures and joins the DHT.
    """
    log.msg('Initializing the main apt_dht application')
    self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
    download_path = self.cache_dir.child(download_dir)
    if not download_path.exists():
        download_path.makedirs()
    self.db = DB(self.cache_dir.child('apt-dht.db'))
    # Keep a reference to the DHT; every other method reaches it via self.dht.
    self.dht = dht
    self.http_server = TopLevel(download_path, self.db, self)
    self.getHTTPFactory = self.http_server.getHTTPFactory
    self.peers = PeerManager()
    self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
    other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
    self.cache = CacheManager(download_path, self.db, other_dirs, self)
    self.my_contact = None
    # Join last: joinComplete uses self.cache and sets self.my_contact, so
    # everything it touches must exist (and must not be reset afterwards)
    # even if the join Deferred has already fired.
    self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
    self.dht.join().addCallbacks(self.joinComplete, self.joinError)
def joinComplete(self, result):
    """Callback for a successful DHT join.

    Finds this machine's IP address from the join result, builds the compact
    contact info advertised to other peers, scans the cache directories and
    schedules the first key refresh 60 seconds from now.

    Raises RuntimeError if no IP address for this machine could be found.
    """
    my_addr = findMyIPAddr(result,
                           config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
                           config.getboolean('DEFAULT', 'LOCAL_OK'))
    if not my_addr:
        raise RuntimeError("IP address for this machine could not be found")
    self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
    self.cache.scanDirectories()
    reactor.callLater(60, self.refreshFiles)
def joinError(self, failure):
    """Errback for a failed DHT join.

    Logs the failure with its details, then raises RuntimeError so the error
    keeps propagating down the Deferred chain.
    """
    log.msg("joining DHT failed miserably")
    # Record why the join failed instead of discarding the Failure.
    log.err(failure)
    raise RuntimeError("Joining the DHT failed")
def refreshFiles(self):
    """Refresh any files in the DHT that are about to expire.

    Asks the database for the hashes whose keys are due for refresh (per the
    KEY_REFRESH setting), marks each one refreshed, and re-stores this peer's
    contact info under its DHT key. Always reschedules itself to run again
    60 seconds later.
    """
    try:
        expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
        hashes = self.db.expiredFiles(expireAfter)
        if hashes:
            log.msg('Refreshing the keys of %d DHT values' % len(hashes))
        # The key length is the same for every hash; look it up once.
        bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH')
        for raw_hash in hashes:
            self.db.refreshHash(raw_hash)
            hash_obj = HashObject(raw_hash)
            key = hash_obj.norm(bits=bits)
            value = {'c': self.my_contact}
            storeDefer = self.dht.storeValue(key, value)
            storeDefer.addCallback(self.store_done, hash_obj)
            # Log store failures rather than leaving them unhandled.
            storeDefer.addErrback(log.err)
    finally:
        # Reschedule even if this round raised, so one failure does not
        # stop all future refreshes.
        reactor.callLater(60, self.refreshFiles)
def check_freshness(self, req, path, modtime, resp):
    """Check whether the cached response resp for path is still fresh.

    Issues a HEAD request for path with the cached modification time and
    passes the answer to check_freshness_done.

    Returns a Deferred that fires with whatever check_freshness_done returns.
    """
    log.msg('Checking if %s is still fresh' % path)
    d = self.peers.get('', path, method="HEAD", modtime=modtime)
    d.addCallback(self.check_freshness_done, req, path, resp)
    return d
def check_freshness_done(self, resp, req, path, orig_resp):
    """Return the cached response if still fresh, else re-download path.

    resp is the response to the HEAD request made by check_freshness;
    orig_resp is the cached response to hand back when nothing changed.
    """
    # NOTE(review): assumes peers.get() sends modtime as If-Modified-Since,
    # so an unchanged file is answered with 304 Not Modified -- confirm.
    if resp.code == 304:
        log.msg('Still fresh, returning: %s' % path)
        return orig_resp
    log.msg('Stale, need to redownload: %s' % path)
    return self.get_resp(req, path)
def get_resp(self, req, path):
    """Start retrieving path and return a Deferred for the response.

    The mirror manager is asked for the file's hash first. findHash_done (or
    findHash_error) then continues the lookup and eventually fires the
    returned Deferred.
    """
    d = defer.Deferred()

    log.msg('Trying to find hash for %s' % path)
    findDefer = self.mirrors.findHash(path)

    findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                           callbackArgs=(req, path, d), errbackArgs=(req, path, d))
    # Catch anything raised by the two handlers above.
    findDefer.addErrback(log.err)
    return d
def findHash_error(self, failure, req, path, d):
    """Errback for a failed hash lookup.

    Logs the failure, then continues as if no hash were known by passing an
    empty HashObject to findHash_done.
    """
    log.err(failure)
    self.findHash_done(HashObject(), req, path, d)
def findHash_done(self, hash, req, path, d):
    """Continue retrieving path once its hash lookup has finished.

    hash.expected() is None when no hash is known. In that case the file is
    fetched without consulting the cache or the DHT. Otherwise the cached
    locations for the hash are tried first.
    """
    if hash.expected() is None:
        log.msg('Hash for %s was not found' % path)
        self.lookupHash_done([], hash, path, d)
        return

    log.msg('Found hash %s for %s' % (hash.hexexpected(), path))

    # Lookup hash in cache
    locations = self.db.lookupHash(hash.expected())
    self.getCachedFile(hash, req, path, d, locations)
def getCachedFile(self, hash, req, path, d, locations):
    """Try to serve path from the next cached location.

    locations is a list of dicts whose 'path' entry has a .path attribute
    (presumably a FilePath). Each call consumes its first entry. Once the
    list is exhausted, falls back to a DHT lookup for the hash.
    """
    if not locations:
        log.msg('Failed to return file from cache: %s' % path)
        self.lookupHash(hash, path, d)
        return

    # Get the first possible location from the list
    cached = locations.pop(0)['path']
    log.msg('Returning cached file: %s' % cached.path)

    # renderHTTP may return the response itself or a Deferred for it;
    # _getCachedFile must see the response exactly once either way.
    resp = static.File(cached.path).renderHTTP(req)
    if isinstance(resp, defer.Deferred):
        resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
    else:
        self._getCachedFile(resp, hash, req, path, d, locations)
def _getCachedFile(self, resp, hash, req, path, d, locations):
    """Handle the result of rendering one cached file.

    On a Failure, or a response whose code is outside 200-399, move on to the
    next cached location. On a successful response, fire d with it.
    """
    if isinstance(resp, failure.Failure):
        log.msg('Got error trying to get cached file')
        log.err(resp)
        # Try the next possible location
        self.getCachedFile(hash, req, path, d, locations)
        return

    log.msg('Cached response: %r' % resp)

    if 200 <= resp.code < 400:
        d.callback(resp)
    else:
        # Try the next possible location
        self.getCachedFile(hash, req, path, d, locations)
def lookupHash(self, hash, path, d):
    """Look up the peers that have the file with this hash in the DHT.

    The lookup result goes to lookupHash_done. If the lookup itself fails,
    _lookupHash_error falls back to fetching the file as if no peers had
    been found, so d still fires.
    """
    log.msg('Looking up hash in DHT for file: %s' % path)
    key = hash.normexpected(bits=config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
    lookupDefer = self.dht.getValue(key)
    lookupDefer.addCallbacks(self.lookupHash_done, self._lookupHash_error,
                             callbackArgs=(hash, path, d),
                             errbackArgs=(hash, path, d))

def _lookupHash_error(self, err, hash, path, d):
    """Log a failed DHT lookup and continue as if no peers were found."""
    log.err(err)
    self.lookupHash_done([], hash, path, d)
def lookupHash_done(self, values, hash, path, d):
    """Download path, from peers if the DHT lookup found any.

    values holds the peer values found in the DHT (empty when none were
    found). Without peers the file is fetched with a plain peers.get().
    With peers, the download result first passes through check_response.
    Either way the result is saved to the cache and then used to fire d.
    """
    if not values:
        log.msg('Peers for %s were not found' % path)
        getDefer = self.peers.get(hash, path)
    else:
        log.msg('Found peers for %s: %r' % (path, values))
        # Download from the found peers
        getDefer = self.peers.get(hash, path, values)
        getDefer.addCallback(self.check_response, hash, path)
    getDefer.addCallback(self.cache.save_file, hash, path)
    getDefer.addErrback(self.cache.save_error, path)
    getDefer.addCallbacks(d.callback, d.errback)
def check_response(self, response, hash, path):
    """Fall back to a direct download when the peer download failed.

    Returns response unchanged when its code is 2xx. Otherwise returns the
    Deferred of a direct download; because this runs as a Deferred callback,
    Twisted waits for that Deferred and passes its result on in place of
    response.
    """
    if 200 <= response.code < 300:
        return response
    log.msg('Download from peers failed, going to direct download: %s' % path)
    getDefer = self.peers.get(hash, path)
    return getDefer
def new_cached_file(self, file_path, hash, new_hash, url=None, forceDHT=False):
    """Add a newly cached file to the DHT.

    If the file was downloaded, set url to the path it was downloaded for;
    the mirror manager is then told about the updated file.
    Don't add a file to the DHT unless a hash was found for it
    (but do add it anyway if forceDHT is True). Nothing is stored before the
    DHT join has produced this peer's contact info (self.my_contact).
    """
    if url:
        self.mirrors.updatedFile(url, file_path)

    if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
        key = hash.norm(bits=config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
        value = {'c': self.my_contact}
        storeDefer = self.dht.storeValue(key, value)
        storeDefer.addCallback(self.store_done, hash)
def store_done(self, result, hash):
    """Log that a value was stored in the DHT for the given hash.

    result is whatever the DHT store Deferred fired with; hash is the hash
    object whose hex digest identifies the stored file.
    """
    digest = hash.hexdigest()
    log.msg('Added %s to the DHT: %r' % (digest, result))