2 from binascii import b2a_hex
3 from urlparse import urlunparse
6 from twisted.internet import defer
7 from twisted.web2 import server, http, http_headers
8 from twisted.python import log
9 from twisted.python.filepath import FilePath
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
18 from util import findMyIPAddr
20 download_dir = 'cache'
23 def __init__(self, dht):
24 log.msg('Initializing the main apt_dht application')
25 self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
26 if not self.cache_dir.child(download_dir).exists():
27 self.cache_dir.child(download_dir).makedirs()
28 self.db = DB(self.cache_dir.child('apt-dht.db'))
30 self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
31 self.dht.join().addCallbacks(self.joinComplete, self.joinError)
32 self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
33 self.setDirectories = self.http_server.setDirectories
34 self.http_site = server.Site(self.http_server)
35 self.peers = PeerManager()
36 self.mirrors = MirrorManager(self.cache_dir)
37 other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
38 self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
44 def joinComplete(self, result):
45 self.my_addr = findMyIPAddr(result, config.getint(config.get('DEFAULT', 'DHT'), 'PORT'))
47 raise RuntimeError, "IP address for this machine could not be found"
48 self.cache.scanDirectories()
50 def joinError(self, failure):
51 log.msg("joining DHT failed miserably")
53 raise RuntimeError, "IP address for this machine could not be found"
55 def check_freshness(self, path, modtime, resp):
56 log.msg('Checking if %s is still fresh' % path)
57 d = self.peers.get([path], "HEAD", modtime)
58 d.addCallback(self.check_freshness_done, path, resp)
61 def check_freshness_done(self, resp, path, orig_resp):
63 log.msg('Still fresh, returning: %s' % path)
66 log.msg('Stale, need to redownload: %s' % path)
67 return self.get_resp(path)
69 def get_resp(self, path):
72 log.msg('Trying to find hash for %s' % path)
73 findDefer = self.mirrors.findHash(path)
75 findDefer.addCallbacks(self.findHash_done, self.findHash_error,
76 callbackArgs=(path, d), errbackArgs=(path, d))
77 findDefer.addErrback(log.err)
80 def findHash_error(self, failure, path, d):
82 self.findHash_done(HashObject(), path, d)
84 def findHash_done(self, hash, path, d):
85 if hash.expected() is None:
86 log.msg('Hash for %s was not found' % path)
87 self.lookupHash_done([], hash, path, d)
89 log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
90 # Lookup hash from DHT
91 key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
92 lookupDefer = self.dht.getValue(key)
93 lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
95 def lookupHash_done(self, locations, hash, path, d):
97 log.msg('Peers for %s were not found' % path)
98 getDefer = self.peers.get([path])
99 getDefer.addCallback(self.cache.save_file, hash, path)
100 getDefer.addErrback(self.cache.save_error, path)
101 getDefer.addCallbacks(d.callback, d.errback)
103 log.msg('Found peers for %s: %r' % (path, locations))
104 # Download from the found peers
105 getDefer = self.peers.get(locations)
106 getDefer.addCallback(self.check_response, hash, path)
107 getDefer.addCallback(self.cache.save_file, hash, path)
108 getDefer.addErrback(self.cache.save_error, path)
109 getDefer.addCallbacks(d.callback, d.errback)
111 def check_response(self, response, hash, path):
112 if response.code < 200 or response.code >= 300:
113 log.msg('Download from peers failed, going to direct download: %s' % path)
114 getDefer = self.peers.get([path])
118 def new_cached_file(self, file_path, hash, urlpath, url = None):
119 """Add a newly cached file to the DHT.
121 If the file was downloaded, set url to the path it was downloaded for.
124 self.mirrors.updatedFile(url, file_path)
126 if self.my_addr and hash:
127 site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
128 full_path = urlunparse(('http', site, urlpath, None, None, None))
129 key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
130 storeDefer = self.dht.storeValue(key, full_path)
131 storeDefer.addCallback(self.store_done, full_path)
132 storeDefer.addErrback(log.err)
134 def store_done(self, result, path):
135 log.msg('Added %s to the DHT: %r' % (path, result))