2 """Manage all dealings with the DHT.
4 @var DHT_PIECES: the maximum number of pieces to store with our contact info
6 @var TORRENT_PIECES: the maximum number of pieces to store as a separate entry
12 from twisted.internet import reactor
13 from twisted.python import log
15 from interfaces import IDHTStats
16 from apt_p2p_conf import config
17 from Hash import HashObject
18 from util import findMyIPAddr, compact
24 """Manages all the requests to a DHT.
26 @type dhtClass: L{interfaces.IDHT}
27 @ivar dhtClass: the DHT class to use
29 @ivar db: the database to use for tracking files and hashes
30 @type dht: L{interfaces.IDHT}
31 @ivar dht: the DHT instance
32 @type my_contact: C{string}
33 @ivar my_contact: the 6-byte compact peer representation of this peer's
34 download information (IP address and port)
# Constructor: records the DHT class to use and clears the cached contact info.
# NOTE(review): this listing is lossy (embedded line numbers, indentation
# stripped, some lines absent). `db` is accepted but no assignment of it is
# visible here, although refreshFiles() reads self.db -- confirm against the
# full file.
# NOTE(review): the listing skips its lines 46-47 after `self.my_contact = None`;
# the last three statements (instantiate the DHT class, load its configuration
# from the section named by the DEFAULT/DHT option, attach joinComplete /
# joinError to `df`) may belong to a separate method whose `def` line is not
# shown -- TODO confirm.
# NOTE(review): `df` is not defined in the visible lines; presumably it is the
# deferred returned by joining the DHT -- verify.
37 def __init__(self, dhtClass, db):
38 """Initialize the instance.
40 @type dhtClass: L{interfaces.IDHT}
41 @param dhtClass: the DHT class to use
43 self.dhtClass = dhtClass
45 self.my_contact = None
48 self.dht = self.dhtClass()
49 self.dht.loadConfig(config, config.get('DEFAULT', 'DHT'))
51 df.addCallbacks(self.joinComplete, self.joinError)
# Success callback for the DHT join (attached together with joinError).
# Determines this peer's address with findMyIPAddr(), caches the compact
# address/port form in self.my_contact, schedules refreshFiles() on the
# reactor 60 seconds later, and returns the (address, DEFAULT/PORT) pair.
# NOTE(review): the `raise` looks unconditional here only because the line
# before it is absent from this listing; presumably it is guarded by a check
# that no address was found -- confirm against the full file.
# NOTE(review): `raise RuntimeError, "..."` is Python 2-only syntax;
# `raise RuntimeError("...")` is accepted by both Python 2 and Python 3.
54 def joinComplete(self, result):
55 """Complete the DHT join process and determine our download information.
57 Called by the DHT when the join has been completed with information
58 on the external IP address and port of this peer.
60 my_addr = findMyIPAddr(result,
61 config.getint(config.get('DEFAULT', 'DHT'), 'PORT'),
62 config.getboolean('DEFAULT', 'LOCAL_OK'))
64 raise RuntimeError, "IP address for this machine could not be found"
65 self.my_contact = compact(my_addr, config.getint('DEFAULT', 'PORT'))
66 self.nextRefresh = reactor.callLater(60, self.refreshFiles)
67 return (my_addr, config.getint('DEFAULT', 'PORT'))
# Failure callback for the DHT join; the visible body only writes a log line.
# NOTE(review): `failure` is unused in the visible lines and the lines that
# follow are absent from this listing, so whether the failure is propagated
# to later errbacks cannot be determined here -- verify.
69 def joinError(self, failure):
70 """Joining the DHT has failed."""
71 log.msg("joining DHT failed miserably")
# Re-store DHT entries whose keys are about to expire.
# Invoked with no result from the reactor timer and, through addBoth below,
# with the outcome of the previous store() call plus the same `hashes` dict.
# NOTE(review): `hashes = {}` is a mutable default shared between calls. The
# visible code rebinds `hashes`, but the lines just before that are absent,
# so it is unclear whether the rebinding happens on every path -- consider a
# None default.
# NOTE(review): several lines are missing from this listing, including branch
# headers and the definition of `delay`; the inline notes below describe
# individual statements only.
75 def refreshFiles(self, result = None, hashes = {}):
76 """Refresh any files in the DHT that are about to expire."""
77 if result is not None:
78 log.msg('Storage resulted in: %r' % result)
# Look up the configured KEY_REFRESH time and ask the database for the
# hashes it considers expired (presumably keyed by raw hash -- verify).
81 expireAfter = config.gettime('DEFAULT', 'KEY_REFRESH')
82 hashes = self.db.expiredHashes(expireAfter)
83 if len(hashes.keys()) > 0:
84 log.msg('Refreshing the keys of %d DHT values' % len(hashes.keys()))
# Pick one pending hash; indexing keys() relies on Python 2 returning a list.
89 raw_hash = hashes.keys()[0]
90 self.db.refreshHash(raw_hash)
91 hash = HashObject(raw_hash, pieces = hashes[raw_hash]['pieces'])
# Store it again, then re-enter refreshFiles with the store outcome
# (success or failure) and the same dict.
93 storeDefer = self.store(hash)
94 storeDefer.addBoth(self.refreshFiles, hashes)
# Push back the pending timer if it is still active; the callLater line
# schedules a new call instead.
# NOTE(review): presumably the two assignments are the arms of an if/else
# whose `else` line is absent from this listing -- confirm.
96 if self.nextRefresh.active():
97 self.nextRefresh.reset(delay)
99 self.nextRefresh = reactor.callLater(delay, self.refreshFiles, None, hashes)
# NOTE(review): the `def` line of this method is not visible in this listing.
# Returns self.dht.getStats() when the configured DHT class declares the
# IDHTStats interface; otherwise returns a fixed HTML fragment saying that
# statistics are not supported.
102 """Retrieve the formatted statistics for the DHT.
105 @return: the formatted HTML page containing the statistics
107 if IDHTStats.implementedBy(self.dhtClass):
108 return self.dht.getStats()
109 return "<p>DHT doesn't support statistics\n"
# NOTE(review): the `def` line of this method is not visible in this listing;
# `key` is presumably its parameter -- verify.
# Passes the lookup straight through to the DHT instance's getValue().
112 """Retrieve a hash's value from the DHT."""
113 return self.dht.getValue(key)
# Publish a file's hash in the DHT with this peer's contact info as the value.
# The value always carries 'c' (self.my_contact). Depending on how many piece
# digests the hash has, it also carries the joined piece data inline under
# 't', or a SHA digest of the joined pieces under 'h' or 'l' (see the existing
# inline comments). _store_done and _store_error are attached to the deferred
# returned by storeValue().
# NOTE(review): `key` is not assigned in the visible lines, and the branch
# before the first `elif` and the header of the final branch are absent from
# this listing -- verify against the full file.
# NOTE(review): `sha`, DHT_PIECES and TORRENT_PIECES are used but their
# import/definitions are not visible here. The `sha` module is deprecated
# since Python 2.5 in favour of hashlib.
# NOTE(review): no `return` is visible, yet refreshFiles() calls addBoth on
# this method's result; presumably the deferred is returned on a line absent
# from this listing -- confirm.
115 def store(self, hash):
116 """Add a hash for a file to the DHT.
118 Sets the key and value from the hash information, and tries to add
122 value = {'c': self.my_contact}
123 pieces = hash.pieceDigests()
125 # Determine how to store any piece data
128 elif len(pieces) <= DHT_PIECES:
129 # Short enough to be stored with our peer contact info
130 value['t'] = {'t': ''.join(pieces)}
131 elif len(pieces) <= TORRENT_PIECES:
132 # Short enough to be stored in a separate key in the DHT
133 value['h'] = sha.new(''.join(pieces)).digest()
135 # Too long, must be served up by our peer HTTP server
136 value['l'] = sha.new(''.join(pieces)).digest()
138 storeDefer = self.dht.storeValue(key, value)
139 storeDefer.addCallbacks(self._store_done, self._store_error,
140 callbackArgs = (hash, ), errbackArgs = (hash.digest(), ))
# Success callback attached by store(): logs the result of storing the main
# key. When the piece count is above DHT_PIECES but at most TORRENT_PIECES,
# it also stores the joined piece digests under a second key. That key is the
# SHA digest of the joined pieces, the same digest store() places in
# value['h'].
# NOTE(review): lines following the addCallbacks call are absent from this
# listing, so no return value is visible here -- verify what is passed on to
# later callbacks.
143 def _store_done(self, result, hash):
144 """Add a key/value pair for the pieces of the file to the DHT (if necessary)."""
145 log.msg('Added %s to the DHT: %r' % (hash.hexdigest(), result))
146 pieces = hash.pieceDigests()
147 if len(pieces) > DHT_PIECES and len(pieces) <= TORRENT_PIECES:
148 # Add the piece data key and value to the DHT
149 key = sha.new(''.join(pieces)).digest()
150 value = {'t': ''.join(pieces)}
# Both outcomes receive the piece-data key as their extra argument.
152 storeDefer = self.dht.storeValue(key, value)
153 storeDefer.addCallbacks(self._store_torrent_done, self._store_error,
154 callbackArgs = (key, ), errbackArgs = (key, ))
# Success callback attached by _store_done() for the separate piece-data key;
# the visible body only logs the key and the store result.
# NOTE(review): the line after the log call is absent from this listing, so
# any return value is not visible -- verify.
158 def _store_torrent_done(self, result, key):
159 """Adding the pieces to the DHT is complete."""
160 log.msg('Added torrent string %r to the DHT: %r' % (key, result))
# Failure callback shared by store() and _store_done(). `key` is the value
# passed through errbackArgs: the hash digest in store(), the piece-data key
# in _store_done(). The visible body only logs the key and the error.
# NOTE(review): nothing after the log call is visible in this listing, so
# whether the error is propagated cannot be determined here -- verify.
163 def _store_error(self, err, key):
164 """Adding to the DHT failed."""
165 log.msg('An error occurred adding %r to the DHT: %r' % (key, err))