]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
unresponsive peers are now purged from the routing table
authorburris <burris>
Sat, 14 Sep 2002 07:26:50 +0000 (07:26 +0000)
committerburris <burris>
Sat, 14 Sep 2002 07:26:50 +0000 (07:26 +0000)
actions.py
const.py
khashmir.py
knode.py
ktable.py
node.py

index 945832970cf3a5467a50b3d1d4211cea3e7648c1..edaa655a89fab9322655e55edb7e2a7a5b84d5b2 100644 (file)
@@ -4,6 +4,7 @@ from pickle import loads, dumps
 from bsddb3 import db
 
 from const import reactor
+import const
 
 from hash import intify
 from knode import KNode as Node
@@ -47,6 +48,7 @@ class FindNode(ActionBase):
     def handleGotNodes(self, args):
        l, sender = args
        sender = Node().initWithDict(sender)
+       self.table.table.insertNode(sender)
        if self.finished or self.answered.has_key(sender.id):
            # a day late and a dollar short
            return
@@ -75,7 +77,7 @@ class FindNode(ActionBase):
            if not self.queried.has_key(node.id) and node.id != self.table.node.id:
                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                df = node.findNode(self.target, self.table.node.senderDict())
-               df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+               df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
            if self.outstanding >= N:
@@ -86,12 +88,14 @@ class FindNode(ActionBase):
            self.finished=1
            reactor.callFromThread(self.callback, l[:K])
        
-    def defaultGotNodes(self, t):
-       if self.finished:
-           return
-       self.outstanding = self.outstanding - 1
-       self.schedule()
-       
+    def makeMsgFailed(self, node):
+       def defaultGotNodes(err, self=self, node=node):
+           self.table.table.nodeFailed(node)
+           if self.finished:
+               return
+           self.outstanding = self.outstanding - 1
+           self.schedule()
+       return defaultGotNodes
        
     def goWithNodes(self, nodes):
        """
@@ -104,7 +108,7 @@ class FindNode(ActionBase):
            self.found[node.id] = node
            #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
            df = node.findNode(self.target, self.table.node.senderDict())
-           df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
+           df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
            self.outstanding = self.outstanding + 1
            self.queried[node.id] = 1
        if self.outstanding == 0:
@@ -117,6 +121,7 @@ class GetValue(FindNode):
     def handleGotNodes(self, args):
        l, sender = args
        sender = Node().initWithDict(sender)
+       self.table.table.insertNode(sender)
        if self.finished or self.answered.has_key(sender.id):
            # a day late and a dollar short
            return
@@ -136,6 +141,8 @@ class GetValue(FindNode):
                if not z.has_key(y):
                    z[y] = 1
                    return y
+               else:
+                   return None
            v = filter(None, map(x, l['values']))
            if(len(v)):
                reactor.callFromThread(self.callback, v)
@@ -153,7 +160,7 @@ class GetValue(FindNode):
                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
                df = node.findValue(self.target, self.table.node.senderDict())
                df.addCallback(self.handleGotNodes)
-               df.addErrback(self.defaultGotNodes)
+               df.addErrback(self.makeMsgFailed(node))
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
            if self.outstanding >= N:
@@ -177,26 +184,23 @@ class GetValue(FindNode):
            #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
            df = node.findValue(self.target, self.table.node.senderDict())
            df.addCallback(self.handleGotNodes)
-           df.addErrback(self.defaultGotNodes)
+           df.addErrback(self.makeMsgFailed(node))
            self.outstanding = self.outstanding + 1
            self.queried[node.id] = 1
        if self.outstanding == 0:
            reactor.callFromThread(self.callback, [])
 
 
-KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
-KE_DELAY = 60 * 60 # 1 hour
-KE_AGE = KEINITIAL_DELAY
 
 class KeyExpirer:
     def __init__(self, store, itime, kw):
        self.store = store
        self.itime = itime
        self.kw = kw
-       reactor.callLater(KEINITIAL_DELAY, self.doExpire)
+       reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
        
     def doExpire(self):
-       self.cut = `time() - KE_AGE`
+       self.cut = `time() - const.KE_AGE`
        self._expire()
        
     def _expire(self):
@@ -234,7 +238,7 @@ class KeyExpirer:
                    break
            irec = f()
            
-       reactor.callLater(KE_DELAY, self.doExpire)
+       reactor.callLater(const.KE_DELAY, self.doExpire)
        if(i > 0):
            print ">>>KE: done expiring %d" % i
        
\ No newline at end of file
index a36d34309000fe5c0c63c68d1e923f9446aec318..1d3ee621b1bd355319085824adaeca1314077ba3 100644 (file)
--- a/const.py
+++ b/const.py
@@ -2,4 +2,16 @@ from twisted.internet.default import SelectReactor ## twistedmatrix.com
 
 reactor = SelectReactor(installSignalHandlers=0)
 from twisted.internet import main
-main.installReactor(reactor)
\ No newline at end of file
+main.installReactor(reactor)
+
+# how many times a node can fail to respond before it's booted from the routing table
+MAX_FAILURES = 3
+
+# time before expirer starts running
+KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
+
+# time between expirer runs
+KE_DELAY = 60 * 60 # 1 hour
+
+# expire entries older than this
+KE_AGE = KEINITIAL_DELAY
index 893bd02f6b64f7034af6882cbc6855445325c470..104c9e949211160848889d16899c193457cb0d49 100644 (file)
@@ -100,7 +100,7 @@ class Khashmir(xmlrpc.XMLRPC):
 
        # create our search state
        state = GetValue(self, key, callback)
-       reactor.callFromThread(state.goWithNodes, nodes, {'found' : l})
+       reactor.callFromThread(state.goWithNodes, nodes, l)
        
 
 
@@ -111,16 +111,22 @@ class Khashmir(xmlrpc.XMLRPC):
            values are stored in peers on a first-come first-served basis
            this will probably change so more than one value can be stored under a key
        """
-       def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"):
+       def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
            if not callback:
                # default callback - this will get called for each successful store value
                def _storedValueHandler(sender):
                    pass
                response=_storedValueHandler
+       
            for node in nodes:
+               def cb(t, table = table, node=node, resp=response):
+                   self.table.insertNode(node)
+                   response(t)
                if node.id != self.node.id:
+                   def default(err, node=node, table=table):
+                       table.nodeFailed(node)
                    df = node.storeValue(key, value, self.node.senderDict())
-                   df.addCallbacks(response, default)
+                   df.addCallbacks(cb, default)
        # this call is asynch
        self.findNode(key, _storeValueForKey)
        
@@ -144,10 +150,10 @@ class Khashmir(xmlrpc.XMLRPC):
                self.table.replaceStaleNode(old, newnode)
        
            def _notStaleNodeHandler(sender, old=old):
-               """ called when we get a ping from the remote node """
+               """ called when we get a pong from the old node """
                sender = Node().initWithDict(sender)
                if sender.id == old.id:
-                   self.table.insertNode(old)
+                   self.table.justSeenNode(old)
 
            df = old.ping(self.node.senderDict())
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
@@ -170,9 +176,8 @@ class Khashmir(xmlrpc.XMLRPC):
                n = Node().initWithDict(sender)
                table.insertNode(n)
            return
-       def _defaultPong(err):
-           # this should probably increment a failed message counter and dump the node if it gets over a threshold
-           return      
+       def _defaultPong(err, node=node, table=self.table):
+               table.nodeFailed(node)
 
        df.addCallbacks(_pongHandler,_defaultPong)
 
index c9a41ddd3af108db5c22dc6508324eb4404b1cf4..3e32ce83409aebffe04eec98f17b5dec1533e508 100644 (file)
--- a/knode.py
+++ b/knode.py
@@ -4,7 +4,8 @@ from xmlrpcclient import XMLRPCClientFactory as factory
 from const import reactor
 from xmlrpclib import Binary
 
-class KNode(Node):
+
+class KNode(Node):     
     def ping(self, sender):
        df = Deferred()
        f = factory('ping', (sender,), df.callback, df.errback)
index b47416ccf0243656b8d685def7ae1e5fbeda1dac..c34f3630410c44941ee4f27329305001e977481b 100644 (file)
--- a/ktable.py
+++ b/ktable.py
@@ -5,6 +5,7 @@ from bisect import *
 import time
 from types import *
 
+import const
 from node import Node
 
 # The all-powerful, magical Kademlia "k" constant, bucket depth
@@ -104,7 +105,8 @@ class KTable:
            return
 
        del(self.buckets[i].l[it])
-       self.buckets[i].l.append(new)
+       if new:
+           self.buckets[i].l.append(new)
 
     def insertNode(self, node):
        """ 
@@ -163,7 +165,16 @@ class KTable:
            n.updateLastSeen()
            return tstamp
 
-
+    def nodeFailed(self, node):
+       """ call this when a node fails to respond to a message, to invalidate that node """
+       try:
+           n = self.findNodes(node.int)[0]
+       except IndexError:
+           return None
+       else:
+           if(n.msgFailed() >= const.MAX_FAILURES):
+               self.replaceStaleNode(n, None)
+       
 class KBucket:
     __slots = ['min', 'max', 'lastAccessed']
     def __init__(self, contents, min, max):
diff --git a/node.py b/node.py
index 7128ecb493312877ba8e7e0ad1ac7cc807786df9..4d7a977a8ed79947ca1a9d48ea00e184688ec52e 100644 (file)
--- a/node.py
+++ b/node.py
@@ -5,12 +5,16 @@ from xmlrpclib import Binary
 
 class Node:
     """encapsulate contact info"""
+    
+    def __init__(self):
+       self.fails = 0
+       self.lastSeen = time.time()
+
     def init(self, id, host, port):
        self.id = id
        self.int = hash.intify(id)
        self.host = host
        self.port = port
-       self.lastSeen = time.time()
        self._senderDict = {'id': Binary(self.id), 'port' : self.port, 'host' : self.host}
        return self
        
@@ -20,12 +24,16 @@ class Node:
        self.int = hash.intify(self.id)
        self.port = dict['port']
        self.host = dict['host']
-       self.lastSeen = time.time()
        return self
        
     def updateLastSeen(self):
        self.lastSeen = time.time()
-
+       self.fails = 0
+       
+    def msgFailed(self):
+       self.fails = self.fails + 1
+       return self.fails
+       
     def senderDict(self):
        return self._senderDict