Fix some documentation errors.
[quix0rs-apt-p2p.git] / apt_p2p / DHTManager.py
1
2 """Manage all delaings with the DHT.
3
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
5     in the DHT
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
7     in the DHT
8 """
9
10 import sha
11
12 from twisted.internet import reactor
13 from twisted.python import log
14
15 from interfaces import IDHTStats
16 from apt_p2p_conf import config
17 from Hash import HashObject
18 from util import findMyIPAddr, compact
19
20 DHT_PIECES = 4
21 TORRENT_PIECES = 70
22
23 class DHT:
24     """Manages all the requests to a DHT.
25     
26     @type dhtClass: L{interfaces.IDHT}
27     @ivar dhtClass: the DHT class to use
28     @type db: L{db.DB}
29     @ivar db: the database to use for tracking files and hashes
30     @type dht: L{interfaces.IDHT}
31     @ivar dht: the DHT instance
32     @type my_contact: C{string}
33     @ivar my_contact: the 6-byte compact peer representation of this peer's
34         download information (IP address and port)
35     """
36     
37     def __init__(self, dhtClass, db):
38         """Initialize the instance.
39         
40         @type dhtClass: L{interfaces.IDHT}
41         @param dhtClass: the DHT class to use
42         """
43         self.dhtClass = dhtClass
44         self.db = db
45         self.my_contact = None
46         
47     def start(self):
48         self.dht = self.dhtClass()
49         self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
50         df = self.dht.join()
51         df.addCallbacks(self.joinComplete, self.joinError)
52         return df
53         
54     def joinComplete(self, result):
55         """Complete the DHT join process and determine our download information.
56         
57         Called by the DHT when the join has been completed with information
58         on the external IP address and port of this peer.
59         """
60         my_addr = findMyIPAddr(result,
61                                config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
62                                config.getboolean('DEFAULT', 'LOCAL_OK'))
63         if not my_addr:
64             raise RuntimeError, "IP address for this machine could not be found"
65         self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
66         self.nextRefresh = reactor.callLater(60, self.refreshFiles)
67         return (my_addr, config.getint('DEFAULT', 'PORT'))
68
69     def joinError(self, failure):
70         """Joining the DHT has failed."""
71         log.msg("joining DHT failed miserably")
72         log.err(failure)
73         return failure
74     
75     def refreshFiles(self, result = None, hashes = {}):
76         """Refresh any files in the DHT that are about to expire."""
77         if result is not None:
78             log.msg('Storage resulted in: %r' % result)
79
80         if not hashes:
81             expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
82             hashes = self.db.expiredHashes(expireAfter)
83             if len(hashes.keys()) > 0:
84                 log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
85
86         delay = 60
87         if hashes:
88             delay = 3
89             raw_hash = hashes.keys()[0]
90             self.db.refreshHash(raw_hash)
91             hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
92             del hashes[raw_hash]
93             storeDefer = self.store(hash)
94             storeDefer.addBoth(self.refreshFiles, hashes)
95
96         if self.nextRefresh.active():
97             self.nextRefresh.reset(delay)
98         else:
99             self.nextRefresh = reactor.callLater(delay, self.refreshFiles, None, hashes)
100     
101     def getStats(self):
102         """Retrieve the formatted statistics for the DHT.
103         
104         @rtype: C{string}
105         @return: the formatted HTML page containing the statistics
106         """
107         if IDHTStats.implementedBy(self.dhtClass):
108             return self.dht.getStats()
109         return "<p>DHT doesn't support statistics\n"
110
111     def get(self, key):
112         """Retrieve a hash's value from the DHT."""
113         return self.dht.getValue(key)
114     
115     def store(self, hash):
116         """Add a hash for a file to the DHT.
117         
118         Sets the key and value from the hash information, and tries to add
119         it to the DHT.
120         """
121         key = hash.digest()
122         value = {'c': self.my_contact}
123         pieces = hash.pieceDigests()
124         
125         # Determine how to store any piece data
126         if len(pieces) <= 1:
127             pass
128         elif len(pieces) <= DHT_PIECES:
129             # Short enough to be stored with our peer contact info
130             value['t'] = {'t': ''.join(pieces)}
131         elif len(pieces) <= TORRENT_PIECES:
132             # Short enough to be stored in a separate key in the DHT
133             value['h'] = sha.new(''.join(pieces)).digest()
134         else:
135             # Too long, must be served up by our peer HTTP server
136             value['l'] = sha.new(''.join(pieces)).digest()
137
138         storeDefer = self.dht.storeValue(key, value)
139         storeDefer.addCallbacks(self._store_done, self._store_error,
140                                 callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
141         return storeDefer
142
143     def _store_done(self, result, hash):
144         """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
145         log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
146         pieces = hash.pieceDigests()
147         if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
148             # Add the piece data key and value to the DHT
149             key = sha.new(''.join(pieces)).digest()
150             value = {'t': ''.join(pieces)}
151
152             storeDefer = self.dht.storeValue(key, value)
153             storeDefer.addCallbacks(self._store_torrent_done, self._store_error,
154                                     callbackArgs = (key, ), errbackArgs = (key, ))
155             return storeDefer
156         return result
157
158     def _store_torrent_done(self, result, key):
159         """Adding the pieces to the DHT is complete."""
160         log.msg('Added torrent string %r to the DHT: %r' % (key, result))
161         return result
162
163     def _store_error(self, err, key):
164         """Adding to the DHT failed."""
165         log.msg('An error occurred adding %r to the DHT: %r' % (key, err))
166         return err
167