]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht/apt_dht.py
Moved the finding IP address function to the util module.
[quix0rs-apt-p2p.git] / apt_dht / apt_dht.py
1
2 from binascii import b2a_hex
3 from urlparse import urlunparse
4 import os, re
5
6 from twisted.internet import defer
7 from twisted.web2 import server, http, http_headers
8 from twisted.python import log
9
10 from apt_dht_conf import config
11 from PeerManager import PeerManager
12 from HTTPServer import TopLevel
13 from MirrorManager import MirrorManager
14 from Hash import HashObject
15 from db import DB
16 from util import findMyIPAddr
17
18 class AptDHT:
19     def __init__(self, dht):
20         log.msg('Initializing the main apt_dht application')
21         self.db = DB(config.get('DEFAULT', 'cache_dir') + '/.apt-dht.db')
22         self.dht = dht
23         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
24         self.dht.join().addCallbacks(self.joinComplete, self.joinError)
25         self.http_server = TopLevel(config.get('DEFAULT', 'cache_dir'), self)
26         self.http_site = server.Site(self.http_server)
27         self.peers = PeerManager()
28         self.mirrors = MirrorManager(config.get('DEFAULT', 'cache_dir'), self)
29         self.my_addr = None
30     
31     def getSite(self):
32         return self.http_site
33     
34     def joinComplete(self, result):
35         self.my_addr = findMyIPAddr(result, config.getint(config.get('DEFAULT', 'DHT'), 'PORT'))
36
37     def joinError(self, failure):
38         log.msg("joining DHT failed miserably")
39         log.err(failure)
40     
41     def check_freshness(self, path, modtime, resp):
42         log.msg('Checking if %s is still fresh' % path)
43         d = self.peers.get([path], "HEAD", modtime)
44         d.addCallback(self.check_freshness_done, path, resp)
45         return d
46     
47     def check_freshness_done(self, resp, path, orig_resp):
48         if resp.code == 304:
49             log.msg('Still fresh, returning: %s' % path)
50             return orig_resp
51         else:
52             log.msg('Stale, need to redownload: %s' % path)
53             return self.get_resp(path)
54     
55     def get_resp(self, path):
56         d = defer.Deferred()
57         
58         log.msg('Trying to find hash for %s' % path)
59         findDefer = self.mirrors.findHash(path)
60         
61         findDefer.addCallbacks(self.findHash_done, self.findHash_error, 
62                                callbackArgs=(path, d), errbackArgs=(path, d))
63         findDefer.addErrback(log.err)
64         return d
65     
66     def findHash_error(self, failure, path, d):
67         log.err(failure)
68         self.findHash_done(HashObject(), path, d)
69         
70     def findHash_done(self, hash, path, d):
71         if hash.expected() is None:
72             log.msg('Hash for %s was not found' % path)
73             self.lookupHash_done([], hash, path, d)
74         else:
75             log.msg('Found hash %s for %s' % (hash.hexexpected(), path))
76             # Lookup hash from DHT
77             key = hash.normexpected(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
78             lookupDefer = self.dht.getValue(key)
79             lookupDefer.addCallback(self.lookupHash_done, hash, path, d)
80             
81     def lookupHash_done(self, locations, hash, path, d):
82         if not locations:
83             log.msg('Peers for %s were not found' % path)
84             getDefer = self.peers.get([path])
85             getDefer.addCallback(self.mirrors.save_file, hash, path)
86             getDefer.addErrback(self.mirrors.save_error, path)
87             getDefer.addCallbacks(d.callback, d.errback)
88         else:
89             log.msg('Found peers for %s: %r' % (path, locations))
90             # Download from the found peers
91             getDefer = self.peers.get(locations)
92             getDefer.addCallback(self.check_response, hash, path)
93             getDefer.addCallback(self.mirrors.save_file, hash, path)
94             getDefer.addErrback(self.mirrors.save_error, path)
95             getDefer.addCallbacks(d.callback, d.errback)
96             
97     def check_response(self, response, hash, path):
98         if response.code < 200 or response.code >= 300:
99             log.msg('Download from peers failed, going to direct download: %s' % path)
100             getDefer = self.peers.get([path])
101             return getDefer
102         return response
103         
104     def cached_file(self, hash, url, file_path):
105         assert file_path.startswith(config.get('DEFAULT', 'cache_dir'))
106         urlpath, newdir = self.db.storeFile(file_path, hash.digest(), config.get('DEFAULT', 'cache_dir'))
107         log.msg('now avaliable at %s: %s' % (urlpath, url))
108
109         if self.my_addr:
110             site = self.my_addr + ':' + str(config.getint('DEFAULT', 'PORT'))
111             full_path = urlunparse(('http', site, urlpath, None, None, None))
112             key = hash.norm(bits = config.getint(config.get('DEFAULT', 'DHT'), 'HASH_LENGTH'))
113             storeDefer = self.dht.storeValue(key, full_path)
114             storeDefer.addCallback(self.store_done, full_path)
115             storeDefer.addErrback(log.err)
116
117     def store_done(self, result, path):
118         log.msg('Added %s to the DHT: %r' % (path, result))
119