2 from binascii import b2a_hex
3 from urlparse import urlunparse
6 from twisted.internet import defer
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
18 from util import findMyIPAddr
20 download_dir = 'cache'
def __init__(self, dht):
    """Wire together the components of the main apt_dht application.

    @param dht: the DHT implementation to use for distributed lookups
    """
    log.msg('Initializing the main apt_dht application')

    # Make sure the download cache directory exists before anything uses it.
    self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
    if not self.cache_dir.child(download_dir).exists():
        self.cache_dir.child(download_dir).makedirs()
    self.db = DB(self.cache_dir.child('apt-dht.db'))

    # Save the DHT and start joining it -- lookupHash and new_cached_file
    # read self.dht later, so the assignment is required here.
    self.dht = dht
    self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
    self.dht.join().addCallbacks(self.joinComplete, self.joinError)

    # HTTP front-end serving files out of the download cache.
    self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
    self.setDirectories = self.http_server.setDirectories
    self.http_site = server.Site(self.http_server)

    # Peer downloading, mirror metadata, and the local file cache.
    self.peers = PeerManager()
    self.mirrors = MirrorManager(self.cache_dir)
    other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
    self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)

    # Unknown until the DHT join completes (set in joinComplete, read in
    # new_cached_file) -- initialize so early reads don't AttributeError.
    self.my_addr = None
def joinComplete(self, result):
    """Callback for a successful DHT join: determine our external address.

    @param result: the address information returned by the DHT join
    @raise RuntimeError: if no usable IP address could be determined
    """
    self.my_addr = findMyIPAddr(result,
                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
                                config.getboolean('DEFAULT', 'LOCAL_OK'))
    # Without a known address we cannot announce cached files, so abort.
    if not self.my_addr:
        raise RuntimeError("IP address for this machine could not be found")
    self.cache.scanDirectories()
def joinError(self, failure):
    """Errback for a failed DHT join: log the failure and abort startup.

    @param failure: the Twisted Failure describing why the join failed
    """
    log.msg("joining DHT failed miserably")
    # Log the actual failure instead of silently dropping it.
    log.err(failure)
    # The original message here was copy-pasted from joinComplete's
    # address-lookup error; report the real problem instead.
    raise RuntimeError("joining the DHT failed")
def check_freshness(self, req, path, modtime, resp):
    """Issue a conditional HEAD request to peers to check path freshness.

    @param req: the original HTTP request
    @param path: the URL path being checked
    @param modtime: the modification time of the cached copy
    @param resp: the cached response to return if still fresh
    @return: a deferred that fires with either the original response
        (still fresh) or a freshly downloaded one (see check_freshness_done)
    """
    log.msg('Checking if %s is still fresh' % path)
    d = self.peers.get([path], "HEAD", modtime)
    d.addCallback(self.check_freshness_done, req, path, resp)
    # The deferred must be returned so the HTTP server can wait on it.
    return d
def check_freshness_done(self, resp, req, path, orig_resp):
    """Handle the HEAD response from the freshness check.

    A 304 (Not Modified) means the cached response is still valid;
    anything else means the file changed upstream and is re-downloaded.
    """
    # NOTE(review): assumes the peer HEAD request yields an integer
    # status code -- confirm the type returned by PeerManager.get.
    if resp.code == 304:
        log.msg('Still fresh, returning: %s' % path)
        return orig_resp
    else:
        log.msg('Stale, need to redownload: %s' % path)
        return self.get_resp(req, path)
def get_resp(self, req, path):
    """Begin fetching path, returning a deferred HTTP response.

    @return: a deferred that is fired later by the findHash_* /
        lookupHash_* callback chain once the file has been located
        (cache, DHT peers, or direct download) and served
    """
    # The result deferred is threaded through the whole callback chain.
    d = defer.Deferred()

    log.msg('Trying to find hash for %s' % path)
    findDefer = self.mirrors.findHash(path)

    findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                           callbackArgs=(req, path, d), errbackArgs=(req, path, d))
    findDefer.addErrback(log.err)
    return d
def findHash_error(self, failure, req, path, d):
    """Errback for a failed hash lookup.

    Logs the failure, then proceeds with an empty HashObject, which
    makes findHash_done fall back to a plain peer/mirror download.
    """
    # Don't swallow the error silently -- record what went wrong.
    log.err(failure)
    self.findHash_done(HashObject(), req, path, d)
def findHash_done(self, hash, req, path, d):
    """Dispatch on whether a hash was found for the requested path.

    No hash known: download directly (empty peer-locations list).
    Hash known: try the local cache first, then the DHT.
    """
    if hash.expected() is None:
        log.msg('Hash for %s was not found' % path)
        # No hash to look up -- go straight to a direct download.
        self.lookupHash_done([], hash, path, d)
    else:
        log.msg('Found hash %s for %s' % (hash.hexexpected(), path))

        # Lookup hash in cache
        locations = self.db.lookupHash(hash.expected())
        self.getCachedFile(hash, req, path, d, locations)
def getCachedFile(self, hash, req, path, d, locations):
    """Try to serve the request from one of the locally cached copies.

    Pops locations one at a time (retried via _getCachedFile); once the
    list is exhausted, falls back to a DHT lookup for the hash.
    """
    # Guard: no cached copies (left) -- escalate to the DHT.
    if not locations:
        log.msg('Failed to return file from cache: %s' % path)
        self.lookupHash(hash, path, d)
        return

    # Get the first possible location from the list
    cached = locations.pop(0)['path']
    log.msg('Returning cached file: %s' % cached.path)

    # Render the file; web2 may answer synchronously or with a deferred.
    resp = static.File(cached.path).renderHTTP(req)
    if isinstance(resp, defer.Deferred):
        resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
    else:
        self._getCachedFile(resp, hash, req, path, d, locations)
def _getCachedFile(self, resp, hash, req, path, d, locations):
    """Check the response rendered from a cached file.

    On an error or a non-success status code, retry with the next cached
    location; otherwise fire the result deferred with the response.
    """
    if isinstance(resp, failure.Failure):
        log.msg('Got error trying to get cached file')
        log.err(resp)
        # Try the next possible location
        self.getCachedFile(hash, req, path, d, locations)
        return

    log.msg('Cached response: %r' % resp)

    if resp.code >= 200 and resp.code < 400:
        # Success -- hand the response to the waiting deferred.
        d.callback(resp)
    else:
        # Try the next possible location
        self.getCachedFile(hash, req, path, d, locations)
def lookupHash(self, hash, path, d):
    """Look up the peers holding a hash in the DHT.

    Results are handled by lookupHash_done; DHT failures fall back to a
    direct download so the caller's deferred always eventually fires.
    """
    log.msg('Looking up hash in DHT for file: %s' % path)
    key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
    lookupDefer = self.dht.getValue(key)
    lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
    # Without an errback a failed DHT get would leave d hanging forever.
    lookupDefer.addErrback(self._lookupHash_error, hash, path, d)

def _lookupHash_error(self, err, hash, path, d):
    """Errback for a failed DHT get: log and proceed with no peers."""
    log.err(err)
    self.lookupHash_done([], hash, path, d)
def lookupHash_done(self, locations, hash, path, d):
    """Download the file, from the found peers if any, else directly.

    Either way the result is saved to the cache (save_file) and then
    passed on to the waiting deferred d.
    """
    if not locations:
        log.msg('Peers for %s were not found' % path)
        # No peers -- download the path directly.
        getDefer = self.peers.get([path])
        getDefer.addCallback(self.cache.save_file, hash, path)
        getDefer.addErrback(self.cache.save_error, path)
        getDefer.addCallbacks(d.callback, d.errback)
    else:
        log.msg('Found peers for %s: %r' % (path, locations))
        # Download from the found peers; check_response falls back to a
        # direct download if the peer response is unusable.
        getDefer = self.peers.get(locations)
        getDefer.addCallback(self.check_response, hash, path)
        getDefer.addCallback(self.cache.save_file, hash, path)
        getDefer.addErrback(self.cache.save_error, path)
        getDefer.addCallbacks(d.callback, d.errback)
def check_response(self, response, hash, path):
    """Verify a peer download response, retrying directly on failure.

    @return: the response itself if it was a 2xx success, otherwise a
        deferred for a fresh direct download of the path (returning a
        deferred from a callback chains it into the enclosing Deferred)
    """
    if response.code < 200 or response.code >= 300:
        log.msg('Download from peers failed, going to direct download: %s' % path)
        getDefer = self.peers.get([path])
        return getDefer
    # Success -- pass the response through unchanged.
    return response
def new_cached_file(self, file_path, hash, urlpath, url = None):
    """Add a newly cached file to the DHT.

    If the file was downloaded, set url to the path it was downloaded for.
    """
    # Only notify the mirror manager for downloaded files -- calling it
    # with url=None for locally scanned files would be wrong.
    if url:
        self.mirrors.updatedFile(url, file_path)

    # We can only announce the file if our own address is known (set in
    # joinComplete) and the file's hash is available.
    if self.my_addr and hash:
        site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
        full_path = urlunparse(('http', site, urlpath, None, None, None))
        key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
        storeDefer = self.dht.storeValue(key, full_path)
        storeDefer.addCallback(self.store_done, full_path)
        # A failed store should be logged, not dropped on the floor.
        storeDefer.addErrback(log.err)
def store_done(self, result, path):
    """Log the outcome of a completed DHT store for the given path."""
    message = 'Added %s to the DHT: %r' % (path, result)
    log.msg(message)