"""The main interface to the Khashmir DHT.

@var khashmir_dir: the name of the directory to use for DHT files
"""
from datetime import datetime
import hashlib
import os
import random

from twisted.internet import defer, reactor
from twisted.internet.abstract import isIPAddress
from twisted.python import log
from twisted.trial import unittest
from zope.interface import implements

from apt_p2p.interfaces import IDHT, IDHTStats, IDHTStatsFactory
from khashmir import Khashmir
from bencode import bencode, bdecode
from khash import HASH_LENGTH

try:
    from twisted.web2 import channel, server, resource, http, http_headers
    _web2 = True
except ImportError:
    _web2 = False
# Subdirectory of the configured cache directory in which DHT files are kept.
khashmir_dir = "apt-p2p-Khashmir"
class DHTError(Exception):
    """Signals a DHT failure, either raised directly or delivered via errback."""
    pass
class DHT:
    """The main interface instance to the Khashmir DHT.

    @type config: C{dictionary}
    @ivar config: the DHT configuration values
    @type cache_dir: C{string}
    @ivar cache_dir: the directory to use for storing files
    @type bootstrap: C{list} of C{string}
    @ivar bootstrap: the nodes to contact to bootstrap into the system
    @type bootstrap_node: C{boolean}
    @ivar bootstrap_node: whether this node is a bootstrap node
    @type joining: L{twisted.internet.defer.Deferred}
    @ivar joining: if a join is underway, the deferred that will signal its end
    @type joined: C{boolean}
    @ivar joined: whether the DHT network has been successfully joined
    @type outstandingJoins: C{int}
    @ivar outstandingJoins: the number of bootstrap nodes that have yet to respond
    @type next_rejoin: C{int}
    @ivar next_rejoin: the number of seconds before retrying the next join
    @type pending_rejoin: (L{twisted.internet.interfaces.IDelayedCall},
        L{twisted.internet.defer.Deferred}) or C{None}
    @ivar pending_rejoin: while a failed join is waiting out its back-off
        delay, the scheduled retry call and the deferred it will complete
    @type foundAddrs: C{list} of (C{string}, C{int})
    @ivar foundAddrs: the IP address and port that were returned by bootstrap nodes
    @type storing: C{dictionary}
    @ivar storing: keys are keys for which store requests are active, values
        are dictionaries with keys the values being stored and values the
        deferred to call when complete
    @type retrieving: C{dictionary}
    @ivar retrieving: keys are the keys for which getValue requests are active,
        values are lists of the deferreds waiting for the requests
    @type retrieved: C{dictionary}
    @ivar retrieved: keys are the keys for which getValue requests are active,
        values are list of the values returned so far
    @type factory: L{twisted.web2.channel.HTTPFactory}
    @ivar factory: the factory to use to serve HTTP requests for statistics
    @type config_parser: L{apt_p2p.apt_p2p_conf.AptP2PConfigParser}
    @ivar config_parser: the configuration info for the main program
    @type section: C{string}
    @ivar section: the section of the configuration info that applies to the DHT
    @type khashmir: L{khashmir.Khashmir}
    @ivar khashmir: the khashmir DHT instance to use
    """

    if _web2:
        implements(IDHT, IDHTStats, IDHTStatsFactory)
    else:
        implements(IDHT, IDHTStats)

    def __init__(self):
        """Initialize the DHT."""
        self.config = None
        self.cache_dir = ''
        self.bootstrap = []
        self.bootstrap_node = False
        self.joining = None
        self.khashmir = None
        self.joined = False
        self.outstandingJoins = 0
        self.next_rejoin = 20
        self.pending_rejoin = None
        self.foundAddrs = []
        self.storing = {}
        self.retrieving = {}
        self.retrieved = {}
        self.factory = None

    def loadConfig(self, config, section):
        """See L{apt_p2p.interfaces.IDHT}.

        @param config: the configuration info for the main program
        @param section: the section of the configuration info that applies
            to the DHT
        """
        self.config_parser = config
        self.section = section
        self.config = {}

        # Get some initial values
        self.cache_dir = os.path.join(self.config_parser.get(section, 'cache_dir'), khashmir_dir)
        if not os.path.exists(self.cache_dir):
            os.makedirs(self.cache_dir)
        self.bootstrap = self.config_parser.getstringlist(section, 'BOOTSTRAP')
        self.bootstrap_node = self.config_parser.getboolean(section, 'BOOTSTRAP_NODE')
        for k in self.config_parser.options(section):
            # The numbers in the config file
            if k in ('CONCURRENT_REQS', 'STORE_REDUNDANCY',
                     'RETRIEVE_VALUES', 'MAX_FAILURES', 'PORT'):
                self.config[k] = self.config_parser.getint(section, k)
            # The times in the config file
            elif k in ('CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL',
                       'BUCKET_STALENESS', 'KEY_EXPIRE'):
                self.config[k] = self.config_parser.gettime(section, k)
            # The booleans in the config file
            elif k in ('SPEW',):
                self.config[k] = self.config_parser.getboolean(section, k)
            # Everything else is a string
            else:
                self.config[k] = self.config_parser.get(section, k)

    def join(self, deferred = None):
        """See L{apt_p2p.interfaces.IDHT}.

        @param deferred: the deferred to callback when the join is complete
            (optional, defaults to creating a new deferred and returning it)
        @rtype: L{twisted.internet.defer.Deferred}
        @return: the deferred that will fire when this join attempt ends
        @raise DHTError: if a join is already in progress and no deferred
            was supplied
        """
        # Check for multiple simultaneous joins
        if self.joining:
            if deferred:
                deferred.errback(DHTError("a join is already in progress"))
                return deferred
            raise DHTError("a join is already in progress")

        if deferred:
            df = deferred
        else:
            df = defer.Deferred()

        if self.config is None:
            # Fail without marking a join as in progress, otherwise every
            # later join would be rejected as "already in progress".
            df.errback(DHTError("configuration not loaded"))
            return df

        # Parse all the bootstrap nodes before contacting any of them
        nodes = []
        for node in self.bootstrap:
            host, port = node.rsplit(':', 1)
            nodes.append((host, int(port)))

        # Create the new khashmir instance
        if not self.khashmir:
            self.khashmir = Khashmir(self.config, self.cache_dir)

        self.joining = df
        # Count every bootstrap node up front, so a reply that arrives
        # synchronously cannot see the count reach zero too early.
        self.outstandingJoins = len(nodes)

        if not nodes:
            # Nothing to wait for, so go straight to finding close nodes
            self.khashmir.findCloseNodes(self._join_complete)

        for host, port in nodes:
            # Translate host names into IP addresses
            if isIPAddress(host):
                self._join_gotIP(host, port)
            else:
                reactor.resolve(host).addCallbacks(self._join_gotIP,
                                                   self._join_resolveFailed,
                                                   callbackArgs = (port, ),
                                                   errbackArgs = (host, port))

        # Return the captured deferred: self.joining may already be cleared
        # if the join completed synchronously.
        return df

    def _join_gotIP(self, ip, port):
        """Join the DHT using a single bootstrap node's IP address."""
        self.khashmir.addContact(ip, port, self._join_single, self._join_error)

    def _join_resolveFailed(self, err, host, port):
        """Failed to lookup the IP address of the bootstrap node."""
        log.msg('Failed to find an IP address for host: (%r, %r)' % (host, port))
        self.outstandingJoins -= 1
        if self.outstandingJoins <= 0:
            self.khashmir.findCloseNodes(self._join_complete)

    def _join_single(self, addr):
        """Process the response from the bootstrap node.

        Finish the join by contacting close nodes.
        """
        self.outstandingJoins -= 1
        if addr:
            self.foundAddrs.append(addr)
        if addr or self.outstandingJoins <= 0:
            self.khashmir.findCloseNodes(self._join_complete)
        log.msg('Got back from bootstrap node: %r' % (addr,))

    def _join_error(self, failure = None):
        """Process an error in contacting the bootstrap node.

        If no bootstrap nodes remain, finish the process by contacting
        close nodes.
        """
        self.outstandingJoins -= 1
        log.msg("bootstrap node could not be reached")
        if self.outstandingJoins <= 0:
            self.khashmir.findCloseNodes(self._join_complete)

    def _join_complete(self, result):
        """End the joining process and return the addresses found for this node.

        If the join failed and this is not a bootstrap node, schedule a retry
        with an exponentially increasing delay.
        """
        if not self.joined and isinstance(result, list) and len(result) > 1:
            self.joined = True
        if self.joining and self.outstandingJoins <= 0:
            df = self.joining
            self.joining = None
            if self.joined or self.bootstrap_node:
                self.joined = True
                df.callback(self.foundAddrs)
            else:
                # Try to join later using exponential backoff delays
                log.msg('Join failed, retrying in %d seconds' % self.next_rejoin)
                call = reactor.callLater(self.next_rejoin, self._rejoin, df)
                # Remember the retry so that leave() can cancel it
                self.pending_rejoin = (call, df)
                self.next_rejoin *= 2

    def _rejoin(self, deferred):
        """Retry a failed join once its back-off delay has elapsed."""
        self.pending_rejoin = None
        self.join(deferred)

    def getAddrs(self):
        """Get the list of addresses returned by bootstrap nodes for this node."""
        return self.foundAddrs

    def leave(self):
        """See L{apt_p2p.interfaces.IDHT}.

        @raise DHTError: if the configuration has not been loaded
        """
        if self.config is None:
            raise DHTError("configuration not loaded")

        # Cancel a join retry that is waiting out its back-off delay, so the
        # node does not silently rejoin after it has left.
        retrying = self.pending_rejoin is not None
        if retrying:
            call, df = self.pending_rejoin
            self.pending_rejoin = None
            if call.active():
                call.cancel()
            df.errback(DHTError('still joining when leave was called'))

        if self.joined or self.joining or retrying:
            if self.joining:
                # Clear the marker before firing, in case the errback re-joins
                df, self.joining = self.joining, None
                df.errback(DHTError('still joining when leave was called'))
            self.joined = False
            self.khashmir.shutdown()

    def _normKey(self, key):
        """Normalize the length of keys used in the DHT.

        @return: the key padded with null bytes or truncated to HASH_LENGTH
        """
        # Extend short keys with null bytes
        if len(key) < HASH_LENGTH:
            key = key + '\000'*(HASH_LENGTH - len(key))
        # Truncate long keys
        elif len(key) > HASH_LENGTH:
            key = key[:HASH_LENGTH]
        return key

    def getValue(self, key):
        """See L{apt_p2p.interfaces.IDHT}.

        Concurrent requests for the same key share a single DHT lookup.
        """
        if self.config is None:
            return defer.fail(DHTError("configuration not loaded"))
        if not self.joined:
            return defer.fail(DHTError("have not joined a network yet"))

        key = self._normKey(key)

        d = defer.Deferred()
        if key not in self.retrieving:
            self.khashmir.valueForKey(key, self._getValue)
        self.retrieving.setdefault(key, []).append(d)
        return d

    def _getValue(self, key, result):
        """Process a returned list of values from the DHT.

        A non-empty C{result} is another batch of values. An empty one marks
        the end of the lookup, and all the waiting deferreds are fired.
        """
        # Save the list of values to return when it is complete
        if result:
            self.retrieved.setdefault(key, []).extend([bdecode(r) for r in result])
            return

        # Empty list, the get is complete, return the result.
        # Detach the bookkeeping before firing, so a callback that starts a
        # new lookup for the same key is not discarded.
        final_result = self.retrieved.pop(key, [])
        waiting = self.retrieving.pop(key, [])
        for d in waiting:
            d.callback(final_result)

    def storeValue(self, key, value):
        """See L{apt_p2p.interfaces.IDHT}.

        @raise DHTError: if the same value is already being stored in the key
        """
        if self.config is None:
            return defer.fail(DHTError("configuration not loaded"))
        if not self.joined:
            return defer.fail(DHTError("have not joined a network yet"))

        key = self._normKey(key)
        bvalue = bencode(value)

        if key in self.storing and bvalue in self.storing[key]:
            raise DHTError("already storing that key with the same value")

        d = defer.Deferred()
        # Register before starting the store, so a synchronous response is
        # not lost.
        self.storing.setdefault(key, {})[bvalue] = d
        self.khashmir.storeValueForKey(key, bvalue, self._storeValue)
        return d

    def _storeValue(self, key, bvalue, result):
        """Process the response from the DHT."""
        values = self.storing.get(key)
        if values is None or bvalue not in values:
            return

        # Remove the bookkeeping first, so a callback can store the same
        # value again without being told it is already being stored.
        d = values.pop(bvalue)
        if not values:
            del self.storing[key]

        # Check if the store succeeded
        if result:
            d.callback(result)
        else:
            d.errback(DHTError('could not store value %s in key %s' % (bvalue, key)))

    def getStats(self):
        """See L{apt_p2p.interfaces.IDHTStats}."""
        return self.khashmir.getStats()

    def getStatsFactory(self):
        """See L{apt_p2p.interfaces.IDHTStatsFactory}.

        @raise NotImplementedError: if twisted.web2 is not installed
        """
        # An explicit check, unlike an assert, still fires under python -O
        if not _web2:
            raise NotImplementedError("NOT IMPLEMENTED: twisted.web2 must be installed to use the stats factory.")
        if self.factory is None:
            # Create a simple HTTP factory for stats
            class StatsResource(resource.Resource):
                def __init__(self, manager):
                    self.manager = manager
                def render(self, ctx):
                    return http.Response(
                        200,
                        {'content-type': http_headers.MimeType('text', 'html')},
                        '<html><body>\n\n' + self.manager.getStats() + '\n</body></html>\n')
                def locateChild(self, request, segments):
                    log.msg('Got HTTP stats request from %s' % (request.remoteAddr, ))
                    # Serve every path with this same resource
                    return self, ()

            self.factory = channel.HTTPFactory(server.Site(StatsResource(self)))
        return self.factory
class TestSimpleDHT(unittest.TestCase):
    """Simple 2-node unit tests for the DHT."""

    DHT_DEFAULTS = {'PORT': 9977,
                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                    'MAX_FAILURES': 3,
                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                    'KEY_EXPIRE': 3600, 'SPEW': False, }

    def setUp(self):
        self.a = DHT()
        self.b = DHT()
        self.a.config = self.DHT_DEFAULTS.copy()
        self.a.config['PORT'] = 4044
        self.a.bootstrap = ["127.0.0.1:4044"]
        self.a.bootstrap_node = True
        self.a.cache_dir = '/tmp'
        self.b.config = self.DHT_DEFAULTS.copy()
        self.b.config['PORT'] = 4045
        self.b.bootstrap = ["127.0.0.1:4044"]
        self.b.cache_dir = '/tmp'

    def test_bootstrap_join(self):
        d = self.a.join()
        return d

    def no_krpc_errors(self, result):
        """Discard the KRPC errors logged while the bootstrap node was down."""
        from krpc import KrpcError
        self.flushLoggedErrors(KrpcError)
        return result

    def test_failed_join(self):
        # Node b starts joining before its bootstrap node (a) is up
        d = self.b.join()
        reactor.callLater(30, self.a.join)
        d.addCallback(self.no_krpc_errors)
        return d

    def node_join(self, result):
        d = self.b.join()
        return d

    def test_join(self):
        d = self.a.join()
        d.addCallback(self.node_join)
        return d

    def test_timeout_retransmit(self):
        d = self.b.join()
        reactor.callLater(4, self.a.join)
        return d

    def test_normKey(self):
        h = self.a._normKey('12345678901234567890')
        self.assertEqual(h, '12345678901234567890')
        h = self.a._normKey('12345678901234567')
        self.assertEqual(h, '12345678901234567\000\000\000')
        h = self.a._normKey('1234567890123456789012345')
        self.assertEqual(h, '12345678901234567890')
        h = self.a._normKey('1234567890123456789')
        self.assertEqual(h, '1234567890123456789\000')
        h = self.a._normKey('123456789012345678901')
        self.assertEqual(h, '12345678901234567890')

    def value_stored(self, result, value):
        """Count down the outstanding stores; start the gets after the last one."""
        self.stored -= 1
        if self.stored == 0:
            self.get_values()

    def store_values(self, result):
        self.stored = 3
        d = self.a.storeValue(hashlib.sha1('4045').digest(), str(4045*3))
        d.addCallback(self.value_stored, 4045)
        d = self.a.storeValue(hashlib.sha1('4044').digest(), str(4044*2))
        d.addCallback(self.value_stored, 4044)
        d = self.b.storeValue(hashlib.sha1('4045').digest(), str(4045*2))
        d.addCallback(self.value_stored, 4045)

    def check_values(self, result, values):
        """Verify one lookup result; finish the test after the last one."""
        self.checked -= 1
        self.assertEqual(len(result), len(values))
        for v in result:
            self.assertTrue(v in values)
        if self.checked == 0:
            self.lastDefer.callback(1)

    def get_values(self):
        self.checked = 4
        d = self.a.getValue(hashlib.sha1('4044').digest())
        d.addCallback(self.check_values, [str(4044*2)])
        d = self.b.getValue(hashlib.sha1('4044').digest())
        d.addCallback(self.check_values, [str(4044*2)])
        d = self.a.getValue(hashlib.sha1('4045').digest())
        d.addCallback(self.check_values, [str(4045*2), str(4045*3)])
        d = self.b.getValue(hashlib.sha1('4045').digest())
        d.addCallback(self.check_values, [str(4045*2), str(4045*3)])

    def test_store(self):
        from twisted.internet.base import DelayedCall
        DelayedCall.debug = True
        self.lastDefer = defer.Deferred()
        d = self.a.join()
        d.addCallback(self.node_join)
        d.addCallback(self.store_values)
        return self.lastDefer

    def tearDown(self):
        for node in (self.a, self.b):
            node.leave()
            try:
                os.unlink(node.khashmir.store.db)
            except (OSError, AttributeError):
                # The node may never have created a store, or the database
                # file may already be gone.
                pass
class TestMultiDHT(unittest.TestCase):
    """More complicated 20-node tests for the DHT."""

    num = 20
    DHT_DEFAULTS = {'PORT': 9977,
                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                    'MAX_FAILURES': 3,
                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                    'KEY_EXPIRE': 3600, 'SPEW': False, }

    def setUp(self):
        self.l = []
        self.startport = 4081
        for i in range(self.num):
            node = DHT()
            node.config = self.DHT_DEFAULTS.copy()
            node.config['PORT'] = self.startport + i
            node.bootstrap = ["127.0.0.1:" + str(self.startport)]
            node.cache_dir = '/tmp'
            self.l.append(node)
        self.l[0].bootstrap_node = True

    def node_join(self, result, next_node):
        """Join the nodes one after another, then finish the test."""
        d = self.l[next_node].join()
        if next_node + 1 < len(self.l):
            d.addCallback(self.node_join, next_node + 1)
        else:
            d.addCallback(self.lastDefer.callback)

    def test_join(self):
        self.lastDefer = defer.Deferred()
        d = self.l[0].join()
        d.addCallback(self.node_join, 1)
        return self.lastDefer

    def store_values(self, result, i = 0, j = 0):
        """Have nodes 0..i each store a distinct value under key i, for every i."""
        if j > i:
            j -= i+1
            i += 1
        if i == len(self.l):
            self.get_values()
        else:
            d = self.l[j].storeValue(hashlib.sha1(str(self.startport+i)).digest(), str((self.startport+i)*(j+1)))
            d.addCallback(self.store_values, i, j+1)

    def get_values(self, result = None, check = None, i = 0, j = 0):
        """Check the previous lookup, then have node i look up a random next key."""
        if result is not None:
            self.assertEqual(len(result), len(check))
            for v in result:
                self.assertTrue(v in check)
        if j >= len(self.l):
            j -= len(self.l)
            i += 1
        if i == len(self.l):
            self.lastDefer.callback(1)
        else:
            d = self.l[i].getValue(hashlib.sha1(str(self.startport+j)).digest())
            # Key j holds the values (startport+j) * 1 .. (j+1)
            check = [str(k) for k in range(self.startport+j, (self.startport+j)*(j+1)+1, self.startport+j)]
            d.addCallback(self.get_values, check, i, j + random.randrange(1, min(len(self.l), 10)))

    def store_join(self, result, next_node):
        """Join the nodes one after another, then start storing values."""
        d = self.l[next_node].join()
        if next_node + 1 < len(self.l):
            d.addCallback(self.store_join, next_node + 1)
        else:
            d.addCallback(self.store_values)

    def test_store(self):
        from twisted.internet.base import DelayedCall
        DelayedCall.debug = True
        self.lastDefer = defer.Deferred()
        d = self.l[0].join()
        d.addCallback(self.store_join, 1)
        return self.lastDefer

    def tearDown(self):
        for i in self.l:
            i.leave()
            try:
                os.unlink(i.khashmir.store.db)
            except (OSError, AttributeError):
                # The node may never have created a store, or the database
                # file may already be gone.
                pass