aeda56bf2147930d2a9c03c57f38bd3296f64863
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
1
2 from binascii import b2a_hex
3 from urlparse import urlunparse
4 import os, re
5
6 from twisted.internet import defer
7 from twisted.web2 import server, http, http_headers, static
8 from twisted.python import log, failure
9 from twisted.python.filepath import FilePath
10
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
17 from db import DB
18 from util import findMyIPAddr
19
# Name of the sub-directory (below the configured cache_dir) that is handed
# to the HTTP server and to the cache manager.
download_dir = "cache"
21
22 class AptDHT:
23     def __init__(self, dht):
24         log.msg('Initializing the main apt_dht application')
25         self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
26         if not self.cache_dir.child(download_dir).exists():
27             self.cache_dir.child(download_dir).makedirs()
28         self.db = DB(self.cache_dir.child('apt-dht.db'))
29         self.dht = dht
30         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
31         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
32         self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
33         self.setDirectories = self.http_server.setDirectories
34         self.getHTTPFactory = self.http_server.getHTTPFactory
35         self.peers = PeerManager()
36         self.mirrors = MirrorManager(self.cache_dir)
37         other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
38         self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
39         self.my_addr = None
40     
41     def joinComplete(self, result):
42         self.my_addr = findMyIPAddr(result,
43                                     config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
44                                     config.getboolean('DEFAULT', 'LOCAL_OK'))
45         if not self.my_addr:
46             raise RuntimeError, "IP address for this machine could not be found"
47         self.cache.scanDirectories()
48
49     def joinError(self, failure):
50         log.msg("joining DHT failed miserably")
51         log.err(failure)
52         raise RuntimeError, "IP address for this machine could not be found"
53     
54     def check_freshness(self, req, path, modtime, resp):
55         log.msg('Checking if %s is still fresh' % path)
56         d = self.peers.get([path], "HEAD", modtime)
57         d.addCallback(self.check_freshness_done, req, path, resp)
58         return d
59     
60     def check_freshness_done(self, resp, req, path, orig_resp):
61         if resp.code == 304:
62             log.msg('Still fresh, returning: %s' % path)
63             return orig_resp
64         else:
65             log.msg('Stale, need to redownload: %s' % path)
66             return self.get_resp(req, path)
67     
68     def get_resp(self, req, path):
69         d = defer.Deferred()
70         
71         log.msg('Trying to find hash for %s' % path)
72         findDefer = self.mirrors.findHash(path)
73         
74         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
75                                callbackArgs=(req, path, d), errbackArgs=(req, path, d))
76         findDefer.addErrback(log.err)
77         return d
78     
79     def findHash_error(self, failure, req, path, d):
80         log.err(failure)
81         self.findHash_done(HashObject(), req, path, d)
82         
83     def findHash_done(self, hash, req, path, d):
84         if hash.expected() is None:
85             log.msg('Hash for %s was not found' % path)
86             self.lookupHash_done([], hash, path, d)
87         else:
88             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
89             
90             # Lookup hash in cache
91             locations = self.db.lookupHash(hash.expected())
92             self.getCachedFile(hash, req, path, d, locations)
93
94     def getCachedFile(self, hash, req, path, d, locations):
95         if not locations:
96             log.msg('Failed to return file from cache: %s' % path)
97             self.lookupHash(hash, path, d)
98             return
99         
100         # Get the first possible location from the list
101         file = locations.pop(0)['path']
102         log.msg('Returning cached file: %s' % file.path)
103         
104         # Get it's response
105         resp = static.File(file.path).renderHTTP(req)
106         if isinstance(resp, defer.Deferred):
107             resp.addBoth(self._getCachedFile, hash, req, path, d, locations)
108         else:
109             self._getCachedFile(resp, hash, req, path, d, locations)
110         
111     def _getCachedFile(self, resp, hash, req, path, d, locations):
112         if isinstance(resp, failure.Failure):
113             log.msg('Got error trying to get cached file')
114             log.err()
115             # Try the next possible location
116             self.getCachedFile(hash, req, path, d, locations)
117             return
118             
119         log.msg('Cached response: %r' % resp)
120         
121         if resp.code >= 200 and resp.code < 400:
122             d.callback(resp)
123         else:
124             # Try the next possible location
125             self.getCachedFile(hash, req, path, d, locations)
126
127     def lookupHash(self, hash, path, d):
128         log.msg('Looking up hash in DHT for file: %s' % path)
129         key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
130         lookupDefer = self.dht.getValue(key)
131         lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
132
133     def lookupHash_done(self, locations, hash, path, d):
134         if not locations:
135             log.msg('Peers for %s were not found' % path)
136             getDefer = self.peers.get([path])
137             getDefer.addCallback(self.cache.save_file, hash, path)
138             getDefer.addErrback(self.cache.save_error, path)
139             getDefer.addCallbacks(d.callback, d.errback)
140         else:
141             log.msg('Found peers for %s: %r' % (path, locations))
142             # Download from the found peers
143             getDefer = self.peers.get(locations)
144             getDefer.addCallback(self.check_response, hash, path)
145             getDefer.addCallback(self.cache.save_file, hash, path)
146             getDefer.addErrback(self.cache.save_error, path)
147             getDefer.addCallbacks(d.callback, d.errback)
148             
149     def check_response(self, response, hash, path):
150         if response.code < 200 or response.code >= 300:
151             log.msg('Download from peers failed, going to direct download: %s' % path)
152             getDefer = self.peers.get([path])
153             return getDefer
154         return response
155         
156     def new_cached_file(self, file_path, hash, urlpath, url = None):
157         """Add a newly cached file to the DHT.
158         
159         If the file was downloaded, set url to the path it was downloaded for.
160         """
161         if url:
162             self.mirrors.updatedFile(url, file_path)
163         
164         if self.my_addr and hash:
165             site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
166             full_path = urlunparse(('http', site, urlpath, None, None, None))
167             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
168             storeDefer = self.dht.storeValue(key, full_path)
169             storeDefer.addCallback(self.store_done, full_path)
170             return storeDefer
171         return None
172
173     def store_done(self, result, path):
174         log.msg('Added %s to the DHT: %r' % (path, result))
175