import os, re
from twisted.internet.defer import Deferred
+from twisted.internet.base import DelayedCall
from twisted.internet import protocol, reactor
from twisted.python import log
from twisted.trial import unittest
@ivar _Node: the knode implementation to use for this class of DHT
@type config: C{dictionary}
@ivar config: the configuration parameters for the DHT
+ @type pinging: C{dictionary}
+ @ivar pinging: the node's that are currently being pinged, keys are the
+ node id's, values are the Deferred or DelayedCall objects
@type port: C{int}
@ivar port: the port to listen on
@type store: L{db.DB}
(optional, defaults to the /tmp directory)
"""
self.config = None
+ self.pinging = {}
self.setup(config, cache_dir)
def setup(self, config, cache_dir):
timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
# Bucket is full, check to see if old node is still available
- self.stats.startedAction('ping')
- df = old.ping(self.node.id)
- df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
- callbackArgs = (old, datetime.now()),
- errbackArgs = (old, datetime.now(), node, contacted))
+ df = self.sendPing(old)
+ df.addErrback(self._staleNodeHandler, old, node, contacted)
elif not old and not contacted:
# There's room, we just need to contact the node first
- self.stats.startedAction('ping')
- df = node.ping(self.node.id)
- # Convert the returned contact info into a node
- df.addCallback(self._pongHandler, datetime.now())
- # Try adding the contacted node
- df.addCallbacks(self.insertNode, self._pongError,
- errbackArgs = (node, datetime.now()))
-
- def _freshNodeHandler(self, dict, old, start):
- """Got a pong from the old node, so update it."""
- self.stats.completedAction('ping', start)
- if dict['id'] == old.id:
- self.table.justSeenNode(old.id)
-
- def _staleNodeHandler(self, err, old, start, node, contacted):
+ self.sendPing(node)
+
+ def _staleNodeHandler(self, err, old, node, contacted):
"""The pinged node never responded, so replace it."""
- log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
- self.stats.completedAction('ping', start)
self.table.invalidateNode(old)
self.insertNode(node, contacted)
+ return err
+
+ def nodeFailed(self, node):
+ """Mark a node as having failed a request and schedule a future check.
+
+ @type node: L{node.Node}
+ @param node: the new node to try and insert
+ """
+ exists = self.table.nodeFailed(node)
+
+ # If in the table, schedule a ping, if one isn't already sent/scheduled
+ if exists and node.id not in self.pinging:
+ self.pinging[node,id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
+ self.sendPing, node)
- def _pongHandler(self, dict, start):
- """Node responded properly, change response into a node to insert."""
+ def sendPing(self, node):
+ """Ping the node to see if it's still alive.
+
+ @type node: L{node.Node}
+ @param node: the node to send the join to
+ """
+ # Check for a ping already underway
+ if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
+ self.pinging[node.id].active()):
+ self.pinging[node.id].cancel()
+ elif isinstance(self.pinging.get(node.id, None), Deferred):
+ return self.pinging[node.id]
+
+ self.stats.startedAction('ping')
+ df = node.ping(self.node.id)
+ self.pinging[node.id] = df
+ df.addCallbacks(self._pingHandler, self._pingError,
+ callbackArgs = (node, datetime.now()),
+ errbackArgs = (node, datetime.now()))
+ return df
+
+ def _pingHandler(self, dict, node, start):
+ """Node responded properly, update it and return the node object."""
self.stats.completedAction('ping', start)
+ del self.pinging[node.id]
# Create the node using the returned contact info
n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
+ reactor.callLater(0, self.insertNode, n)
return n
- def _pongError(self, err, node, start):
- """Error occurred, fail node and errback or callback with error."""
+ def _pingError(self, err, node, start):
+ """Error occurred, fail node."""
log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
self.stats.completedAction('ping', start)
- self.table.nodeFailed(node)
-
+ del self.pinging[node.id]
+ self.nodeFailed(node)
+ return err
+
def sendJoin(self, node, callback=None, errback=None):
"""Join the DHT by pinging a bootstrap node.
"""Error occurred, fail node."""
log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
self.stats.completedAction('join', start)
- self.table.nodeFailed(node)
+ self.nodeFailed(node)
return err
def findCloseNodes(self, callback=lambda a: None):
self.next_checkpoint.cancel()
except:
pass
+ for call in self.pinging:
+ if isinstance(call, DelayedCall) and call.active():
+ call.cancel()
self.store.close()
def getStats(self):
reactor.iterate()
reactor.iterate()
reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
self.failUnlessEqual(len(self.a.table.buckets), 1)
self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
reactor.iterate()
reactor.iterate()
reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
def _cb(self, key, val):
if not val:
reactor.iterate()
reactor.iterate()
reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
for i in self.l:
self.done = 0