]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht/apt_dht.py
Remove an unneeded khashmir file.
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
1
2 from binascii import b2a_hex
3 from urlparse import urlunparse
4 import os, re
5
6 from twisted.internet import defer, reactor
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
10
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
17 from db import DB
18 from util import findMyIPAddr, compact
19
20 download_dir = 'cache'
21
22 class AptDHT:
23     def __init__(self, dht):
24         log.msg('Initializing the main apt_dht application')
25         self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
26         if not self.cache_dir.child(download_dir).exists():
27             self.cache_dir.child(download_dir).makedirs()
28         self.db = DB(self.cache_dir.child('apt-dht.db'))
29         self.dht = dht
30         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
31         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
32         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
33         self.getHTTPFactory = self.http_server.getHTTPFactory
34         self.peers = PeerManager()
35         self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
36         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
37         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
38         self.my_contact = None
39     
40     def joinComplete(self, result):
41         my_addr = findMyIPAddr(result,
42                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
43                                config.getboolean('DEFAULT', 'LOCAL_OK'))
44         if not my_addr:
45             raise RuntimeError, "IP address for this machine could not be found"
46         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
47         self.cache.scanDirectories()
48         reactor.callLater(60, self.refreshFiles)
49
50     def joinError(self, failure):
51         log.msg("joining DHT failed miserably")
52         log.err(failure)
53         raise RuntimeError, "IP address for this machine could not be found"
54     
55     def refreshFiles(self):
56         """Refresh any files in the DHT that are about to expire."""
57         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
58         hashes = self.db.expiredFiles(expireAfter)
59         if len(hashes.keys()) > 0:
60             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
61         for raw_hash in hashes:
62             self.db.refreshHash(raw_hash)
63             hash = HashObject(raw_hash)
64             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
65             value = {'c': self.my_contact}
66             storeDefer = self.dht.storeValue(key, value)
67             storeDefer.addCallback(self.store_done, hash)
68         reactor.callLater(60, self.refreshFiles)
69
70     def check_freshness(self, req, path, modtime, resp):
71         log.msg('Checking if %s is still fresh' % path)
72         d = self.peers.get('', path, method = "HEAD", modtime = modtime)
73         d.addCallback(self.check_freshness_done, req, path, resp)
74         return d
75     
76     def check_freshness_done(self, resp, req, path, orig_resp):
77         if resp.code == 304:
78             log.msg('Still fresh, returning: %s' % path)
79             return orig_resp
80         else:
81             log.msg('Stale, need to redownload: %s' % path)
82             return self.get_resp(req, path)
83     
84     def get_resp(self, req, path):
85         d = defer.Deferred()
86         
87         log.msg('Trying to find hash for %s' % path)
88         findDefer = self.mirrors.findHash(path)
89         
90         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
91                                callbackArgs=(req, path, d), errbackArgs=(req, path, d))
92         findDefer.addErrback(log.err)
93         return d
94     
95     def findHash_error(self, failure, req, path, d):
96         log.err(failure)
97         self.findHash_done(HashObject(), req, path, d)
98         
99     def findHash_done(self, hash, req, path, d):
100         if hash.expected() is None:
101             log.msg('Hash for %s was not found' % path)
102             self.lookupHash_done([], hash, path, d)
103         else:
104             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
105             
106             # Lookup hash in cache
107             locations = self.db.lookupHash(hash.expected())
108             self.getCachedFile(hash, req, path, d, locations)
109
110     def getCachedFile(self, hash, req, path, d, locations):
111         if not locations:
112             log.msg('Failed to return file from cache: %s' % path)
113             self.lookupHash(hash, path, d)
114             return
115         
116         # Get the first possible location from the list
117         file = locations.pop(0)['path']
118         log.msg('Returning cached file: %s' % file.path)
119         
120         # Get it's response
121         resp = static.File(file.path).renderHTTP(req)
122         if isinstance(resp, defer.Deferred):
123             resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
124         else:
125             self._getCachedFile(resp, hash, req, path, d, locations)
126         
127     def _getCachedFile(self, resp, hash, req, path, d, locations):
128         if isinstance(resp, failure.Failure):
129             log.msg('Got error trying to get cached file')
130             log.err()
131             # Try the next possible location
132             self.getCachedFile(hash, req, path, d, locations)
133             return
134             
135         log.msg('Cached response: %r' % resp)
136         
137         if resp.code >= 200 and resp.code < 400:
138             d.callback(resp)
139         else:
140             # Try the next possible location
141             self.getCachedFile(hash, req, path, d, locations)
142
143     def lookupHash(self, hash, path, d):
144         log.msg('Looking up hash in DHT for file: %s' % path)
145         key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
146         lookupDefer = self.dht.getValue(key)
147         lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
148
149     def lookupHash_done(self, values, hash, path, d):
150         if not values:
151             log.msg('Peers for %s were not found' % path)
152             getDefer = self.peers.get(hash, path)
153             getDefer.addCallback(self.cache.save_file, hash, path)
154             getDefer.addErrback(self.cache.save_error, path)
155             getDefer.addCallbacks(d.callback, d.errback)
156         else:
157             log.msg('Found peers for %s: %r' % (path, values))
158             # Download from the found peers
159             getDefer = self.peers.get(hash, path, values)
160             getDefer.addCallback(self.check_response, hash, path)
161             getDefer.addCallback(self.cache.save_file, hash, path)
162             getDefer.addErrback(self.cache.save_error, path)
163             getDefer.addCallbacks(d.callback, d.errback)
164             
165     def check_response(self, response, hash, path):
166         if response.code < 200 or response.code >= 300:
167             log.msg('Download from peers failed, going to direct download: %s' % path)
168             getDefer = self.peers.get(hash, path)
169             return getDefer
170         return response
171         
172     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
173         """Add a newly cached file to the DHT.
174         
175         If the file was downloaded, set url to the path it was downloaded for.
176         Don't add a file to the DHT unless a hash was found for it
177         (but do add it anyway if forceDHT is True).
178         """
179         if url:
180             self.mirrors.updatedFile(url, file_path)
181         
182         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
183             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
184             value = {'c': self.my_contact}
185             storeDefer = self.dht.storeValue(key, value)
186             storeDefer.addCallback(self.store_done, hash)
187             return storeDefer
188         return None
189
190     def store_done(self, result, hash):
191         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
192