749396b3a36f9b3922c79af4c30bc487f5572594
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
1
2 from binascii import b2a_hex
3 from urlparse import urlunparse
4 import os, re
5
6 from twisted.internet import defer
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
10
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
17 from db import DB
18 from util import findMyIPAddr
19
20 download_dir = 'cache'
21
22 class AptDHT:
23     def __init__(self, dht):
24         log.msg('Initializing the main apt_dht application')
25         self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
26         if not self.cache_dir.child(download_dir).exists():
27             self.cache_dir.child(download_dir).makedirs()
28         self.db = DB(self.cache_dir.child('apt-dht.db'))
29         self.dht = dht
30         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
31         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
32         self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
33         self.setDirectories = self.http_server.setDirectories
34         self.http_site = server.Site(self.http_server)
35         self.peers = PeerManager()
36         self.mirrors = MirrorManager(self.cache_dir)
37         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
38         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
39         self.my_addr = None
40     
41     def getSite(self):
42         return self.http_site
43     
44     def joinComplete(self, result):
45         self.my_addr = findMyIPAddr(result,
46                                     config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
47                                     config.getboolean('DEFAULT', 'LOCAL_OK'))
48         if not self.my_addr:
49             raise RuntimeError, "IP address for this machine could not be found"
50         self.cache.scanDirectories()
51
52     def joinError(self, failure):
53         log.msg("joining DHT failed miserably")
54         log.err(failure)
55         raise RuntimeError, "IP address for this machine could not be found"
56     
57     def check_freshness(self, req, path, modtime, resp):
58         log.msg('Checking if %s is still fresh' % path)
59         d = self.peers.get([path], "HEAD", modtime)
60         d.addCallback(self.check_freshness_done, req, path, resp)
61         return d
62     
63     def check_freshness_done(self, resp, req, path, orig_resp):
64         if resp.code == 304:
65             log.msg('Still fresh, returning: %s' % path)
66             return orig_resp
67         else:
68             log.msg('Stale, need to redownload: %s' % path)
69             return self.get_resp(req, path)
70     
71     def get_resp(self, req, path):
72         d = defer.Deferred()
73         
74         log.msg('Trying to find hash for %s' % path)
75         findDefer = self.mirrors.findHash(path)
76         
77         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
78                                callbackArgs=(req, path, d), errbackArgs=(req, path, d))
79         findDefer.addErrback(log.err)
80         return d
81     
82     def findHash_error(self, failure, req, path, d):
83         log.err(failure)
84         self.findHash_done(HashObject(), req, path, d)
85         
86     def findHash_done(self, hash, req, path, d):
87         if hash.expected() is None:
88             log.msg('Hash for %s was not found' % path)
89             self.lookupHash_done([], hash, path, d)
90         else:
91             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
92             
93             # Lookup hash in cache
94             locations = self.db.lookupHash(hash.expected())
95             self.getCachedFile(hash, req, path, d, locations)
96
97     def getCachedFile(self, hash, req, path, d, locations):
98         if not locations:
99             log.msg('Failed to return file from cache: %s' % path)
100             self.lookupHash(hash, path, d)
101             return
102         
103         # Get the first possible location from the list
104         file = locations.pop(0)['path']
105         log.msg('Returning cached file: %s' % file.path)
106         
107         # Get it's response
108         resp = static.File(file.path).renderHTTP(req)
109         if isinstance(resp, defer.Deferred):
110             resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
111         else:
112             self._getCachedFile(resp, hash, req, path, d, locations)
113         
114     def _getCachedFile(self, resp, hash, req, path, d, locations):
115         if isinstance(resp, failure.Failure):
116             log.msg('Got error trying to get cached file')
117             log.err()
118             # Try the next possible location
119             self.getCachedFile(hash, req, path, d, locations)
120             return
121             
122         log.msg('Cached response: %r' % resp)
123         
124         if resp.code >= 200 and resp.code < 400:
125             d.callback(resp)
126         else:
127             # Try the next possible location
128             self.getCachedFile(hash, req, path, d, locations)
129
130     def lookupHash(self, hash, path, d):
131         log.msg('Looking up hash in DHT for file: %s' % path)
132         key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
133         lookupDefer = self.dht.getValue(key)
134         lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
135
136     def lookupHash_done(self, locations, hash, path, d):
137         if not locations:
138             log.msg('Peers for %s were not found' % path)
139             getDefer = self.peers.get([path])
140             getDefer.addCallback(self.cache.save_file, hash, path)
141             getDefer.addErrback(self.cache.save_error, path)
142             getDefer.addCallbacks(d.callback, d.errback)
143         else:
144             log.msg('Found peers for %s: %r' % (path, locations))
145             # Download from the found peers
146             getDefer = self.peers.get(locations)
147             getDefer.addCallback(self.check_response, hash, path)
148             getDefer.addCallback(self.cache.save_file, hash, path)
149             getDefer.addErrback(self.cache.save_error, path)
150             getDefer.addCallbacks(d.callback, d.errback)
151             
152     def check_response(self, response, hash, path):
153         if response.code < 200 or response.code >= 300:
154             log.msg('Download from peers failed, going to direct download: %s' % path)
155             getDefer = self.peers.get([path])
156             return getDefer
157         return response
158         
159     def new_cached_file(self, file_path, hash, urlpath, url = None):
160         """Add a newly cached file to the DHT.
161         
162         If the file was downloaded, set url to the path it was downloaded for.
163         """
164         if url:
165             self.mirrors.updatedFile(url, file_path)
166         
167         if self.my_addr and hash:
168             site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
169             full_path = urlunparse(('http', site, urlpath, None, None, None))
170             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
171             storeDefer = self.dht.storeValue(key, full_path)
172             storeDefer.addCallback(self.store_done, full_path)
173             return storeDefer
174         return None
175
176     def store_done(self, result, path):
177         log.msg('Added %s to the DHT: %r' % (path, result))
178