self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
self.token_secrets = [newID()]
- self.stats = StatsLogger(self.table, self.store, self.config)
+ self.stats = StatsLogger(self.table, self.store)
# Start listening
self.udp = krpc.hostbroker(self, self.stats, config)
(datetime.now() - old.lastSeen) >
timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
- def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
+ def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
"""The pinged node never responded, so replace it."""
- log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
- log.err(err)
+ log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage()))
+ self.stats.completedAction('ping', start)
self.table.replaceStaleNode(oldnode, newnode)
- def _notStaleNodeHandler(dict, old=old, self=self):
+ def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
"""Got a pong from the old node, so update it."""
+ self.stats.completedAction('ping', start)
if dict['id'] == old.id:
self.table.justSeenNode(old.id)
(optional, defaults to calling the callback with None)
"""
- def _pongHandler(dict, node=node, self=self, callback=callback):
+ def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
"""Node responded properly, callback with response."""
n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
- self.insertNode(n)
+ self.stats.completedAction('join', start)
+ reactor.callLater(0, self.insertNode, n)
if callback:
callback((dict['ip_addr'], dict['port']))
- def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
+ def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
"""Error occurred, fail node and errback or callback with error."""
- log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
- log.err(err)
+ log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+ self.stats.completedAction('join', start)
self.table.nodeFailed(node)
if errback:
errback()
"""
if _krpc_sender is not None:
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ reactor.callLater(0, self.insertNode, n, False)
return {"id" : self.node.id}
"""
if _krpc_sender is not None:
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ reactor.callLater(0, self.insertNode, n, False)
else:
_krpc_sender = ('127.0.0.1', self.port)
"""
if _krpc_sender is not None:
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ reactor.callLater(0, self.insertNode, n, False)
else:
_krpc_sender = ('127.0.0.1', self.port)
"""
if _krpc_sender is not None:
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ reactor.callLater(0, self.insertNode, n, False)
nodes = self.table.findNodes(key)
nodes = map(lambda node: node.contactInfo(), nodes)
"""
if _krpc_sender is not None:
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ reactor.callLater(0, self.insertNode, n, False)
l = self.store.retrieveValues(key)
if num == 0 or num >= len(l):
"""
if _krpc_sender is not None:
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
- self.insertNode(n, contacted = False)
+ reactor.callLater(0, self.insertNode, n, False)
else:
_krpc_sender = ('127.0.0.1', self.port)
class SimpleTests(unittest.TestCase):
timeout = 10
- DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+ DHT_DEFAULTS = {'PORT': 9977,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+ 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
'KEY_EXPIRE': 3600, 'SPEW': False, }
def setUp(self):
timeout = 30
num = 20
- DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+ DHT_DEFAULTS = {'PORT': 9977,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+ 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
'KEY_EXPIRE': 3600, 'SPEW': False, }
def _done(self, val):