]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
much better handling of ip addresses and potentially unreachable peers
authorburris <burris>
Sat, 21 Sep 2002 20:43:20 +0000 (20:43 +0000)
committerburris <burris>
Sat, 21 Sep 2002 20:43:20 +0000 (20:43 +0000)
actions.py
khashmir.py

index edaa655a89fab9322655e55edb7e2a7a5b84d5b2..e6d53507202964e1c8c654821aa7dba18ac0225e 100644 (file)
@@ -10,9 +10,6 @@ from hash import intify
 from knode import KNode as Node
 from ktable import KTable, K
 
-# concurrent FIND_NODE/VALUE requests!
-N = 3
-
 class ActionBase:
     """ base class for some long running asynchronous proccesses like finding nodes or values """
     def __init__(self, table, target, callback):
@@ -46,7 +43,9 @@ FIND_NODE_TIMEOUT = 15
 class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
     def handleGotNodes(self, args):
+       args, conn = args
        l, sender = args
+       sender['host'] = conn['host']
        sender = Node().initWithDict(sender)
        self.table.table.insertNode(sender)
        if self.finished or self.answered.has_key(sender.id):
@@ -58,7 +57,6 @@ class FindNode(ActionBase):
            n = Node().initWithDict(node)
            if not self.found.has_key(n.id):
                self.found[n.id] = n
-               self.table.insertNode(n)
        self.schedule()
                
     def schedule(self):
@@ -80,7 +78,7 @@ class FindNode(ActionBase):
                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
-           if self.outstanding >= N:
+           if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert(self.outstanding) >=0
        if self.outstanding == 0:
@@ -111,6 +109,9 @@ class FindNode(ActionBase):
            df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
            self.outstanding = self.outstanding + 1
            self.queried[node.id] = 1
+           if self.outstanding >= const.CONCURRENT_REQS:
+               break
+
        if self.outstanding == 0:
            self.callback(nodes)
 
@@ -119,7 +120,9 @@ GET_VALUE_TIMEOUT = 15
 class GetValue(FindNode):
     """ get value task """
     def handleGotNodes(self, args):
+       args, conn = args
        l, sender = args
+       sender['host'] = conn['host']
        sender = Node().initWithDict(sender)
        self.table.table.insertNode(sender)
        if self.finished or self.answered.has_key(sender.id):
@@ -134,7 +137,6 @@ class GetValue(FindNode):
                n = Node().initWithDict(node)
                if not self.found.has_key(n.id):
                    self.found[n.id] = n
-                   self.table.insertNode(n)
        elif l.has_key('values'):
            def x(y, z=self.results):
                y = y.data
@@ -163,7 +165,7 @@ class GetValue(FindNode):
                df.addErrback(self.makeMsgFailed(node))
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
-           if self.outstanding >= N:
+           if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert(self.outstanding) >=0
        if self.outstanding == 0:
@@ -187,6 +189,9 @@ class GetValue(FindNode):
            df.addErrback(self.makeMsgFailed(node))
            self.outstanding = self.outstanding + 1
            self.queried[node.id] = 1
+           if self.outstanding >= const.CONCURRENT_REQS:
+               break
+
        if self.outstanding == 0:
            reactor.callFromThread(self.callback, [])
 
index 1722ad48ad9291a7375302b0035e45019ca297ed..ca6c3d0d1d5cd7c15e0b56f84b127615503d308b 100644 (file)
@@ -1,6 +1,8 @@
 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
 
 from const import reactor
+import const
+
 import time
 from pickle import loads, dumps
 from sha import sha
@@ -23,9 +25,6 @@ from bsddb3._db import DBNotFoundError
 
 from xmlrpclib import Binary
 
-# don't ping unless it's been at least this many seconds since we've heard from a peer
-MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
-
 
 
 # this is the main class!
@@ -131,7 +130,7 @@ class Khashmir(xmlrpc.XMLRPC):
        self.findNode(key, _storeValueForKey)
        
        
-    def insertNode(self, n):
+    def insertNode(self, n, contacted=1):
        """
        insert a node in our local table, pinging oldest contact in bucket, if necessary
        
@@ -140,8 +139,8 @@ class Khashmir(xmlrpc.XMLRPC):
        a node into the table without it's peer-ID.  That means of course the node passed into this
        method needs to be a properly formed Node object with a valid ID.
        """
-       old = self.table.insertNode(n)
-       if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
+       old = self.table.insertNode(n, contacted=contacted)
+       if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
            # the bucket is full, check to see if old node is still around and if so, replace it
            
            ## these are the callbacks used when we ping the oldest node in a bucket
@@ -151,6 +150,8 @@ class Khashmir(xmlrpc.XMLRPC):
        
            def _notStaleNodeHandler(sender, old=old):
                """ called when we get a pong from the old node """
+               sender, conn = sender
+               sender['host'] = conn['host']
                sender = Node().initWithDict(sender)
                if sender.id == old.id:
                    self.table.justSeenNode(old)
@@ -166,11 +167,11 @@ class Khashmir(xmlrpc.XMLRPC):
        df = node.ping(self.node.senderDict())
        ## these are the callbacks we use when we issue a PING
        def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
+           sender = sender[0]
            if id != 20 * ' ' and id != sender['id'].data:
                # whoah, got response from different peer than we were expecting
                pass
            else:
-               #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
                sender['host'] = host
                sender['port'] = port
                n = Node().initWithDict(sender)
@@ -230,7 +231,7 @@ class Khashmir(xmlrpc.XMLRPC):
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
+       self.insertNode(n, contacted=0)
        return self.node.senderDict()
                
     def xmlrpc_find_node(self, target, sender):
@@ -239,7 +240,7 @@ class Khashmir(xmlrpc.XMLRPC):
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
+       self.insertNode(n, contacted=0)
        return nodes, self.node.senderDict()
     
     def xmlrpc_store_value(self, key, value, sender):
@@ -260,7 +261,7 @@ class Khashmir(xmlrpc.XMLRPC):
        ip = self.crequest.getClientIP()
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
+       self.insertNode(n, contacted=0)
        return self.node.senderDict()
        
     def xmlrpc_find_value(self, key, sender):
@@ -268,7 +269,7 @@ class Khashmir(xmlrpc.XMLRPC):
        key = key.data
        sender['host'] = ip
        n = Node().initWithDict(sender)
-       self.insertNode(n)
+       self.insertNode(n, contacted=0)
 
        l = self.retrieveValues(key)
        if len(l) > 0:
@@ -384,7 +385,7 @@ def test_find_value(l, quiet=0):
            try:
                if(len(values) == 0):
                    if not self.found:
-                       print "find                FAILED"
+                       print "find                NOT FOUND"
                    else:
                        print "find                FOUND"
                    sys.stdout.flush()
@@ -402,14 +403,18 @@ def test_find_value(l, quiet=0):
     d.valueForKey(key, cb(fc).callback)    
     fc.wait()
     
-def test_one(port):
+def test_one(host, port):
     import thread
-    k = Khashmir('localhost', port)
+    k = Khashmir(host, port)
     thread.start_new_thread(k.app.run, ())
     return k
     
 if __name__ == "__main__":
-    l = test_build_net()
+    import sys
+    n = 8
+    if len(sys.argv) > 1:
+       n = int(sys.argv[1])
+    l = test_build_net(peers=n)
     time.sleep(3)
     print "finding nodes..."
     for i in range(10):