+from datetime import datetime
import os, sha, random
from twisted.internet import defer, reactor
from twisted.internet.abstract import isIPAddress
+from twisted.python import log
from twisted.trial import unittest
from zope.interface import implements
self.bootstrap_node = False
self.joining = None
self.joined = False
+ self.outstandingJoins = 0
+ self.foundAddrs = []
self.storing = {}
self.retrieving = {}
self.retrieved = {}
elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL',
'BUCKET_STALENESS', 'KEINITIAL_DELAY', 'KE_DELAY', 'KE_AGE']:
self.config[k] = self.config_parser.gettime(section, k)
+ elif k in ['SPEW']:
+ self.config[k] = self.config_parser.getboolean(section, k)
else:
self.config[k] = self.config_parser.get(section, k)
def _join_gotIP(self, ip, port):
"""Called after an IP address has been found for a single bootstrap node."""
- self.khashmir.addContact(ip, port, self._join_single)
+ self.outstandingJoins += 1
+ self.khashmir.addContact(ip, port, self._join_single, self._join_error)
- def _join_single(self):
+ def _join_single(self, addr):
"""Called when a single bootstrap node has been added."""
- self.khashmir.findCloseNodes(self._join_complete)
+ self.outstandingJoins -= 1
+ if addr:
+ self.foundAddrs.append(addr)
+ if addr or self.outstandingJoins <= 0:
+ self.khashmir.findCloseNodes(self._join_complete, self._join_complete)
+ log.msg('Got back from bootstrap node: %r' % (addr,))
+ def _join_error(self, failure = None):
+ """Called when a single bootstrap node has failed."""
+ self.outstandingJoins -= 1
+ log.msg("bootstrap node could not be reached")
+ if self.outstandingJoins <= 0:
+ self.khashmir.findCloseNodes(self._join_complete, self._join_complete)
+
def _join_complete(self, result):
"""Called when the tables have been initialized with nodes."""
- if not self.joined:
+ if not self.joined and len(result) > 0:
self.joined = True
- if len(result) > 0 or self.bootstrap_node:
- df = self.joining
- self.joining = None
- df.callback(result)
+ if self.joining and self.outstandingJoins <= 0:
+ df = self.joining
+ self.joining = None
+ if self.joined or self.bootstrap_node:
+ self.joined = True
+ df.callback(self.foundAddrs)
else:
- df = self.joining
- self.joining = None
df.errback(DHTError('could not find any nodes to bootstrap to'))
+ def getAddrs(self):
+ return self.foundAddrs
+
def leave(self):
"""See L{apt_dht.interfaces.IDHT}."""
if self.config is None:
self.joined = False
self.khashmir.shutdown()
- def normalizeKey(self, key):
- """Normalize a key's length suitable for insertion in the DHT."""
- key_bytes = (self.config['HASH_LENGTH'] - 1) // 8 + 1
- if len(key) < key_bytes:
- key = key + '\000'*(key_bytes - len(key))
- elif len(key) > key_bytes:
- key = key[:key_bytes]
- return key
-
def getValue(self, key):
"""See L{apt_dht.interfaces.IDHT}."""
if self.config is None:
raise DHTError, "have not joined a network yet"
d = defer.Deferred()
- key = self.normalizeKey(key)
if key not in self.retrieving:
self.khashmir.valueForKey(key, self._getValue)
self.retrieving.setdefault(key, []).append(d)
d.callback(final_result)
del self.retrieving[key]
- def storeValue(self, key, value):
+ def storeValue(self, key, value, originated = None):
"""See L{apt_dht.interfaces.IDHT}."""
if self.config is None:
raise DHTError, "configuration not loaded"
if not self.joined:
raise DHTError, "have not joined a network yet"
- key = self.normalizeKey(key)
if key in self.storing and value in self.storing[key]:
raise DHTError, "already storing that key with the same value"
+ if originated is None:
+ originated = datetime.utcnow()
d = defer.Deferred()
- self.khashmir.storeValueForKey(key, value, self._storeValue)
+ self.khashmir.storeValueForKey(key, value, originated, self._storeValue)
self.storing.setdefault(key, {})[value] = d
return d
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, }
+ 'KE_AGE': 3600, 'SPEW': True, }
def setUp(self):
self.a = DHT()
self.b.bootstrap = ["127.0.0.1:4044"]
self.b.cache_dir = '/tmp'
- def test_normalizeKey(self):
- self.failUnless(self.a.normalizeKey('12345678901234567890') == '12345678901234567890')
- self.failUnless(self.a.normalizeKey('12345678901234567') == '12345678901234567\000\000\000')
- self.failUnless(self.a.normalizeKey('1234567890123456789012345') == '12345678901234567890')
- self.failUnless(self.a.normalizeKey('1234567890123456789') == '1234567890123456789\000')
- self.failUnless(self.a.normalizeKey('123456789012345678901') == '12345678901234567890')
-
def test_bootstrap_join(self):
d = self.a.join()
return d
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, }
+ 'KE_AGE': 3600, 'SPEW': False, }
def setUp(self):
self.l = []