047d1c2b815dbf7c3d3643244c5f04fa9b8d0fad
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
1
2 from binascii import b2a_hex
3 from urlparse import urlunparse
4 import os, re
5
6 from twisted.internet import defer
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
10
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
17 from db import DB
18 from util import findMyIPAddr
19
20 download_dir = 'cache'
21
22 class AptDHT:
23     def __init__(self, dht):
24         log.msg('Initializing the main apt_dht application')
25         self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
26         if not self.cache_dir.child(download_dir).exists():
27             self.cache_dir.child(download_dir).makedirs()
28         self.db = DB(self.cache_dir.child('apt-dht.db'))
29         self.dht = dht
30         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
31         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
32         self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
33         self.setDirectories = self.http_server.setDirectories
34         self.http_site = server.Site(self.http_server)
35         self.peers = PeerManager()
36         self.mirrors = MirrorManager(self.cache_dir)
37         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
38         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
39         self.my_addr = None
40     
41     def getSite(self):
42         return self.http_site
43     
44     def joinComplete(self, result):
45         self.my_addr = findMyIPAddr(result, config.getint(config.get('DEFAULT', 'DHT'), 'PORT'))
46         if not self.my_addr:
47             raise RuntimeError, "IP address for this machine could not be found"
48         self.cache.scanDirectories()
49
50     def joinError(self, failure):
51         log.msg("joining DHT failed miserably")
52         log.err(failure)
53         raise RuntimeError, "IP address for this machine could not be found"
54     
55     def check_freshness(self, req, path, modtime, resp):
56         log.msg('Checking if %s is still fresh' % path)
57         d = self.peers.get([path], "HEAD", modtime)
58         d.addCallback(self.check_freshness_done, req, path, resp)
59         return d
60     
61     def check_freshness_done(self, resp, req, path, orig_resp):
62         if resp.code == 304:
63             log.msg('Still fresh, returning: %s' % path)
64             return orig_resp
65         else:
66             log.msg('Stale, need to redownload: %s' % path)
67             return self.get_resp(req, path)
68     
69     def get_resp(self, req, path):
70         d = defer.Deferred()
71         
72         log.msg('Trying to find hash for %s' % path)
73         findDefer = self.mirrors.findHash(path)
74         
75         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
76                                callbackArgs=(req, path, d), errbackArgs=(req, path, d))
77         findDefer.addErrback(log.err)
78         return d
79     
80     def findHash_error(self, failure, req, path, d):
81         log.err(failure)
82         self.findHash_done(HashObject(), req, path, d)
83         
84     def findHash_done(self, hash, req, path, d):
85         if hash.expected() is None:
86             log.msg('Hash for %s was not found' % path)
87             self.lookupHash_done([], hash, path, d)
88         else:
89             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
90             
91             # Lookup hash in cache
92             locations = self.db.lookupHash(hash.expected())
93             self.getCachedFile(hash, req, path, d, locations)
94
95     def getCachedFile(self, hash, req, path, d, locations):
96         if not locations:
97             log.msg('Failed to return file from cache: %s' % path)
98             self.lookupHash(hash, path, d)
99             return
100         
101         # Get the first possible location from the list
102         file = locations.pop(0)['path']
103         log.msg('Returning cached file: %s' % file.path)
104         
105         # Get it's response
106         resp = static.File(file.path).renderHTTP(req)
107         if isinstance(resp, defer.Deferred):
108             resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
109         else:
110             self._getCachedFile(resp, hash, req, path, d, locations)
111         
112     def _getCachedFile(self, resp, hash, req, path, d, locations):
113         if isinstance(resp, failure.Failure):
114             log.msg('Got error trying to get cached file')
115             log.err()
116             # Try the next possible location
117             self.getCachedFile(hash, req, path, d, locations)
118             return
119             
120         log.msg('Cached response: %r' % resp)
121         
122         if resp.code >= 200 and resp.code < 400:
123             d.callback(resp)
124         else:
125             # Try the next possible location
126             self.getCachedFile(hash, req, path, d, locations)
127
128     def lookupHash(self, hash, path, d):
129         log.msg('Looking up hash in DHT for file: %s' % path)
130         key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
131         lookupDefer = self.dht.getValue(key)
132         lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
133
134     def lookupHash_done(self, locations, hash, path, d):
135         if not locations:
136             log.msg('Peers for %s were not found' % path)
137             getDefer = self.peers.get([path])
138             getDefer.addCallback(self.cache.save_file, hash, path)
139             getDefer.addErrback(self.cache.save_error, path)
140             getDefer.addCallbacks(d.callback, d.errback)
141         else:
142             log.msg('Found peers for %s: %r' % (path, locations))
143             # Download from the found peers
144             getDefer = self.peers.get(locations)
145             getDefer.addCallback(self.check_response, hash, path)
146             getDefer.addCallback(self.cache.save_file, hash, path)
147             getDefer.addErrback(self.cache.save_error, path)
148             getDefer.addCallbacks(d.callback, d.errback)
149             
150     def check_response(self, response, hash, path):
151         if response.code < 200 or response.code >= 300:
152             log.msg('Download from peers failed, going to direct download: %s' % path)
153             getDefer = self.peers.get([path])
154             return getDefer
155         return response
156         
157     def new_cached_file(self, file_path, hash, urlpath, url = None):
158         """Add a newly cached file to the DHT.
159         
160         If the file was downloaded, set url to the path it was downloaded for.
161         """
162         if url:
163             self.mirrors.updatedFile(url, file_path)
164         
165         if self.my_addr and hash:
166             site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
167             full_path = urlunparse(('http', site, urlpath, None, None, None))
168             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
169             storeDefer = self.dht.storeValue(key, full_path)
170             storeDefer.addCallback(self.store_done, full_path)
171             return storeDefer
172         return None
173
174     def store_done(self, result, path):
175         log.msg('Added %s to the DHT: %r' % (path, result))
176