From 6533b37c36b8437d4d1fc43714c0cdbc1767eca7 Mon Sep 17 00:00:00 2001 From: burris Date: Sat, 14 Sep 2002 07:26:50 +0000 Subject: [PATCH] unresponsive peers are now purged from the routing table --- actions.py | 36 ++++++++++++++++++++---------------- const.py | 14 +++++++++++++- khashmir.py | 21 +++++++++++++-------- knode.py | 3 ++- ktable.py | 15 +++++++++++++-- node.py | 14 +++++++++++--- 6 files changed, 72 insertions(+), 31 deletions(-) diff --git a/actions.py b/actions.py index 9458329..edaa655 100644 --- a/actions.py +++ b/actions.py @@ -4,6 +4,7 @@ from pickle import loads, dumps from bsddb3 import db from const import reactor +import const from hash import intify from knode import KNode as Node @@ -47,6 +48,7 @@ class FindNode(ActionBase): def handleGotNodes(self, args): l, sender = args sender = Node().initWithDict(sender) + self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short return @@ -75,7 +77,7 @@ class FindNode(ActionBase): if not self.queried.has_key(node.id) and node.id != self.table.node.id: #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding >= N: @@ -86,12 +88,14 @@ class FindNode(ActionBase): self.finished=1 reactor.callFromThread(self.callback, l[:K]) - def defaultGotNodes(self, t): - if self.finished: - return - self.outstanding = self.outstanding - 1 - self.schedule() - + def makeMsgFailed(self, node): + def defaultGotNodes(err, self=self, node=node): + self.table.table.nodeFailed(node) + if self.finished: + return + self.outstanding = self.outstanding - 1 + self.schedule() + return defaultGotNodes def goWithNodes(self, nodes): """ @@ -104,7 +108,7 @@ class FindNode(ActionBase): self.found[node.id] = node #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding == 0: @@ -117,6 +121,7 @@ class GetValue(FindNode): def handleGotNodes(self, args): l, sender = args sender = Node().initWithDict(sender) + self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short return @@ -136,6 +141,8 @@ class GetValue(FindNode): if not z.has_key(y): z[y] = 1 return y + else: + return None v = filter(None, map(x, l['values'])) if(len(v)): reactor.callFromThread(self.callback, v) @@ -153,7 +160,7 @@ class GetValue(FindNode): #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT df = node.findValue(self.target, self.table.node.senderDict()) df.addCallback(self.handleGotNodes) - df.addErrback(self.defaultGotNodes) + df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding >= N: @@ -177,26 +184,23 @@ class GetValue(FindNode): #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT df = node.findValue(self.target, self.table.node.senderDict()) df.addCallback(self.handleGotNodes) - df.addErrback(self.defaultGotNodes) + df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 if self.outstanding == 0: reactor.callFromThread(self.callback, []) -KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours -KE_DELAY = 60 * 60 # 1 hour -KE_AGE = KEINITIAL_DELAY class KeyExpirer: def __init__(self, store, itime, kw): self.store = store self.itime = itime self.kw = kw - reactor.callLater(KEINITIAL_DELAY, self.doExpire) + reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) def doExpire(self): - self.cut = `time() - KE_AGE` + self.cut = `time() - const.KE_AGE` self._expire() def _expire(self): @@ -234,7 +238,7 @@ class KeyExpirer: break irec = f() - reactor.callLater(KE_DELAY, self.doExpire) + reactor.callLater(const.KE_DELAY, self.doExpire) if(i > 0): print ">>>KE: done expiring %d" % i \ No newline at end of file diff --git a/const.py b/const.py index a36d343..1d3ee62 100644 --- a/const.py +++ b/const.py @@ -2,4 +2,16 @@ from twisted.internet.default import SelectReactor ## twistedmatrix.com reactor = SelectReactor(installSignalHandlers=0) from twisted.internet import main -main.installReactor(reactor) \ No newline at end of file +main.installReactor(reactor) + +# how many times a node can fail to respond before it's booted from the routing table +MAX_FAILURES = 3 + +# time before expirer starts running +KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours + +# time between expirer runs +KE_DELAY = 60 * 60 # 1 hour + +# expire entries older than this +KE_AGE = KEINITIAL_DELAY diff --git a/khashmir.py b/khashmir.py index 893bd02..104c9e9 100644 --- a/khashmir.py +++ b/khashmir.py @@ -100,7 +100,7 @@ class Khashmir(xmlrpc.XMLRPC): # create our search state state = GetValue(self, key, callback) - reactor.callFromThread(state.goWithNodes, nodes, {'found' : l}) + reactor.callFromThread(state.goWithNodes, nodes, l) @@ -111,16 +111,22 @@ class Khashmir(xmlrpc.XMLRPC): values are stored in peers on a first-come first-served basis this will probably change so more than one value can be stored under a key """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"): + def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): if not callback: # default callback - this will get called for each successful store value def _storedValueHandler(sender): pass response=_storedValueHandler + for node in nodes: + def cb(t, table = table, node=node, resp=response): + self.table.insertNode(node) + response(t) if node.id != self.node.id: + def default(err, node=node, table=table): + table.nodeFailed(node) df = node.storeValue(key, value, self.node.senderDict()) - df.addCallbacks(response, default) + df.addCallbacks(cb, default) # this call is asynch self.findNode(key, _storeValueForKey) @@ -144,10 +150,10 @@ class Khashmir(xmlrpc.XMLRPC): self.table.replaceStaleNode(old, newnode) def _notStaleNodeHandler(sender, old=old): - """ called when we get a ping from the remote node """ + """ called when we get a pong from the old node """ sender = Node().initWithDict(sender) if sender.id == old.id: - self.table.insertNode(old) + self.table.justSeenNode(old) df = old.ping(self.node.senderDict()) df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) @@ -170,9 +176,8 @@ class Khashmir(xmlrpc.XMLRPC): n = Node().initWithDict(sender) table.insertNode(n) return - def _defaultPong(err): - # this should probably increment a failed message counter and dump the node if it gets over a threshold - return + def _defaultPong(err, node=node, table=self.table): + table.nodeFailed(node) df.addCallbacks(_pongHandler,_defaultPong) diff --git a/knode.py b/knode.py index c9a41dd..3e32ce8 100644 --- a/knode.py +++ b/knode.py @@ -4,7 +4,8 @@ from xmlrpcclient import XMLRPCClientFactory as factory from const import reactor from xmlrpclib import Binary -class KNode(Node): + +class KNode(Node): def ping(self, sender): df = Deferred() f = factory('ping', (sender,), df.callback, df.errback) diff --git a/ktable.py b/ktable.py index b47416c..c34f363 100644 --- a/ktable.py +++ b/ktable.py @@ -5,6 +5,7 @@ from bisect import * import time from types import * +import const from node import Node # The all-powerful, magical Kademlia "k" constant, bucket depth @@ -104,7 +105,8 @@ class KTable: return del(self.buckets[i].l[it]) - self.buckets[i].l.append(new) + if new: + self.buckets[i].l.append(new) def insertNode(self, node): """ @@ -163,7 +165,16 @@ class KTable: n.updateLastSeen() return tstamp - + def nodeFailed(self, node): + """ call this when a node fails to respond to a message, to invalidate that node """ + try: + n = self.findNodes(node.int)[0] + except IndexError: + return None + else: + if(n.msgFailed() >= const.MAX_FAILURES): + self.replaceStaleNode(n, None) + class KBucket: __slots = ['min', 'max', 'lastAccessed'] def __init__(self, contents, min, max): diff --git a/node.py b/node.py index 7128ecb..4d7a977 100644 --- a/node.py +++ b/node.py @@ -5,12 +5,16 @@ from xmlrpclib import Binary class Node: """encapsulate contact info""" + + def __init__(self): + self.fails = 0 + self.lastSeen = time.time() + def init(self, id, host, port): self.id = id self.int = hash.intify(id) self.host = host self.port = port - self.lastSeen = time.time() self._senderDict = {'id': Binary(self.id), 'port' : self.port, 'host' : self.host} return self @@ -20,12 +24,16 @@ class Node: self.int = hash.intify(self.id) self.port = dict['port'] self.host = dict['host'] - self.lastSeen = time.time() return self def updateLastSeen(self): self.lastSeen = time.time() - + self.fails = 0 + + def msgFailed(self): + self.fails = self.fails + 1 + return self.fails + def senderDict(self): return self._senderDict -- 2.39.5