Add appropriate piece strings to the DHT.
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
1
2 from binascii import b2a_hex
3 from urlparse import urlunparse
4 import os, re, sha
5
6 from twisted.internet import defer, reactor
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
10
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
17 from db import DB
18 from util import findMyIPAddr, compact
19
20 DHT_PIECES = 4
21 TORRENT_PIECES = 70
22
23 download_dir = 'cache'
24
25 class AptDHT:
26     def __init__(self, dht):
27         log.msg('Initializing the main apt_dht application')
28         self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
29         if not self.cache_dir.child(download_dir).exists():
30             self.cache_dir.child(download_dir).makedirs()
31         self.db = DB(self.cache_dir.child('apt-dht.db'))
32         self.dht = dht
33         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
34         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
35         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
36         self.getHTTPFactory = self.http_server.getHTTPFactory
37         self.peers = PeerManager()
38         self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
39         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
40         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
41         self.my_contact = None
42     
43     def joinComplete(self, result):
44         my_addr = findMyIPAddr(result,
45                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
46                                config.getboolean('DEFAULT', 'LOCAL_OK'))
47         if not my_addr:
48             raise RuntimeError, "IP address for this machine could not be found"
49         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
50         self.cache.scanDirectories()
51         reactor.callLater(60, self.refreshFiles)
52
53     def joinError(self, failure):
54         log.msg("joining DHT failed miserably")
55         log.err(failure)
56         raise RuntimeError, "IP address for this machine could not be found"
57     
58     def refreshFiles(self):
59         """Refresh any files in the DHT that are about to expire."""
60         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
61         hashes = self.db.expiredFiles(expireAfter)
62         if len(hashes.keys()) > 0:
63             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
64         for raw_hash in hashes:
65             self.db.refreshHash(raw_hash)
66             hash = HashObject(raw_hash)
67             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
68             value = {'c': self.my_contact}
69             storeDefer = self.dht.storeValue(key, value)
70             storeDefer.addCallback(self.store_done, hash)
71         reactor.callLater(60, self.refreshFiles)
72
73     def check_freshness(self, req, path, modtime, resp):
74         log.msg('Checking if %s is still fresh' % path)
75         d = self.peers.get('', path, method = "HEAD", modtime = modtime)
76         d.addCallback(self.check_freshness_done, req, path, resp)
77         return d
78     
79     def check_freshness_done(self, resp, req, path, orig_resp):
80         if resp.code == 304:
81             log.msg('Still fresh, returning: %s' % path)
82             return orig_resp
83         else:
84             log.msg('Stale, need to redownload: %s' % path)
85             return self.get_resp(req, path)
86     
87     def get_resp(self, req, path):
88         d = defer.Deferred()
89         
90         log.msg('Trying to find hash for %s' % path)
91         findDefer = self.mirrors.findHash(path)
92         
93         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
94                                callbackArgs=(req, path, d), errbackArgs=(req, path, d))
95         findDefer.addErrback(log.err)
96         return d
97     
98     def findHash_error(self, failure, req, path, d):
99         log.err(failure)
100         self.findHash_done(HashObject(), req, path, d)
101         
102     def findHash_done(self, hash, req, path, d):
103         if hash.expected() is None:
104             log.msg('Hash for %s was not found' % path)
105             self.lookupHash_done([], hash, path, d)
106         else:
107             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
108             
109             # Lookup hash in cache
110             locations = self.db.lookupHash(hash.expected())
111             self.getCachedFile(hash, req, path, d, locations)
112
113     def getCachedFile(self, hash, req, path, d, locations):
114         if not locations:
115             log.msg('Failed to return file from cache: %s' % path)
116             self.lookupHash(hash, path, d)
117             return
118         
119         # Get the first possible location from the list
120         file = locations.pop(0)['path']
121         log.msg('Returning cached file: %s' % file.path)
122         
123         # Get it's response
124         resp = static.File(file.path).renderHTTP(req)
125         if isinstance(resp, defer.Deferred):
126             resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
127         else:
128             self._getCachedFile(resp, hash, req, path, d, locations)
129         
130     def _getCachedFile(self, resp, hash, req, path, d, locations):
131         if isinstance(resp, failure.Failure):
132             log.msg('Got error trying to get cached file')
133             log.err()
134             # Try the next possible location
135             self.getCachedFile(hash, req, path, d, locations)
136             return
137             
138         log.msg('Cached response: %r' % resp)
139         
140         if resp.code >= 200 and resp.code < 400:
141             d.callback(resp)
142         else:
143             # Try the next possible location
144             self.getCachedFile(hash, req, path, d, locations)
145
146     def lookupHash(self, hash, path, d):
147         log.msg('Looking up hash in DHT for file: %s' % path)
148         key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
149         lookupDefer = self.dht.getValue(key)
150         lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
151
152     def lookupHash_done(self, values, hash, path, d):
153         if not values:
154             log.msg('Peers for %s were not found' % path)
155             getDefer = self.peers.get(hash, path)
156             getDefer.addCallback(self.cache.save_file, hash, path)
157             getDefer.addErrback(self.cache.save_error, path)
158             getDefer.addCallbacks(d.callback, d.errback)
159         else:
160             log.msg('Found peers for %s: %r' % (path, values))
161             # Download from the found peers
162             getDefer = self.peers.get(hash, path, values)
163             getDefer.addCallback(self.check_response, hash, path)
164             getDefer.addCallback(self.cache.save_file, hash, path)
165             getDefer.addErrback(self.cache.save_error, path)
166             getDefer.addCallbacks(d.callback, d.errback)
167             
168     def check_response(self, response, hash, path):
169         if response.code < 200 or response.code >= 300:
170             log.msg('Download from peers failed, going to direct download: %s' % path)
171             getDefer = self.peers.get(hash, path)
172             return getDefer
173         return response
174         
175     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
176         """Add a newly cached file to the DHT.
177         
178         If the file was downloaded, set url to the path it was downloaded for.
179         Don't add a file to the DHT unless a hash was found for it
180         (but do add it anyway if forceDHT is True).
181         """
182         if url:
183             self.mirrors.updatedFile(url, file_path)
184         
185         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
186             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
187             value = {'c': self.my_contact}
188             pieces = hash.pieceDigests()
189             if len(pieces) <= 1:
190                 pass
191             elif len(pieces) <= DHT_PIECES:
192                 value['t'] = {'t': ''.join(pieces)}
193             elif len(pieces) <= TORRENT_PIECES:
194                 s = sha.new().update(''.join(pieces))
195                 value['h'] = s.digest()
196             else:
197                 s = sha.new().update(''.join(pieces))
198                 value['l'] = s.digest()
199             storeDefer = self.dht.storeValue(key, value)
200             storeDefer.addCallback(self.store_done, hash)
201             return storeDefer
202         return None
203
204     def store_done(self, result, hash):
205         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
206         pieces = hash.pieceDigests()
207         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
208             s = sha.new().update(''.join(pieces))
209             key = s.digest()
210             value = {'t': ''.join(pieces)}
211             storeDefer = self.dht.storeValue(key, value)
212             storeDefer.addCallback(self.store_torrent_done, key)
213             return storeDefer
214         return result
215
216     def store_torrent_done(self, result, key):
217         log.msg('Added torrent string %s to the DHT: %r' % (b2ahex(key), result))
218         return result
219