2 """Manage all delaings with the DHT.
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
12 from twisted.internet import reactor
13 from twisted.python import log
15 from interfaces import IDHTStats
16 from apt_p2p_conf import config
17 from Hash import HashObject
18 from util import findMyIPAddr, compact
24 """Manages all the requests to a DHT.
26 @type dhtClass: L{interfaces.IDHT}
27 @ivar dhtClass: the DHT class to use
29 @ivar db: the database to use for tracking files and hashes
30 @type dht: L{interfaces.IDHT}
31 @ivar dht: the DHT instance
32 @type my_contact: C{string}
33 @ivar my_contact: the 6-byte compact peer representation of this peer's
34 download information (IP address and port)
35 @type nextRefresh: L{twisted.internet.interfaces.IDelayedCall}
36 @ivar nextRefresh: the next delayed call to refreshFiles
37 @type refreshingHashes: C{list} of C{dictionary}
38 @ivar refreshingHashes: the list of hashes that still need to be refreshed
41 def __init__(self, dhtClass, db):
42 """Initialize the instance.
44 @type dhtClass: L{interfaces.IDHT}
45 @param dhtClass: the DHT class to use
47 self.dhtClass = dhtClass
49 self.my_contact = None
50 self.nextRefresh = None
51 self.refreshingHashes = []
54 self.dht = self.dhtClass()
55 self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
57 df.addCallbacks(self.joinComplete, self.joinError)
60 def joinComplete(self, result):
61 """Complete the DHT join process and determine our download information.
63 Called by the DHT when the join has been completed with information
64 on the external IP address and port of this peer.
66 my_addr = findMyIPAddr(result,
67 config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
68 config.getboolean('DEFAULT', 'LOCAL_OK'))
70 raise RuntimeError, "IP address for this machine could not be found"
71 self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
72 if not self.nextRefresh or not self.nextRefresh.active():
73 self.nextRefresh = reactor.callLater(60, self.refreshFiles)
74 return (my_addr, config.getint('DEFAULT', 'PORT'))
76 def joinError(self, failure):
77 """Joining the DHT has failed."""
78 log.msg("joining DHT failed miserably")
82 def refreshFiles(self, result = None):
83 """Refresh any files in the DHT that are about to expire."""
84 if result is not None:
85 log.msg('Storage resulted in: %r' % result)
87 if not self.refreshingHashes:
88 expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
89 self.refreshingHashes = self.db.expiredHashes(expireAfter)
90 if len(self.refreshingHashes) > 0:
91 log.msg('Refreshing the keys of %d DHT values' % len(self.refreshingHashes))
94 if self.refreshingHashes:
96 refresh = self.refreshingHashes.pop(0)
97 self.db.refreshHash(refresh['hash'])
98 hash = HashObject(refresh['hash'], pieces = refresh['pieces'])
99 storeDefer = self.store(hash)
100 storeDefer.addBoth(self.refreshFiles)
102 if self.nextRefresh.active():
103 self.nextRefresh.reset(delay)
105 self.nextRefresh = reactor.callLater(delay, self.refreshFiles)
108 """Retrieve the formatted statistics for the DHT.
111 @return: the formatted HTML page containing the statistics
113 if IDHTStats.implementedBy(self.dhtClass):
114 return self.dht.getStats()
115 return "<p>DHT doesn't support statistics\n"
118 """Retrieve a hash's value from the DHT."""
119 return self.dht.getValue(key)
121 def store(self, hash):
122 """Add a hash for a file to the DHT.
124 Sets the key and value from the hash information, and tries to add
128 value = {'c': self.my_contact}
129 pieces = hash.pieceDigests()
131 # Determine how to store any piece data
134 elif len(pieces) <= DHT_PIECES:
135 # Short enough to be stored with our peer contact info
136 value['t'] = {'t': ''.join(pieces)}
137 elif len(pieces) <= TORRENT_PIECES:
138 # Short enough to be stored in a separate key in the DHT
139 value['h'] = sha.new(''.join(pieces)).digest()
141 # Too long, must be served up by our peer HTTP server
142 value['l'] = sha.new(''.join(pieces)).digest()
144 storeDefer = self.dht.storeValue(key, value)
145 storeDefer.addCallbacks(self._store_done, self._store_error,
146 callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
149 def _store_done(self, result, hash):
150 """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
151 log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
152 pieces = hash.pieceDigests()
153 if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
154 # Add the piece data key and value to the DHT
155 key = sha.new(''.join(pieces)).digest()
156 value = {'t': ''.join(pieces)}
158 storeDefer = self.dht.storeValue(key, value)
159 storeDefer.addCallbacks(self._store_torrent_done, self._store_error,
160 callbackArgs = (key, ), errbackArgs = (key, ))
164 def _store_torrent_done(self, result, key):
165 """Adding the pieces to the DHT is complete."""
166 log.msg('Added torrent string %r to the DHT: %r' % (key, result))
169 def _store_error(self, err, key):
170 """Adding to the DHT failed."""
171 log.msg('An error occurred adding %r to the DHT: %r' % (key, err))