Store piece hashes in the DB.
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
1
2 from binascii import b2a_hex
3 from urlparse import urlunparse
4 import os, re, sha
5
6 from twisted.internet import defer, reactor
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
10
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
17 from db import DB
18 from util import findMyIPAddr, compact
19
20 DHT_PIECES = 4
21 TORRENT_PIECES = 70
22
23 download_dir = 'cache'
24
25 class AptDHT:
26     def __init__(self, dht):
27         log.msg('Initializing the main apt_dht application')
28         self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
29         if not self.cache_dir.child(download_dir).exists():
30             self.cache_dir.child(download_dir).makedirs()
31         self.db = DB(self.cache_dir.child('apt-dht.db'))
32         self.dht = dht
33         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
34         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
35         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
36         self.getHTTPFactory = self.http_server.getHTTPFactory
37         self.peers = PeerManager()
38         self.mirrors = MirrorManager(self.cache_dir, config.gettime('DEFAULT', 'UNLOAD_PACKAGES_CACHE'))
39         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
40         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
41         self.my_contact = None
42     
43     def joinComplete(self, result):
44         my_addr = findMyIPAddr(result,
45                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
46                                config.getboolean('DEFAULT', 'LOCAL_OK'))
47         if not my_addr:
48             raise RuntimeError, "IP address for this machine could not be found"
49         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
50         self.cache.scanDirectories()
51         reactor.callLater(60, self.refreshFiles)
52
53     def joinError(self, failure):
54         log.msg("joining DHT failed miserably")
55         log.err(failure)
56         raise RuntimeError, "IP address for this machine could not be found"
57     
58     def refreshFiles(self):
59         """Refresh any files in the DHT that are about to expire."""
60         expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
61         hashes = self.db.expiredHashes(expireAfter)
62         if len(hashes.keys()) > 0:
63             log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
64         self._refreshFiles(None, hashes)
65         
66     def _refreshFiles(self, result, hashes):
67         if result is not None:
68             log.msg('Storage resulted in: %r' % result)
69
70         if hashes:
71             raw_hash = hashes.keys()[0]
72             self.db.refreshHash(raw_hash)
73             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
74             del hashes[raw_hash]
75             storeDefer = self.store(hash)
76             storeDefer.addBoth(self._refreshFiles, hashes)
77         else:
78             reactor.callLater(60, self.refreshFiles)
79
80     def check_freshness(self, req, path, modtime, resp):
81         log.msg('Checking if %s is still fresh' % path)
82         d = self.peers.get('', path, method = "HEAD", modtime = modtime)
83         d.addCallback(self.check_freshness_done, req, path, resp)
84         return d
85     
86     def check_freshness_done(self, resp, req, path, orig_resp):
87         if resp.code == 304:
88             log.msg('Still fresh, returning: %s' % path)
89             return orig_resp
90         else:
91             log.msg('Stale, need to redownload: %s' % path)
92             return self.get_resp(req, path)
93     
94     def get_resp(self, req, path):
95         d = defer.Deferred()
96         
97         log.msg('Trying to find hash for %s' % path)
98         findDefer = self.mirrors.findHash(path)
99         
100         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
101                                callbackArgs=(req, path, d), errbackArgs=(req, path, d))
102         findDefer.addErrback(log.err)
103         return d
104     
105     def findHash_error(self, failure, req, path, d):
106         log.err(failure)
107         self.findHash_done(HashObject(), req, path, d)
108         
109     def findHash_done(self, hash, req, path, d):
110         if hash.expected() is None:
111             log.msg('Hash for %s was not found' % path)
112             self.lookupHash_done([], hash, path, d)
113         else:
114             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
115             
116             # Lookup hash in cache
117             locations = self.db.lookupHash(hash.expected(), filesOnly = True)
118             self.getCachedFile(hash, req, path, d, locations)
119
120     def getCachedFile(self, hash, req, path, d, locations):
121         if not locations:
122             log.msg('Failed to return file from cache: %s' % path)
123             self.lookupHash(hash, path, d)
124             return
125         
126         # Get the first possible location from the list
127         file = locations.pop(0)['path']
128         log.msg('Returning cached file: %s' % file.path)
129         
130         # Get it's response
131         resp = static.File(file.path).renderHTTP(req)
132         if isinstance(resp, defer.Deferred):
133             resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
134         else:
135             self._getCachedFile(resp, hash, req, path, d, locations)
136         
137     def _getCachedFile(self, resp, hash, req, path, d, locations):
138         if isinstance(resp, failure.Failure):
139             log.msg('Got error trying to get cached file')
140             log.err()
141             # Try the next possible location
142             self.getCachedFile(hash, req, path, d, locations)
143             return
144             
145         log.msg('Cached response: %r' % resp)
146         
147         if resp.code >= 200 and resp.code < 400:
148             d.callback(resp)
149         else:
150             # Try the next possible location
151             self.getCachedFile(hash, req, path, d, locations)
152
153     def lookupHash(self, hash, path, d):
154         log.msg('Looking up hash in DHT for file: %s' % path)
155         key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
156         lookupDefer = self.dht.getValue(key)
157         lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
158
159     def lookupHash_done(self, values, hash, path, d):
160         if not values:
161             log.msg('Peers for %s were not found' % path)
162             getDefer = self.peers.get(hash, path)
163             getDefer.addCallback(self.cache.save_file, hash, path)
164             getDefer.addErrback(self.cache.save_error, path)
165             getDefer.addCallbacks(d.callback, d.errback)
166         else:
167             log.msg('Found peers for %s: %r' % (path, values))
168             # Download from the found peers
169             getDefer = self.peers.get(hash, path, values)
170             getDefer.addCallback(self.check_response, hash, path)
171             getDefer.addCallback(self.cache.save_file, hash, path)
172             getDefer.addErrback(self.cache.save_error, path)
173             getDefer.addCallbacks(d.callback, d.errback)
174             
175     def check_response(self, response, hash, path):
176         if response.code < 200 or response.code >= 300:
177             log.msg('Download from peers failed, going to direct download: %s' % path)
178             getDefer = self.peers.get(hash, path)
179             return getDefer
180         return response
181         
182     def new_cached_file(self, file_path, hash, new_hash, url = None, forceDHT = False):
183         """Add a newly cached file to the appropriate places.
184         
185         If the file was downloaded, set url to the path it was downloaded for.
186         Doesn't add a file to the DHT unless a hash was found for it
187         (but does add it anyway if forceDHT is True).
188         """
189         if url:
190             self.mirrors.updatedFile(url, file_path)
191         
192         if self.my_contact and hash and new_hash and (hash.expected() is not None or forceDHT):
193             return self.store(hash)
194         return None
195             
196     def store(self, hash):
197         """Add a file to the DHT."""
198         key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
199         value = {'c': self.my_contact}
200         pieces = hash.pieceDigests()
201         if len(pieces) <= 1:
202             pass
203         elif len(pieces) <= DHT_PIECES:
204             value['t'] = {'t': ''.join(pieces)}
205         elif len(pieces) <= TORRENT_PIECES:
206             s = sha.new().update(''.join(pieces))
207             value['h'] = s.digest()
208         else:
209             s = sha.new().update(''.join(pieces))
210             value['l'] = s.digest()
211         storeDefer = self.dht.storeValue(key, value)
212         storeDefer.addCallback(self.store_done, hash)
213         return storeDefer
214
215     def store_done(self, result, hash):
216         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
217         pieces = hash.pieceDigests()
218         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
219             s = sha.new().update(''.join(pieces))
220             key = s.digest()
221             value = {'t': ''.join(pieces)}
222             storeDefer = self.dht.storeValue(key, value)
223             storeDefer.addCallback(self.store_torrent_done, key)
224             return storeDefer
225         return result
226
227     def store_torrent_done(self, result, key):
228         log.msg('Added torrent string %s to the DHT: %r' % (b2ahex(key), result))
229         return result
230