(optional, defaults to doing nothing with the results)
@type errback: C{method}
@param errback: the method to call if an error occurs
- (optional, defaults to calling the callback with None)
+ (optional, defaults to calling the callback with the error)
"""
n = self.Node(NULL_ID, host, port)
self.sendJoin(n, callback=callback, errback=errback)
If all you have is a host/port, then use L{addContact}, which calls this
method after receiving the PONG from the remote node. The reason for
- the seperation is we can't insert a node into the table without its
+ the separation is we can't insert a node into the table without its
node ID. That means of course the node passed into this method needs
to be a properly formed Node object with a valid ID.
"""
# Don't add any local nodes to the routing table
if not self.config['LOCAL_OK'] and isLocal.match(node.host):
+ log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
return
-
+
old = self.table.insertNode(node, contacted=contacted)
- if (old and old.id != self.node.id and
+
+ if (isinstance(old, self._Node) and old.id != self.node.id and
(datetime.now() - old.lastSeen) >
timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
- def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
- """The pinged node never responded, so replace it."""
- log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage()))
- self.stats.completedAction('ping', start)
- self.table.replaceStaleNode(oldnode, newnode)
-
- def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
- """Got a pong from the old node, so update it."""
- self.stats.completedAction('ping', start)
- if dict['id'] == old.id:
- self.table.justSeenNode(old.id)
-
# Bucket is full, check to see if old node is still available
self.stats.startedAction('ping')
df = old.ping(self.node.id)
- df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+ df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
+ callbackArgs = (old, datetime.now()),
+ errbackArgs = (old, datetime.now(), node, contacted))
+ elif not old and not contacted:
+ # There's room, we just need to contact the node first
+ self.stats.startedAction('ping')
+ df = node.ping(self.node.id)
+ # Convert the returned contact info into a node
+ df.addCallback(self._pongHandler, datetime.now())
+ # Try adding the contacted node
+ df.addCallbacks(self.insertNode, self._pongError,
+ errbackArgs = (node, datetime.now()))
+
+ def _freshNodeHandler(self, dict, old, start):
+ """Got a pong from the old node, so update it."""
+ self.stats.completedAction('ping', start)
+ if dict['id'] == old.id:
+ self.table.justSeenNode(old.id)
+
+ def _staleNodeHandler(self, err, old, start, node, contacted):
+ """The pinged node never responded, so replace it."""
+ log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
+ self.stats.completedAction('ping', start)
+ self.table.invalidateNode(old)
+ self.insertNode(node, contacted)
+
+ def _pongHandler(self, dict, start):
+ """Node responded properly, change response into a node to insert."""
+ self.stats.completedAction('ping', start)
+ # Create the node using the returned contact info
+ n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+ return n
+ def _pongError(self, err, node, start):
+ """Error occurred, fail node and errback or callback with error."""
+ log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+ self.stats.completedAction('ping', start)
+ self.table.nodeFailed(node)
+
def sendJoin(self, node, callback=None, errback=None):
"""Join the DHT by pinging a bootstrap node.
(optional, defaults to doing nothing with the results)
@type errback: C{method}
@param errback: the method to call if an error occurs
- (optional, defaults to calling the callback with None)
+ (optional, defaults to calling the callback with the error)
"""
-
- def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
- """Node responded properly, callback with response."""
- n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
- self.stats.completedAction('join', start)
- reactor.callLater(0, self.insertNode, n)
- if callback:
- callback((dict['ip_addr'], dict['port']))
-
- def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
- """Error occurred, fail node and errback or callback with error."""
- log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
- self.stats.completedAction('join', start)
- self.table.nodeFailed(node)
- if errback:
- errback()
- elif callback:
- callback(None)
-
+ if errback is None:
+ errback = callback
self.stats.startedAction('join')
df = node.join(self.node.id)
- df.addCallbacks(_pongHandler, _defaultPong)
-
+ df.addCallbacks(self._joinHandler, self._joinError,
+ callbackArgs = (node, datetime.now()),
+ errbackArgs = (node, datetime.now()))
+ if callback:
+ df.addCallbacks(callback, errback)
+
+ def _joinHandler(self, dict, node, start):
+ """Node responded properly, extract the response."""
+ self.stats.completedAction('join', start)
+ # Create the node using the returned contact info
+ n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+ reactor.callLater(0, self.insertNode, n)
+ return (dict['ip_addr'], dict['port'])
+
+ def _joinError(self, err, node, start):
+ """Error occurred, fail node."""
+ log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
+ self.stats.completedAction('join', start)
+ self.table.nodeFailed(node)
+ return err
+
def findCloseNodes(self, callback=lambda a: None):
"""Perform a findNode on the ID one away from our own.
'MAX_FAILURES': 3, 'LOCAL_OK': True,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
- 'KEY_EXPIRE': 3600, 'SPEW': False, }
+ 'KEY_EXPIRE': 3600, 'SPEW': True, }
def setUp(self):
d = self.DHT_DEFAULTS.copy()
'MAX_FAILURES': 3, 'LOCAL_OK': True,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
- 'KEY_EXPIRE': 3600, 'SPEW': False, }
+ 'KEY_EXPIRE': 3600, 'SPEW': True, }
def _done(self, val):
self.done = 1
# Remove the stale node
del(self.buckets[i].l[it])
removed = True
+ log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port))
# Insert the new node
if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K:
def insertNode(self, node, contacted = True):
"""Try to insert a node in the routing table.
- This inserts the node, returning None if successful, otherwise returns
+ This inserts the node, returning True if successful, False if the
+ node could have been added if it responds to a ping, otherwise returns
the oldest node in the bucket if it's full. The caller is then
responsible for pinging the returned node and calling replaceStaleNode
if it doesn't respond. contacted means that yes, we contacted THEM and
@type contacted: C{boolean}
@param contacted: whether the new node is known to be good, i.e.
responded to a request (optional, defaults to True)
- @rtype: L{node.Node}
- @return: None if successful (the bucket wasn't full), otherwise returns the oldest node in the bucket
+ @rtype: L{node.Node} or C{boolean}
+ @return: True if successful (the bucket wasn't full), False if the
+ node could have been added if it was contacted, otherwise
+ returns the oldest node in the bucket
"""
assert node.id != NULL_ID
- if node.id == self.node.id: return
+ if node.id == self.node.id: return True
# Get the bucket for this node
i = self._bucketIndexForInt(node.num)
# utilizing this nodes new contact info
self.buckets[i].l.append(node)
self.buckets[i].touch()
- return
+ return True
# We don't have this node, check to see if the bucket is full
if len(self.buckets[i].l) < K:
# Not full, append this node and return
if contacted:
node.updateLastSeen()
- self.buckets[i].l.append(node)
- self.buckets[i].touch()
- return
+ self.buckets[i].l.append(node)
+ self.buckets[i].touch()
+ log.msg('Added node to routing table: %s/%s' % (node.host, node.port))
+ return True
+ return False
# Bucket is full, check to see if the local node is not in the bucket
if not (self.buckets[i].min <= self.node < self.buckets[i].max):