From 194d18a4f4f1c615eecc7d93f379497e0eb586cd Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Mon, 28 Apr 2008 17:44:45 -0700 Subject: [PATCH] Only add nodes to the routing table that have responded to a request. If they haven't, ping them before adding them. Still ping the old node first to see if it's stale. Move the temporary functions for ping/join to be module functions. --- apt_p2p_Khashmir/DHT.py | 4 +- apt_p2p_Khashmir/khashmir.py | 111 ++++++++++++++++++++++------------- apt_p2p_Khashmir/ktable.py | 22 ++++--- 3 files changed, 86 insertions(+), 51 deletions(-) diff --git a/apt_p2p_Khashmir/DHT.py b/apt_p2p_Khashmir/DHT.py index af99f05..cc3660c 100644 --- a/apt_p2p_Khashmir/DHT.py +++ b/apt_p2p_Khashmir/DHT.py @@ -338,7 +338,7 @@ class TestSimpleDHT(unittest.TestCase): 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, - 'KEY_EXPIRE': 3600, 'SPEW': False, } + 'KEY_EXPIRE': 3600, 'SPEW': True, } def setUp(self): self.a = DHT() @@ -459,7 +459,7 @@ class TestMultiDHT(unittest.TestCase): 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, - 'KEY_EXPIRE': 3600, 'SPEW': False, } + 'KEY_EXPIRE': 3600, 'SPEW': True, } def setUp(self): self.l = [] diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 294940c..408a7e9 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -164,7 +164,7 @@ class KhashmirBase(protocol.Factory): (optional, defaults to doing nothing with the results) @type errback: C{method} @param errback: the method to call if an error occurs - (optional, defaults to calling the callback with None) + (optional, defaults to calling the callback with the error) """ n = self.Node(NULL_ID, host, port) self.sendJoin(n, callback=callback, errback=errback) @@ -190,7 +190,7 @@ class KhashmirBase(protocol.Factory): If all you have is a host/port, then use L{addContact}, which calls this method after receiving the PONG from the remote node. The reason for - the seperation is we can't insert a node into the table without its + the separation is we can't insert a node into the table without its node ID. That means of course the node passed into this method needs to be a properly formed Node object with a valid ID. @@ -202,30 +202,57 @@ class KhashmirBase(protocol.Factory): """ # Don't add any local nodes to the routing table if not self.config['LOCAL_OK'] and isLocal.match(node.host): + log.msg('Not adding local node to table: %s/%s' % (node.host, node.port)) return - + old = self.table.insertNode(node, contacted=contacted) - if (old and old.id != self.node.id and + + if (isinstance(old, self._Node) and old.id != self.node.id and (datetime.now() - old.lastSeen) > timedelta(seconds=self.config['MIN_PING_INTERVAL'])): - def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()): - """The pinged node never responded, so replace it.""" - log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage())) - self.stats.completedAction('ping', start) - self.table.replaceStaleNode(oldnode, newnode) - - def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()): - """Got a pong from the old node, so update it.""" - self.stats.completedAction('ping', start) - if dict['id'] == old.id: - self.table.justSeenNode(old.id) - # Bucket is full, check to see if old node is still available self.stats.startedAction('ping') df = old.ping(self.node.id) - df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) + df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler, + callbackArgs = (old, datetime.now()), + errbackArgs = (old, datetime.now(), node, contacted)) + elif not old and not contacted: + # There's room, we just need to contact the node first + self.stats.startedAction('ping') + df = node.ping(self.node.id) + # Convert the returned contact info into a node + df.addCallback(self._pongHandler, datetime.now()) + # Try adding the contacted node + df.addCallbacks(self.insertNode, self._pongError, + errbackArgs = (node, datetime.now())) + + def _freshNodeHandler(self, dict, old, start): + """Got a pong from the old node, so update it.""" + self.stats.completedAction('ping', start) + if dict['id'] == old.id: + self.table.justSeenNode(old.id) + + def _staleNodeHandler(self, err, old, start, node, contacted): + """The pinged node never responded, so replace it.""" + log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage())) + self.stats.completedAction('ping', start) + self.table.invalidateNode(old) + self.insertNode(node, contacted) + + def _pongHandler(self, dict, start): + """Node responded properly, change response into a node to insert.""" + self.stats.completedAction('ping', start) + # Create the node using the returned contact info + n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + return n + def _pongError(self, err, node, start): + """Error occurred, fail node and errback or callback with error.""" + log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) + self.stats.completedAction('ping', start) + self.table.nodeFailed(node) + def sendJoin(self, node, callback=None, errback=None): """Join the DHT by pinging a bootstrap node. @@ -237,31 +264,33 @@ class KhashmirBase(protocol.Factory): (optional, defaults to doing nothing with the results) @type errback: C{method} @param errback: the method to call if an error occurs - (optional, defaults to calling the callback with None) + (optional, defaults to calling the callback with the error) """ - - def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()): - """Node responded properly, callback with response.""" - n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) - self.stats.completedAction('join', start) - reactor.callLater(0, self.insertNode, n) - if callback: - callback((dict['ip_addr'], dict['port'])) - - def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()): - """Error occurred, fail node and errback or callback with error.""" - log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) - self.stats.completedAction('join', start) - self.table.nodeFailed(node) - if errback: - errback() - elif callback: - callback(None) - + if errback is None: + errback = callback self.stats.startedAction('join') df = node.join(self.node.id) - df.addCallbacks(_pongHandler, _defaultPong) - + df.addCallbacks(self._joinHandler, self._joinError, + callbackArgs = (node, datetime.now()), + errbackArgs = (node, datetime.now())) + if callback: + df.addCallbacks(callback, errback) + + def _joinHandler(self, dict, node, start): + """Node responded properly, extract the response.""" + self.stats.completedAction('join', start) + # Create the node using the returned contact info + n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + reactor.callLater(0, self.insertNode, n) + return (dict['ip_addr'], dict['port']) + + def _joinError(self, err, node, start): + """Error occurred, fail node.""" + log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) + self.stats.completedAction('join', start) + self.table.nodeFailed(node) + return err + def findCloseNodes(self, callback=lambda a: None): """Perform a findNode on the ID one away from our own. @@ -533,7 +562,7 @@ class SimpleTests(unittest.TestCase): 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, - 'KEY_EXPIRE': 3600, 'SPEW': False, } + 'KEY_EXPIRE': 3600, 'SPEW': True, } def setUp(self): d = self.DHT_DEFAULTS.copy() @@ -607,7 +636,7 @@ class MultiTest(unittest.TestCase): 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, - 'KEY_EXPIRE': 3600, 'SPEW': False, } + 'KEY_EXPIRE': 3600, 'SPEW': True, } def _done(self, val): self.done = 1 diff --git a/apt_p2p_Khashmir/ktable.py b/apt_p2p_Khashmir/ktable.py index 007ff1c..e91c0db 100644 --- a/apt_p2p_Khashmir/ktable.py +++ b/apt_p2p_Khashmir/ktable.py @@ -173,6 +173,7 @@ class KTable: # Remove the stale node del(self.buckets[i].l[it]) removed = True + log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port)) # Insert the new node if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K: @@ -183,7 +184,8 @@ class KTable: def insertNode(self, node, contacted = True): """Try to insert a node in the routing table. - This inserts the node, returning None if successful, otherwise returns + This inserts the node, returning True if successful, False if the + node could have been added if it responds to a ping, otherwise returns the oldest node in the bucket if it's full. The caller is then responsible for pinging the returned node and calling replaceStaleNode if it doesn't respond. contacted means that yes, we contacted THEM and @@ -194,11 +196,13 @@ class KTable: @type contacted: C{boolean} @param contacted: whether the new node is known to be good, i.e. responded to a request (optional, defaults to True) - @rtype: L{node.Node} - @return: None if successful (the bucket wasn't full), otherwise returns the oldest node in the bucket + @rtype: L{node.Node} or C{boolean} + @return: True if successful (the bucket wasn't full), False if the + node could have been added if it was contacted, otherwise + returns the oldest node in the bucket """ assert node.id != NULL_ID - if node.id == self.node.id: return + if node.id == self.node.id: return True # Get the bucket for this node i = self._bucketIndexForInt(node.num) @@ -219,16 +223,18 @@ class KTable: # utilizing this nodes new contact info self.buckets[i].l.append(node) self.buckets[i].touch() - return + return True # We don't have this node, check to see if the bucket is full if len(self.buckets[i].l) < K: # Not full, append this node and return if contacted: node.updateLastSeen() - self.buckets[i].l.append(node) - self.buckets[i].touch() - return + self.buckets[i].l.append(node) + self.buckets[i].touch() + log.msg('Added node to routing table: %s/%s' % (node.host, node.port)) + return True + return False # Bucket is full, check to see if the local node is not in the bucket if not (self.buckets[i].min <= self.node < self.buckets[i].max): -- 2.39.5