2 from binascii import b2a_hex
3 from urlparse import urlunparse
6 from twisted.internet import defer
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
18 from util import findMyIPAddr, compact
# Name of the child of the configured cache directory that is created at
# start-up and handed to the HTTP server and the cache manager.
download_dir = 'cache'
def __init__(self, dht):
    """Create the cache directory, database, DHT link and helper managers.

    dht is the DHT implementation to use. It is configured from the
    section named by the DEFAULT/DHT option and then joined; joinComplete
    or joinError is called with the outcome of the join.
    """
    log.msg('Initializing the main apt_dht application')
    self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
    if not self.cache_dir.child(download_dir).exists():
        self.cache_dir.child(download_dir).makedirs()
    # NOTE(review): DB is not among the imports visible in this file --
    # confirm it is brought into scope elsewhere.
    self.db = DB(self.cache_dir.child('apt-dht.db'))

    # The address is only known once the join has completed; other methods
    # test self.my_addr, so it must exist before any callback can run.
    self.my_addr = None

    # Keep the DHT around: lookupHash and new_cached_file use it later.
    self.dht = dht
    self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
    self.dht.join().addCallbacks(self.joinComplete, self.joinError)

    self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
    self.getHTTPFactory = self.http_server.getHTTPFactory
    self.peers = PeerManager()
    self.mirrors = MirrorManager(self.cache_dir,
                                 config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
    other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
    self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
def joinComplete(self, result):
    """Callback for a successful DHT join.

    Determines this machine's address from the join result and the
    configured DHT port, then starts the cache directory scan.

    Raises RuntimeError when no address could be determined.
    """
    dht_section = config.get('DEFAULT', 'DHT')
    self.my_addr = findMyIPAddr(result,
                                config.getint(dht_section, 'PORT'),
                                config.getboolean('DEFAULT', 'LOCAL_OK'))
    # Only give up when the lookup produced nothing; otherwise carry on
    # and scan the cache directories.
    if not self.my_addr:
        raise RuntimeError("IP address for this machine could not be found")
    self.cache.scanDirectories()
def joinError(self, failure):
    """Errback for a failed DHT join: log the failure and raise RuntimeError."""
    log.msg("joining DHT failed miserably")
    # Record the actual cause before it is replaced by the RuntimeError.
    log.err(failure)
    # NOTE(review): the message mentions the IP address although the
    # failure here is the DHT join itself; text kept unchanged.
    raise RuntimeError("IP address for this machine could not be found")
def check_freshness(self, req, path, modtime, resp):
    """Check whether an already available response for path is still fresh.

    Issues a HEAD request for path through the peer manager, passing
    modtime along, and lets check_freshness_done decide between resp and
    a new download.

    Returns the Deferred whose result is the response to send.
    """
    log.msg('Checking if %s is still fresh' % path)
    d = self.peers.get('', path, method = "HEAD", modtime = modtime)
    d.addCallback(self.check_freshness_done, req, path, resp)
    # Hand the chain back so the caller receives the chosen response.
    return d
def check_freshness_done(self, resp, req, path, orig_resp):
    """Pick the response to return after the freshness HEAD request.

    resp is the answer to the HEAD request and orig_resp the response that
    was already available. A 304 (Not Modified) answer means orig_resp can
    be returned unchanged; anything else triggers a new download via
    get_resp, whose Deferred is returned.
    """
    if resp.code == 304:
        log.msg('Still fresh, returning: %s' % path)
        return orig_resp

    log.msg('Stale, need to redownload: %s' % path)
    return self.get_resp(req, path)
def get_resp(self, req, path):
    """Start the search for a response to the request for path.

    Asks the mirror manager for the file's hash and continues in
    findHash_done (or findHash_error when the lookup fails).

    Returns a Deferred that is passed down the chain and fired by the
    later stages once a response is available.
    """
    # Created here and threaded through every later stage as 'd'.
    d = defer.Deferred()

    log.msg('Trying to find hash for %s' % path)
    findDefer = self.mirrors.findHash(path)

    findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                           callbackArgs=(req, path, d), errbackArgs=(req, path, d))
    # Anything raised by the two handlers above is only logged.
    findDefer.addErrback(log.err)
    return d
def findHash_error(self, failure, req, path, d):
    """Errback for the hash lookup: log it and continue without a hash."""
    # Keep a record of why the lookup failed before carrying on.
    log.err(failure)
    # A fresh HashObject is passed on in place of the missing hash.
    self.findHash_done(HashObject(), req, path, d)
def findHash_done(self, hash, req, path, d):
    """Continue the request once the hash lookup for path has finished.

    Without an expected hash the file is fetched as if no peers were found
    (lookupHash_done with an empty list). Otherwise the database is asked
    for cached copies and getCachedFile tries them in turn.
    """
    if hash.expected() is None:
        log.msg('Hash for %s was not found' % path)
        self.lookupHash_done([], hash, path, d)
    else:
        log.msg('Found hash %s for %s' % (hash.hexexpected(), path))

        # Lookup hash in cache
        locations = self.db.lookupHash(hash.expected())
        self.getCachedFile(hash, req, path, d, locations)
def getCachedFile(self, hash, req, path, d, locations):
    """Try to answer the request from the next cached location.

    locations is the list of remaining candidates (each a mapping with a
    'path' entry); it is consumed from the front. When it is empty the
    search moves on to the DHT via lookupHash.
    """
    if not locations:
        log.msg('Failed to return file from cache: %s' % path)
        self.lookupHash(hash, path, d)
        return

    # Get the first possible location from the list
    cached = locations.pop(0)['path']
    log.msg('Returning cached file: %s' % cached.path)

    # Render it; the result may be available now or only later.
    resp = static.File(cached.path).renderHTTP(req)
    if isinstance(resp, defer.Deferred):
        resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
    else:
        self._getCachedFile(resp, hash, req, path, d, locations)
def _getCachedFile(self, resp, hash, req, path, d, locations):
    """Handle the result of rendering one cached location.

    resp is either a Failure or a response object. A response with a code
    in the range 200-399 is delivered through d; a Failure or any other
    code makes getCachedFile try the next location.
    """
    if isinstance(resp, failure.Failure):
        log.msg('Got error trying to get cached file')
        log.err(resp)
        # Try the next possible location
        self.getCachedFile(hash, req, path, d, locations)
        return

    log.msg('Cached response: %r' % resp)

    if 200 <= resp.code < 400:
        d.callback(resp)
    else:
        # Try the next possible location
        self.getCachedFile(hash, req, path, d, locations)
def lookupHash(self, hash, path, d):
    """Ask the DHT for the values stored under the file's expected hash.

    The key is the expected hash normalised to the HASH_LENGTH configured
    for the DHT section; the result is passed to lookupHash_done.
    """
    log.msg('Looking up hash in DHT for file: %s' % path)
    dht_section = config.get('DEFAULT', 'DHT')
    bits = config.getint(dht_section, 'HASH_LENGTH')
    key = hash.normexpected(bits = bits)
    self.dht.getValue(key).addCallback(self.lookupHash_done, hash, path, d)
def lookupHash_done(self, values, hash, path, d):
    """Start the download once the DHT lookup has returned.

    With no values the peer manager is called without a peer list;
    otherwise the found values are passed to it and the outcome is checked
    by check_response. Either way the result is handed to the cache
    manager (save_file / save_error) and finally delivered through d.
    """
    if not values:
        log.msg('Peers for %s were not found' % path)
        getDefer = self.peers.get(hash, path)
    else:
        log.msg('Found peers for %s: %r' % (path, values))
        # Download from the found peers
        getDefer = self.peers.get(hash, path, values)
        getDefer.addCallback(self.check_response, hash, path)

    # Common tail: exactly one download chain fires d, exactly once.
    getDefer.addCallback(self.cache.save_file, hash, path)
    getDefer.addErrback(self.cache.save_error, path)
    getDefer.addCallbacks(d.callback, d.errback)
def check_response(self, response, hash, path):
    """Fall back to a download without peers when the peer download failed.

    Returns response unchanged when its code is in the 2xx range, otherwise
    the Deferred of a new peer-manager request made without a peer list,
    so that the rest of the callback chain waits for that result.
    """
    if response.code < 200 or response.code >= 300:
        log.msg('Download from peers failed, going to direct download: %s' % path)
        return self.peers.get(hash, path)
    return response
def new_cached_file(self, file_path, hash, url = None, forceDHT = False):
    """Add a newly cached file to the DHT.

    If the file was downloaded, set url to the path it was downloaded for.
    Don't add a file to the DHT unless a hash was found for it
    (but do add it anyway if forceDHT is True).

    Returns the Deferred of the DHT store operation, or None when nothing
    was stored (no known address, no hash, or no expected hash without
    forceDHT).
    """
    # Only downloaded files have a URL to report to the mirror manager.
    if url:
        self.mirrors.updatedFile(url, file_path)

    if self.my_addr and hash and (hash.expected() is not None or forceDHT):
        contact = compact(self.my_addr, config.getint('DEFAULT', 'PORT'))
        value = {'c': contact}
        key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
        storeDefer = self.dht.storeValue(key, value)
        storeDefer.addCallback(self.store_done, hash)
        return storeDefer
    return None
def store_done(self, result, hash):
    """Callback for a finished DHT store: log the digest and the result."""
    digest = hash.hexdigest()
    log.msg('Added %s to the DHT: %r' % (digest, result))