Added scanning of other directories for cached packages.
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
1
2 from binascii import b2a_hex
3 from urlparse import urlunparse
4 import os, re
5
6 from twisted.internet import defer
7 from twisted.web2 import server, http, http_headers
8 from twisted.python import log
9 from twisted.python.filepath import FilePath
10
11 from apt_dht_conf import config
12 from PeerManager import PeerManager
13 from HTTPServer import TopLevel
14 from MirrorManager import MirrorManager
15 from CacheManager import CacheManager
16 from Hash import HashObject
17 from db import DB
18 from util import findMyIPAddr
19
# Name of the subdirectory of the configured cache directory that is created
# on startup and handed to the HTTP server and the cache manager.
download_dir = 'cache'


class AptDHT:
    """Main application object.

    Wires together the DHT, the HTTP server (TopLevel), the PeerManager,
    the MirrorManager and the CacheManager, and implements the chain of
    Deferred callbacks used to answer a request for a path: find its hash,
    look the hash up in the DHT, download, and save to the cache.
    """

    def __init__(self, dht):
        """Create all the helper objects and start joining the DHT.

        dht is the DHT object to use. It is configured from the config
        section named by the DEFAULT/DHT option and then joined.
        """
        log.msg('Initializing the main apt_dht application')
        self.cache_dir = FilePath(config.get('DEFAULT', 'cache_dir'))
        download_path = self.cache_dir.child(download_dir)
        if not download_path.exists():
            download_path.makedirs()
        self.db = DB(self.cache_dir.child('apt-dht.db'))
        self.dht = dht
        self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
        self.http_server = TopLevel(self.cache_dir.child(download_dir), self)
        self.setDirectories = self.http_server.setDirectories
        self.http_site = server.Site(self.http_server)
        self.peers = PeerManager()
        self.mirrors = MirrorManager(self.cache_dir)
        other_dirs = [FilePath(f) for f in config.getstringlist('DEFAULT', 'OTHER_DIRS')]
        self.cache = CacheManager(self.cache_dir.child(download_dir), self.db, other_dirs, self)
        self.my_addr = None
        # Start the join only once every attribute exists: joinComplete uses
        # self.cache and sets self.my_addr, so if the join's Deferred fired
        # immediately an earlier call would fail or have its result
        # overwritten by the my_addr initialization above.
        self.dht.join().addCallbacks(self.joinComplete, self.joinError)

    def getSite(self):
        """Return the server.Site wrapping the HTTP server resource."""
        return self.http_site

    def joinComplete(self, result):
        """Callback for a successful DHT join.

        Determines this machine's address by passing the join result and the
        DHT's configured PORT to findMyIPAddr, then starts the cache scan.

        Raises RuntimeError if no address could be determined.
        """
        self.my_addr = findMyIPAddr(result, config.getint(config.get('DEFAULT', 'DHT'), 'PORT'))
        if not self.my_addr:
            raise RuntimeError("IP address for this machine could not be found")
        self.cache.scanDirectories()

    def joinError(self, failure):
        """Errback for a failed DHT join: log the failure and raise.

        Always raises RuntimeError.
        """
        log.msg("joining DHT failed miserably")
        log.err(failure)
        raise RuntimeError("Joining the DHT failed")

    def check_freshness(self, path, modtime, resp):
        """Check whether a previously obtained response is still fresh.

        Issues a "HEAD" request for path through the PeerManager, passing
        modtime along. Returns a Deferred that fires with resp if the answer
        code is 304, or otherwise with the result of get_resp(path).
        """
        log.msg('Checking if %s is still fresh' % path)
        d = self.peers.get([path], "HEAD", modtime)
        d.addCallback(self.check_freshness_done, path, resp)
        return d

    def check_freshness_done(self, resp, path, orig_resp):
        """Handle the answer to the freshness check.

        resp is the answer to the "HEAD" request, orig_resp the response
        that was being checked.
        """
        if resp.code == 304:
            log.msg('Still fresh, returning: %s' % path)
            return orig_resp
        else:
            log.msg('Stale, need to redownload: %s' % path)
            return self.get_resp(path)

    def get_resp(self, path):
        """Start retrieving path and return a Deferred for the outcome.

        The returned Deferred is fired by lookupHash_done, at the end of the
        chain started here: findHash_done/findHash_error, then (if a hash is
        known) the DHT lookup, then the download and cache save.
        """
        d = defer.Deferred()

        log.msg('Trying to find hash for %s' % path)
        findDefer = self.mirrors.findHash(path)

        findDefer.addCallbacks(self.findHash_done, self.findHash_error,
                               callbackArgs=(path, d), errbackArgs=(path, d))
        # Log anything raised by the two handlers above.
        findDefer.addErrback(log.err)
        return d

    def findHash_error(self, failure, path, d):
        """Errback for the hash search: log it and carry on without a hash.

        Continues with a newly constructed HashObject(), which presumably
        has no expected hash so that findHash_done takes the direct
        download path -- TODO confirm against Hash.HashObject.
        """
        log.err(failure)
        self.findHash_done(HashObject(), path, d)

    def findHash_done(self, hash, path, d):
        """Continue a request once the hash search has finished.

        If hash has no expected value the DHT is skipped and the download
        proceeds with no peer locations. Otherwise the DHT is asked for the
        value stored under the normalized expected hash.
        """
        if hash.expected() is None:
            log.msg('Hash for %s was not found' % path)
            self.lookupHash_done([], hash, path, d)
        else:
            log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
            # Lookup hash from DHT
            key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
            lookupDefer = self.dht.getValue(key)
            # A failed lookup must still lead to d being fired, otherwise
            # the caller of get_resp would wait forever.
            lookupDefer.addCallbacks(self.lookupHash_done, self.lookupHash_error,
                                     callbackArgs=(hash, path, d),
                                     errbackArgs=(hash, path, d))

    def lookupHash_error(self, failure, hash, path, d):
        """Errback for the DHT lookup: log it and download directly instead."""
        log.msg('DHT lookup for %s failed, going to direct download' % path)
        log.err(failure)
        self.lookupHash_done([], hash, path, d)

    def lookupHash_done(self, locations, hash, path, d):
        """Download the file and fire d with the outcome.

        With no locations the path itself is requested through the
        PeerManager. Otherwise the locations are requested and the response
        is first passed through check_response. In both cases the result
        goes to the cache's save_file (failures to save_error) and whatever
        comes out of that is delivered to d.
        """
        if not locations:
            log.msg('Peers for %s were not found' % path)
            getDefer = self.peers.get([path])
        else:
            log.msg('Found peers for %s: %r' % (path, locations))
            # Download from the found peers
            getDefer = self.peers.get(locations)
            getDefer.addCallback(self.check_response, hash, path)
        getDefer.addCallback(self.cache.save_file, hash, path)
        getDefer.addErrback(self.cache.save_error, path)
        getDefer.addCallbacks(d.callback, d.errback)

    def check_response(self, response, hash, path):
        """Fall back to requesting path itself if the peer download failed.

        A response whose code is outside 200-299 is replaced by the Deferred
        of a new request for [path]; any other response is returned as is.
        """
        if response.code < 200 or response.code >= 300:
            log.msg('Download from peers failed, going to direct download: %s' % path)
            getDefer = self.peers.get([path])
            return getDefer
        return response

    def new_cached_file(self, file_path, hash, urlpath, url = None):
        """Add a newly cached file to the DHT.

        If the file was downloaded, set url to the path it was downloaded for;
        the MirrorManager is then told about the updated file.

        When this machine's address is known and hash is true, the URL
        http://<my_addr>:<DEFAULT/PORT><urlpath> is stored in the DHT under
        the normalized hash. Nothing is stored otherwise.
        """
        if url:
            self.mirrors.updatedFile(url, file_path)

        if self.my_addr and hash:
            site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
            full_path = urlunparse(('http', site, urlpath, None, None, None))
            key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
            storeDefer = self.dht.storeValue(key, full_path)
            storeDefer.addCallback(self.store_done, full_path)
            storeDefer.addErrback(log.err)

    def store_done(self, result, path):
        """Callback for a completed DHT store: log the stored URL and result."""
        log.msg('Added %s to the DHT: %r' % (path, result))