]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p/DHTManager.py
Only touch a bucket if a find request targets it.
[quix0rs-apt-p2p.git] / apt_p2p / DHTManager.py
1
2 """Manage all delaings with the DHT.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 """
9
10 import sha
11
12 from twisted.internet import reactor
13 from twisted.python import log
14
15 from interfaces import IDHTStats
16 from apt_p2p_conf import config
17 from Hash import HashObject
18 from util import findMyIPAddr, compact
19
20 DHT_PIECES = 4
21 TORRENT_PIECES = 70
22
23 class DHT:
24     """Manages all the requests to a DHT.
25     
26     @type dhtClass: L{interfaces.IDHT}
27     @ivar dhtClass: the DHT class to use
28     @type db: L{db.DB}
29     @ivar db: the database to use for tracking files and hashes
30     @type dht: L{interfaces.IDHT}
31     @ivar dht: the DHT instance
32     @type my_contact: C{string}
33     @ivar my_contact: the 6-byte compact peer representation of this peer's
34         download information (IP address and port)
35     @type nextRefresh: L{twisted.internet.interfaces.IDelayedCall}
36     @ivar nextRefresh: the next delayed call to refreshFiles
37     @type refreshingHashes: C{list} of C{dictionary}
38     @ivar refreshingHashes: the list of hashes that still need to be refreshed
39     """
40     
41     def __init__(self, dhtClass, db):
42         """Initialize the instance.
43         
44         @type dhtClass: L{interfaces.IDHT}
45         @param dhtClass: the DHT class to use
46         """
47         self.dhtClass = dhtClass
48         self.db = db
49         self.my_contact = None
50         self.nextRefresh = None
51         self.refreshingHashes = []
52         
53     def start(self):
54         self.dht = self.dhtClass()
55         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
56         df = self.dht.join()
57         df.addCallbacks(self.joinComplete, self.joinError)
58         return df
59         
60     def joinComplete(self, result):
61         """Complete the DHT join process and determine our download information.
62         
63         Called by the DHT when the join has been completed with information
64         on the external IP address and port of this peer.
65         """
66         my_addr = findMyIPAddr(result,
67                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
68                                config.getboolean('DEFAULT', 'LOCAL_OK'))
69         if not my_addr:
70             raise RuntimeError, "IP address for this machine could not be found"
71         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
72         if not self.nextRefresh or not self.nextRefresh.active():
73             self.nextRefresh = reactor.callLater(60, self.refreshFiles)
74         return (my_addr, config.getint('DEFAULT', 'PORT'))
75
76     def joinError(self, failure):
77         """Joining the DHT has failed."""
78         log.msg("joining DHT failed miserably")
79         log.err(failure)
80         return failure
81     
82     def refreshFiles(self, result = None):
83         """Refresh any files in the DHT that are about to expire."""
84         if result is not None:
85             log.msg('Storage resulted in: %r' % result)
86
87         if not self.refreshingHashes:
88             expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
89             self.refreshingHashes = self.db.expiredHashes(expireAfter)
90             if len(self.refreshingHashes) > 0:
91                 log.msg('Refreshing the keys of %d DHT values' % len(self.refreshingHashes))
92
93         delay = 60
94         if self.refreshingHashes:
95             delay = 3
96             refresh = self.refreshingHashes.pop(0)
97             self.db.refreshHash(refresh['hash'])
98             hash = HashObject(refresh['hash'], pieces = refresh['pieces'])
99             storeDefer = self.store(hash)
100             storeDefer.addBoth(self.refreshFiles)
101
102         if self.nextRefresh.active():
103             self.nextRefresh.reset(delay)
104         else:
105             self.nextRefresh = reactor.callLater(delay, self.refreshFiles)
106     
107     def getStats(self):
108         """Retrieve the formatted statistics for the DHT.
109         
110         @rtype: C{string}
111         @return: the formatted HTML page containing the statistics
112         """
113         if IDHTStats.implementedBy(self.dhtClass):
114             return self.dht.getStats()
115         return "<p>DHT doesn't support statistics\n"
116
117     def get(self, key):
118         """Retrieve a hash's value from the DHT."""
119         return self.dht.getValue(key)
120     
121     def store(self, hash):
122         """Add a hash for a file to the DHT.
123         
124         Sets the key and value from the hash information, and tries to add
125         it to the DHT.
126         """
127         key = hash.digest()
128         value = {'c': self.my_contact}
129         pieces = hash.pieceDigests()
130         
131         # Determine how to store any piece data
132         if len(pieces) <= 1:
133             pass
134         elif len(pieces) <= DHT_PIECES:
135             # Short enough to be stored with our peer contact info
136             value['t'] = {'t': ''.join(pieces)}
137         elif len(pieces) <= TORRENT_PIECES:
138             # Short enough to be stored in a separate key in the DHT
139             value['h'] = sha.new(''.join(pieces)).digest()
140         else:
141             # Too long, must be served up by our peer HTTP server
142             value['l'] = sha.new(''.join(pieces)).digest()
143
144         storeDefer = self.dht.storeValue(key, value)
145         storeDefer.addCallbacks(self._store_done, self._store_error,
146                                 callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
147         return storeDefer
148
149     def _store_done(self, result, hash):
150         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
151         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
152         pieces = hash.pieceDigests()
153         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
154             # Add the piece data key and value to the DHT
155             key = sha.new(''.join(pieces)).digest()
156             value = {'t': ''.join(pieces)}
157
158             storeDefer = self.dht.storeValue(key, value)
159             storeDefer.addCallbacks(self._store_torrent_done, self._store_error,
160                                     callbackArgs = (key, ), errbackArgs = (key, ))
161             return storeDefer
162         return result
163
164     def _store_torrent_done(self, result, key):
165         """Adding the pieces to the DHT is complete."""
166         log.msg('Added torrent string %r to the DHT: %r' % (key, result))
167         return result
168
169     def _store_error(self, err, key):
170         """Adding to the DHT failed."""
171         log.msg('An error occurred adding %r to the DHT: %r' % (key, err))
172         return err
173