2 from binascii import b2a_hex
3 from urlparse import urlunparse
6 from twisted.internet import defer
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
18 from util import findMyIPAddr
# Sub-directory of the configured cache_dir that is handed to both the HTTP
# server (TopLevel) and the CacheManager in AptDHT.__init__.
download_dir = "cache"
def __init__(self, dht):
    """Set up the cache, database, DHT, HTTP server and helper managers.

    :param dht: the DHT object to use; it is configured from the config
        section named by the DEFAULT/DHT option and then joined.
    """
    log.msg('Initializing the main apt_dht application')
    # Unknown until joinComplete runs; new_cached_file reads it before then,
    # so it has to exist from the start.
    self.my_addr = None
    self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
    if not self.cache_dir.child(download_dir).exists():
        self.cache_dir.child(download_dir).makedirs()
    self.db = DB(self.cache_dir.child('apt-dht.db'))
    # Keep the DHT that was passed in; every later DHT call goes through it.
    self.dht = dht
    self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
    self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
    self.setDirectories = self.http_server.setDirectories
    self.getHTTPFactory = self.http_server.getHTTPFactory
    self.peers = PeerManager()
    self.mirrors = MirrorManager(self.cache_dir)
    other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
    self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
    # Join last: joinComplete uses self.cache, which must already exist in
    # case the join Deferred fires synchronously.
    self.dht.join().addCallbacks(self.joinComplete, self.joinError)
def joinComplete(self, result):
    """Callback for a successful DHT join.

    Computes this machine's address with findMyIPAddr from the join result,
    then scans the cache directories.

    :raises RuntimeError: if findMyIPAddr returns no address.
    """
    self.my_addr = findMyIPAddr(result,
                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
                                config.getboolean('DEFAULT', 'LOCAL_OK'))
    # Only abort when no address was found; otherwise carry on with start-up.
    if not self.my_addr:
        raise RuntimeError("IP address for this machine could not be found")
    self.cache.scanDirectories()
def joinError(self, failure):
    """Errback for a failed DHT join: log the cause, then raise.

    :param failure: the Failure delivered by the join Deferred.
    :raises RuntimeError: always.
    """
    log.msg("joining DHT failed miserably")
    # Record the underlying cause; it would otherwise be lost when the
    # RuntimeError below replaces it in the Deferred chain.
    log.err(failure)
    raise RuntimeError("IP address for this machine could not be found")
def check_freshness(self, req, path, modtime, resp):
    """Check whether the cached copy of *path* can still be served.

    Issues a HEAD request for *path* (passing *modtime*) via the peer
    manager; check_freshness_done then chooses between *resp* and a re-fetch.

    :returns: a Deferred that fires with the response to send.
    """
    log.msg('Checking if %s is still fresh' % path)
    d = self.peers.get([path], "HEAD", modtime)
    d.addCallback(self.check_freshness_done, req, path, resp)
    # Return the Deferred so the caller actually receives the result.
    return d
def check_freshness_done(self, resp, req, path, orig_resp):
    """Return the cached response if still fresh, otherwise download again.

    :param resp: the response to the HEAD request made by check_freshness.
    :param orig_resp: the response that was built from the cached copy.
    :returns: *orig_resp*, or whatever get_resp returns for a re-fetch.
    """
    # NOTE(review): assumes PeerManager.get turns modtime into a conditional
    # (If-Modified-Since) HEAD, so 304 Not Modified means "still fresh" --
    # confirm against PeerManager.
    if resp.code == 304:
        log.msg('Still fresh, returning: %s' % path)
        return orig_resp
    log.msg('Stale, need to redownload: %s' % path)
    return self.get_resp(req, path)
def get_resp(self, req, path):
    """Start retrieving *path* and return a Deferred for the response.

    The mirror hash for *path* is looked up first; findHash_done (or
    findHash_error) continues from there and eventually fires the returned
    Deferred.

    :returns: a Deferred that fires with the response for *path*.
    """
    d = defer.Deferred()

    def _unexpected_error(err):
        # An exception escaped the hash handlers. Log it, and fail the
        # request rather than leaving the caller waiting forever.
        log.err(err)
        if not d.called:
            d.errback(err)

    log.msg('Trying to find hash for %s' % path)
    findDefer = self.mirrors.findHash(path)

    findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                           callbackArgs=(req, path, d), errbackArgs=(req, path, d))
    findDefer.addErrback(_unexpected_error)
    return d
def findHash_error(self, failure, req, path, d):
    """Errback for the mirror hash lookup: log the error and continue.

    Processing continues through findHash_done with a fresh HashObject()
    (presumably one with no expected hash -- confirm in Hash.HashObject).
    """
    # Do not silently drop the error that made the hash lookup fail.
    log.err(failure)
    self.findHash_done(HashObject(), req, path, d)
def findHash_done(self, hash, req, path, d):
    """Continue retrieving *path* once its hash lookup has finished.

    With no expected hash, go straight to lookupHash_done with no peer
    locations. Otherwise try the cached copies recorded in the database
    under that hash.
    """
    if hash.expected() is None:
        log.msg('Hash for %s was not found' % path)
        self.lookupHash_done([], hash, path, d)
        # Nothing to look up in the cache without a hash.
        return

    log.msg('Found hash %s for %s' % (hash.hexexpected(), path))

    # Lookup hash in cache
    locations = self.db.lookupHash(hash.expected())
    self.getCachedFile(hash, req, path, d, locations)
def getCachedFile(self, hash, req, path, d, locations):
    """Try to serve *path* from the next cached location.

    Entries are consumed from the front of *locations* (each is indexed
    with 'path'). When none are left, fall back to a DHT lookup of *hash*.
    """
    if not locations:
        log.msg('Failed to return file from cache: %s' % path)
        self.lookupHash(hash, path, d)
        return

    # Get the first possible location from the list
    cached = locations.pop(0)['path']
    log.msg('Returning cached file: %s' % cached.path)

    # Rendering may produce the response directly or a Deferred for it.
    resp = static.File(cached.path).renderHTTP(req)
    if isinstance(resp, defer.Deferred):
        resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
    else:
        self._getCachedFile(resp, hash, req, path, d, locations)
def _getCachedFile(self, resp, hash, req, path, d, locations):
    """Deliver a rendered cached response, or try the next location.

    *resp* is either a response or a Failure from rendering. A response with
    a code in [200, 400) fires *d*; anything else moves on to the remaining
    *locations* via getCachedFile.
    """
    if isinstance(resp, failure.Failure):
        log.msg('Got error trying to get cached file')
        log.err(resp)
        # Try the next possible location
        self.getCachedFile(hash, req, path, d, locations)
        # A Failure has no .code; stop here.
        return

    log.msg('Cached response: %r' % resp)

    if 200 <= resp.code < 400:
        d.callback(resp)
    else:
        # Try the next possible location
        self.getCachedFile(hash, req, path, d, locations)
def lookupHash(self, hash, path, d):
    """Ask the DHT for locations that hold the file with *hash*.

    The lookup result is passed to lookupHash_done as *locations*. If the
    lookup itself fails, the error is logged and lookupHash_done is called
    with no locations. This falls back to fetching *path* directly, so *d*
    still fires.
    """
    log.msg('Looking up hash in DHT for file: %s' % path)
    key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
    lookupDefer = self.dht.getValue(key)

    def _lookup_failed(err):
        log.err(err)
        self.lookupHash_done([], hash, path, d)

    # Paired callback/errback, so the errback handles only getValue failures.
    lookupDefer.addCallbacks(self.lookupHash_done, _lookup_failed,
                             callbackArgs=(hash, path, d))
def lookupHash_done(self, locations, hash, path, d):
    """Download the file, from peers if any were found, else from *path*.

    The download result is passed to self.cache.save_file (errors go to
    self.cache.save_error), and the outcome is forwarded to *d*.

    :param locations: peer locations from the DHT; empty means none found.
    """
    if not locations:
        log.msg('Peers for %s were not found' % path)
        getDefer = self.peers.get([path])
    else:
        log.msg('Found peers for %s: %r' % (path, locations))
        # Download from the found peers
        getDefer = self.peers.get(locations)
        # Fall back to a direct download if the peers did not deliver.
        getDefer.addCallback(self.check_response, hash, path)

    getDefer.addCallback(self.cache.save_file, hash, path)
    getDefer.addErrback(self.cache.save_error, path)
    getDefer.addCallbacks(d.callback, d.errback)
def check_response(self, response, hash, path):
    """Fall back to a direct download when the peer download failed.

    :param response: the response from the peer download (uses .code).
    :returns: *response* unchanged when its code is 2xx; otherwise the
        Deferred from self.peers.get([path]), which the callback chain
        waits on before continuing.
    """
    if 200 <= response.code < 300:
        return response
    log.msg('Download from peers failed, going to direct download: %s' % path)
    return self.peers.get([path])
def new_cached_file(self, file_path, hash, urlpath, url = None):
    """Add a newly cached file to the DHT.

    If the file was downloaded, set url to the path it was downloaded for.

    :returns: the Deferred from self.dht.storeValue, or None when nothing
        was stored (self.my_addr not set yet, or no hash given).
    """
    # Only downloaded files have a mirror URL to record.
    if url:
        self.mirrors.updatedFile(url, file_path)

    if self.my_addr and hash:
        site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
        full_path = urlunparse(('http', site, urlpath, None, None, None))
        key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
        storeDefer = self.dht.storeValue(key, full_path)
        storeDefer.addCallback(self.store_done, full_path)
        return storeDefer
    return None
def store_done(self, result, path):
    """Callback for a DHT store: log the stored *path* and the store result."""
    message = 'Added %s to the DHT: %r' % (path, result)
    log.msg(message)