25db818afa1dd994eea66e6e043aabe94f3e91b9
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
1
2 from binascii import b2a_hex
3 from urlparse import urlunparse
4 import os, re
5
6 from twisted.internet import defer
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
10
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
17 from db import DB
18 from util import findMyIPAddr, compact
19
20 download_dir = 'cache'
21
22 class AptDHT:
23     def __init__(self, dht):
24         log.msg('Initializing the main apt_dht application')
25         self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
26         if not self.cache_dir.child(download_dir).exists():
27             self.cache_dir.child(download_dir).makedirs()
28         self.db = DB(self.cache_dir.child('apt-dht.db'))
29         self.dht = dht
30         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
31         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
32         self.http_server = TopLevel(self.cache_dir.child(download_dir), self.db, self)
33         self.getHTTPFactory = self.http_server.getHTTPFactory
34         self.peers = PeerManager()
35         self.mirrors = MirrorManager(self.cache_dir)
36         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
37         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
38         self.my_addr = None
39     
40     def joinComplete(self, result):
41         self.my_addr = findMyIPAddr(result,
42                                     config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
43                                     config.getboolean('DEFAULT', 'LOCAL_OK'))
44         if not self.my_addr:
45             raise RuntimeError, "IP address for this machine could not be found"
46         self.cache.scanDirectories()
47
48     def joinError(self, failure):
49         log.msg("joining DHT failed miserably")
50         log.err(failure)
51         raise RuntimeError, "IP address for this machine could not be found"
52     
53     def check_freshness(self, req, path, modtime, resp):
54         log.msg('Checking if %s is still fresh' % path)
55         d = self.peers.get('', path, method = "HEAD", modtime = modtime)
56         d.addCallback(self.check_freshness_done, req, path, resp)
57         return d
58     
59     def check_freshness_done(self, resp, req, path, orig_resp):
60         if resp.code == 304:
61             log.msg('Still fresh, returning: %s' % path)
62             return orig_resp
63         else:
64             log.msg('Stale, need to redownload: %s' % path)
65             return self.get_resp(req, path)
66     
67     def get_resp(self, req, path):
68         d = defer.Deferred()
69         
70         log.msg('Trying to find hash for %s' % path)
71         findDefer = self.mirrors.findHash(path)
72         
73         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
74                                callbackArgs=(req, path, d), errbackArgs=(req, path, d))
75         findDefer.addErrback(log.err)
76         return d
77     
78     def findHash_error(self, failure, req, path, d):
79         log.err(failure)
80         self.findHash_done(HashObject(), req, path, d)
81         
82     def findHash_done(self, hash, req, path, d):
83         if hash.expected() is None:
84             log.msg('Hash for %s was not found' % path)
85             self.lookupHash_done([], hash, path, d)
86         else:
87             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
88             
89             # Lookup hash in cache
90             locations = self.db.lookupHash(hash.expected())
91             self.getCachedFile(hash, req, path, d, locations)
92
93     def getCachedFile(self, hash, req, path, d, locations):
94         if not locations:
95             log.msg('Failed to return file from cache: %s' % path)
96             self.lookupHash(hash, path, d)
97             return
98         
99         # Get the first possible location from the list
100         file = locations.pop(0)['path']
101         log.msg('Returning cached file: %s' % file.path)
102         
103         # Get it's response
104         resp = static.File(file.path).renderHTTP(req)
105         if isinstance(resp, defer.Deferred):
106             resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
107         else:
108             self._getCachedFile(resp, hash, req, path, d, locations)
109         
110     def _getCachedFile(self, resp, hash, req, path, d, locations):
111         if isinstance(resp, failure.Failure):
112             log.msg('Got error trying to get cached file')
113             log.err()
114             # Try the next possible location
115             self.getCachedFile(hash, req, path, d, locations)
116             return
117             
118         log.msg('Cached response: %r' % resp)
119         
120         if resp.code >= 200 and resp.code < 400:
121             d.callback(resp)
122         else:
123             # Try the next possible location
124             self.getCachedFile(hash, req, path, d, locations)
125
126     def lookupHash(self, hash, path, d):
127         log.msg('Looking up hash in DHT for file: %s' % path)
128         key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
129         lookupDefer = self.dht.getValue(key)
130         lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
131
132     def lookupHash_done(self, values, hash, path, d):
133         if not values:
134             log.msg('Peers for %s were not found' % path)
135             getDefer = self.peers.get(hash, path)
136             getDefer.addCallback(self.cache.save_file, hash, path)
137             getDefer.addErrback(self.cache.save_error, path)
138             getDefer.addCallbacks(d.callback, d.errback)
139         else:
140             log.msg('Found peers for %s: %r' % (path, values))
141             # Download from the found peers
142             getDefer = self.peers.get(hash, path, values)
143             getDefer.addCallback(self.check_response, hash, path)
144             getDefer.addCallback(self.cache.save_file, hash, path)
145             getDefer.addErrback(self.cache.save_error, path)
146             getDefer.addCallbacks(d.callback, d.errback)
147             
148     def check_response(self, response, hash, path):
149         if response.code < 200 or response.code >= 300:
150             log.msg('Download from peers failed, going to direct download: %s' % path)
151             getDefer = self.peers.get(hash, path)
152             return getDefer
153         return response
154         
155     def new_cached_file(self, file_path, hash, url = None, forceDHT = False):
156         """Add a newly cached file to the DHT.
157         
158         If the file was downloaded, set url to the path it was downloaded for.
159         Don't add a file to the DHT unless a hash was found for it
160         (but do add it anyway if forceDHT is True).
161         """
162         if url:
163             self.mirrors.updatedFile(url, file_path)
164         
165         if self.my_addr and hash and (hash.expected() is not None or forceDHT):
166             contact = compact(self.my_addr, config.getint('DEFAULT', 'PORT'))
167             value = {'c': contact}
168             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
169             storeDefer = self.dht.storeValue(key, value)
170             storeDefer.addCallback(self.store_done, hash)
171             return storeDefer
172         return None
173
174     def store_done(self, result, hash):
175         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
176