From: Cameron Dale Date: Sat, 10 May 2008 00:18:20 +0000 (-0700) Subject: When a node fails, schedule a future ping to check again. X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=cebe4a5c92702346c939e370cc169ee781358b8d When a node fails, schedule a future ping to check again. Should help to eliminate bad nodes faster. To facilitate, added a sendPing() to Khashmir. insertNode now uses sendPing() rather than it's own routines. Pinging is now stateful, so that multiple pings are not sent simultaneously. --- diff --git a/apt_p2p_Khashmir/DHT.py b/apt_p2p_Khashmir/DHT.py index 1236ebc..ddcaa7a 100644 --- a/apt_p2p_Khashmir/DHT.py +++ b/apt_p2p_Khashmir/DHT.py @@ -477,7 +477,7 @@ class TestMultiDHT(unittest.TestCase): if next_node + 1 < len(self.l): d.addCallback(self.node_join, next_node + 1) else: - d.addCallback(self.lastDefer.callback) + reactor.callLater(1, d.addCallback, self.lastDefer.callback) def test_join(self): self.timeout = 2 diff --git a/apt_p2p_Khashmir/actions.py b/apt_p2p_Khashmir/actions.py index 865561c..c49959f 100644 --- a/apt_p2p_Khashmir/actions.py +++ b/apt_p2p_Khashmir/actions.py @@ -207,7 +207,7 @@ class ActionBase: """Receive an error from a remote node.""" log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage())) if node.id != self.caller.node.id: - self.caller.table.nodeFailed(node) + self.caller.nodeFailed(node) self.failed[node.id] = 1 if self.outstanding.has_key(node.id): self.outstanding_results -= self.outstanding[node.id] diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 968ddeb..607c49b 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -15,6 +15,7 @@ from copy import copy import os, re from twisted.internet.defer import Deferred +from twisted.internet.base import DelayedCall from twisted.internet import protocol, reactor from twisted.python import log from twisted.trial import unittest @@ -41,6 +42,9 @@ class KhashmirBase(protocol.Factory): @ivar _Node: the knode implementation to use for this class of DHT @type config: C{dictionary} @ivar config: the configuration parameters for the DHT + @type pinging: C{dictionary} + @ivar pinging: the node's that are currently being pinged, keys are the + node id's, values are the Deferred or DelayedCall objects @type port: C{int} @ivar port: the port to listen on @type store: L{db.DB} @@ -73,6 +77,7 @@ class KhashmirBase(protocol.Factory): (optional, defaults to the /tmp directory) """ self.config = None + self.pinging = {} self.setup(config, cache_dir) def setup(self, config, cache_dir): @@ -217,47 +222,69 @@ class KhashmirBase(protocol.Factory): timedelta(seconds=self.config['MIN_PING_INTERVAL'])): # Bucket is full, check to see if old node is still available - self.stats.startedAction('ping') - df = old.ping(self.node.id) - df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler, - callbackArgs = (old, datetime.now()), - errbackArgs = (old, datetime.now(), node, contacted)) + df = self.sendPing(old) + df.addErrback(self._staleNodeHandler, old, node, contacted) elif not old and not contacted: # There's room, we just need to contact the node first - self.stats.startedAction('ping') - df = node.ping(self.node.id) - # Convert the returned contact info into a node - df.addCallback(self._pongHandler, datetime.now()) - # Try adding the contacted node - df.addCallbacks(self.insertNode, self._pongError, - errbackArgs = (node, datetime.now())) - - def _freshNodeHandler(self, dict, old, start): - """Got a pong from the old node, so update it.""" - self.stats.completedAction('ping', start) - if dict['id'] == old.id: - self.table.justSeenNode(old.id) - - def _staleNodeHandler(self, err, old, start, node, contacted): + self.sendPing(node) + + def _staleNodeHandler(self, err, old, node, contacted): """The pinged node never responded, so replace it.""" - log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage())) - self.stats.completedAction('ping', start) self.table.invalidateNode(old) self.insertNode(node, contacted) + return err + + def nodeFailed(self, node): + """Mark a node as having failed a request and schedule a future check. + + @type node: L{node.Node} + @param node: the new node to try and insert + """ + exists = self.table.nodeFailed(node) + + # If in the table, schedule a ping, if one isn't already sent/scheduled + if exists and node.id not in self.pinging: + self.pinging[node,id] = reactor.callLater(self.config['MIN_PING_INTERVAL'], + self.sendPing, node) - def _pongHandler(self, dict, start): - """Node responded properly, change response into a node to insert.""" + def sendPing(self, node): + """Ping the node to see if it's still alive. + + @type node: L{node.Node} + @param node: the node to send the join to + """ + # Check for a ping already underway + if (isinstance(self.pinging.get(node.id, None), DelayedCall) and + self.pinging[node.id].active()): + self.pinging[node.id].cancel() + elif isinstance(self.pinging.get(node.id, None), Deferred): + return self.pinging[node.id] + + self.stats.startedAction('ping') + df = node.ping(self.node.id) + self.pinging[node.id] = df + df.addCallbacks(self._pingHandler, self._pingError, + callbackArgs = (node, datetime.now()), + errbackArgs = (node, datetime.now())) + return df + + def _pingHandler(self, dict, node, start): + """Node responded properly, update it and return the node object.""" self.stats.completedAction('ping', start) + del self.pinging[node.id] # Create the node using the returned contact info n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + reactor.callLater(0, self.insertNode, n) return n - def _pongError(self, err, node, start): - """Error occurred, fail node and errback or callback with error.""" + def _pingError(self, err, node, start): + """Error occurred, fail node.""" log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) self.stats.completedAction('ping', start) - self.table.nodeFailed(node) - + del self.pinging[node.id] + self.nodeFailed(node) + return err + def sendJoin(self, node, callback=None, errback=None): """Join the DHT by pinging a bootstrap node. @@ -293,7 +320,7 @@ class KhashmirBase(protocol.Factory): """Error occurred, fail node.""" log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) self.stats.completedAction('join', start) - self.table.nodeFailed(node) + self.nodeFailed(node) return err def findCloseNodes(self, callback=lambda a: None): @@ -334,6 +361,9 @@ class KhashmirBase(protocol.Factory): self.next_checkpoint.cancel() except: pass + for call in self.pinging: + if isinstance(call, DelayedCall) and call.active(): + call.cancel() self.store.close() def getStats(self): @@ -598,6 +628,10 @@ class SimpleTests(unittest.TestCase): reactor.iterate() reactor.iterate() reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() self.failUnlessEqual(len(self.a.table.buckets), 1) self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1) @@ -626,6 +660,20 @@ class SimpleTests(unittest.TestCase): reactor.iterate() reactor.iterate() reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() def _cb(self, key, val): if not val: @@ -666,6 +714,9 @@ class MultiTest(unittest.TestCase): reactor.iterate() reactor.iterate() reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() for i in self.l: self.done = 0 diff --git a/apt_p2p_Khashmir/ktable.py b/apt_p2p_Khashmir/ktable.py index 199c626..201d494 100644 --- a/apt_p2p_Khashmir/ktable.py +++ b/apt_p2p_Khashmir/ktable.py @@ -262,7 +262,10 @@ class KTable: self.replaceStaleNode(n) def nodeFailed(self, node): - """Mark a node as having failed once, and remove it if it has failed too much.""" + """Mark a node as having failed once, and remove it if it has failed too much. + + @return: whether the node is in the routing table + """ # Get the bucket number num = self._nodeNum(node) i = self._bucketIndexForInt(num) @@ -271,11 +274,13 @@ class KTable: try: n = self.buckets[i].node(num) except ValueError: - return None + return False else: # The node is in the bucket if n.msgFailed() >= self.config['MAX_FAILURES']: self.invalidateNode(n) + return False + return True class KBucket: """Single bucket of nodes in a kademlia-like routing table.