self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
self.token_secrets = [newID()]
- self.stats = StatsLogger(self.table, self.store, self.config)
+ self.stats = StatsLogger(self.table, self.store)
# Start listening
self.udp = krpc.hostbroker(self, self.stats, config)
n = self.Node(NULL_ID, host, port)
self.sendJoin(n, callback=callback, errback=errback)
- def findNode(self, id, callback, errback=None):
+ def findNode(self, id, callback):
"""Find the contact info for the K closest nodes in the global table.
@type id: C{string}
@type callback: C{method}
@param callback: the method to call with the results, it must take 1
parameter, the list of K closest nodes
- @type errback: C{method}
- @param errback: the method to call if an error occurs
- (optional, defaults to doing nothing when an error occurs)
"""
- # Get K nodes out of local table/cache
- nodes = self.table.findNodes(id)
- nodes = [copy(node) for node in nodes]
- d = Deferred()
- if errback:
- d.addCallbacks(callback, errback)
- else:
- d.addCallback(callback)
+ # Start with our node
+ nodes = [copy(self.node)]
- # If the target ID was found
- if len(nodes) == 1 and nodes[0].id == id:
- d.callback(nodes)
- else:
- # Start the finding nodes action
- state = FindNode(self, id, d.callback, self.config, self.stats)
- reactor.callLater(0, state.goWithNodes, nodes)
+ # Start the finding nodes action
+ state = FindNode(self, id, callback, self.config, self.stats)
+ reactor.callLater(0, state.goWithNodes, nodes)
def insertNode(self, node, contacted = True):
"""Try to insert a node in our local table, pinging oldest contact if necessary.
(datetime.now() - old.lastSeen) >
timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
- def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
+ def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
"""The pinged node never responded, so replace it."""
log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
log.err(err)
+ self.stats.completedAction('ping', start)
self.table.replaceStaleNode(oldnode, newnode)
- def _notStaleNodeHandler(dict, old=old, self=self):
+ def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
"""Got a pong from the old node, so update it."""
+ self.stats.completedAction('ping', start)
if dict['id'] == old.id:
self.table.justSeenNode(old.id)
(optional, defaults to calling the callback with None)
"""
- def _pongHandler(dict, node=node, self=self, callback=callback):
+ def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
"""Node responded properly, callback with response."""
n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+ self.stats.completedAction('join', start)
self.insertNode(n)
if callback:
callback((dict['ip_addr'], dict['port']))
- def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
+ def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
"""Error occurred, fail node and errback or callback with error."""
log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
log.err(err)
+ self.stats.completedAction('join', start)
self.table.nodeFailed(node)
if errback:
errback()
df = node.join(self.node.id)
df.addCallbacks(_pongHandler, _defaultPong)
- def findCloseNodes(self, callback=lambda a: None, errback = None):
+ def findCloseNodes(self, callback=lambda a: None):
"""Perform a findNode on the ID one away from our own.
This will allow us to populate our table with nodes on our network
@param callback: the method to call with the results, it must take 1
parameter, the list of K closest nodes
(optional, defaults to doing nothing with the results)
- @type errback: C{method}
- @param errback: the method to call if an error occurs
- (optional, defaults to doing nothing when an error occurs)
"""
id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
- self.findNode(id, callback, errback)
+ self.findNode(id, callback)
def refreshTable(self, force = False):
"""Check all the buckets for those that need refreshing.
_Node = KNodeRead
#{ Local interface
- def findValue(self, key, callback, errback=None):
+ def findValue(self, key, callback):
"""Get the nodes that have values for the key from the global table.
@type key: C{string}
@type callback: C{method}
@param callback: the method to call with the results, it must take 1
parameter, the list of nodes with values
- @type errback: C{method}
- @param errback: the method to call if an error occurs
- (optional, defaults to doing nothing when an error occurs)
"""
- # Get K nodes out of local table/cache
- nodes = self.table.findNodes(key)
- nodes = [copy(node) for node in nodes]
- d = Deferred()
- if errback:
- d.addCallbacks(callback, errback)
- else:
- d.addCallback(callback)
-
+ # Start with ourself
+ nodes = [copy(self.node)]
+
# Search for others starting with the locally found ones
- state = FindValue(self, key, d.callback, self.config, self.stats)
+ state = FindValue(self, key, callback, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
def valueForKey(self, key, callback, searchlocal = True):
@type searchlocal: C{boolean}
@param searchlocal: whether to also look for any local values
"""
- # Get any local values
- if searchlocal:
- l = self.store.retrieveValues(key)
- if len(l) > 0:
- reactor.callLater(0, callback, key, l)
- else:
- l = []
- def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
+ def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
"""Use the found nodes to send requests for values to."""
- state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
+ # Get any local values
+ if searchlocal:
+ l = self.store.retrieveValues(key)
+ if len(l) > 0:
+ node = copy(self.node)
+ node.updateNumValues(len(l))
+ nodes = nodes + [node]
+
+ state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
# First lookup nodes that have values for the key
class SimpleTests(unittest.TestCase):
timeout = 10
- DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+ DHT_DEFAULTS = {'PORT': 9977,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+ 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
'KEY_EXPIRE': 3600, 'SPEW': False, }
def setUp(self):
timeout = 30
num = 20
- DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
+ DHT_DEFAULTS = {'PORT': 9977,
'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+ 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
'KEY_EXPIRE': 3600, 'SPEW': False, }
def _done(self, val):