From ce14d8699c9306a4f8916277b1511048ac98b7e4 Mon Sep 17 00:00:00 2001 From: burris Date: Sat, 21 Sep 2002 20:43:20 +0000 Subject: [PATCH] much better handling of ip addresses and potentially unreachable peers --- actions.py | 19 ++++++++++++------- khashmir.py | 35 ++++++++++++++++++++--------------- 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/actions.py b/actions.py index edaa655..e6d5350 100644 --- a/actions.py +++ b/actions.py @@ -10,9 +10,6 @@ from hash import intify from knode import KNode as Node from ktable import KTable, K -# concurrent FIND_NODE/VALUE requests! -N = 3 - class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ def __init__(self, table, target, callback): @@ -46,7 +43,9 @@ FIND_NODE_TIMEOUT = 15 class FindNode(ActionBase): """ find node action merits it's own class as it is a long running stateful process """ def handleGotNodes(self, args): + args, conn = args l, sender = args + sender['host'] = conn['host'] sender = Node().initWithDict(sender) self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): @@ -58,7 +57,6 @@ class FindNode(ActionBase): n = Node().initWithDict(node) if not self.found.has_key(n.id): self.found[n.id] = n - self.table.insertNode(n) self.schedule() def schedule(self): @@ -80,7 +78,7 @@ class FindNode(ActionBase): df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - if self.outstanding >= N: + if self.outstanding >= const.CONCURRENT_REQS: break assert(self.outstanding) >=0 if self.outstanding == 0: @@ -111,6 +109,9 @@ class FindNode(ActionBase): df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 + if self.outstanding >= const.CONCURRENT_REQS: + break + if self.outstanding == 0: self.callback(nodes) @@ -119,7 +120,9 @@ GET_VALUE_TIMEOUT = 15 class GetValue(FindNode): """ get value task """ def handleGotNodes(self, args): + args, conn = args l, sender = args + sender['host'] = conn['host'] sender = Node().initWithDict(sender) self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): @@ -134,7 +137,6 @@ class GetValue(FindNode): n = Node().initWithDict(node) if not self.found.has_key(n.id): self.found[n.id] = n - self.table.insertNode(n) elif l.has_key('values'): def x(y, z=self.results): y = y.data @@ -163,7 +165,7 @@ class GetValue(FindNode): df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 - if self.outstanding >= N: + if self.outstanding >= const.CONCURRENT_REQS: break assert(self.outstanding) >=0 if self.outstanding == 0: @@ -187,6 +189,9 @@ class GetValue(FindNode): df.addErrback(self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 + if self.outstanding >= const.CONCURRENT_REQS: + break + if self.outstanding == 0: reactor.callFromThread(self.callback, []) diff --git a/khashmir.py b/khashmir.py index 1722ad4..ca6c3d0 100644 --- a/khashmir.py +++ b/khashmir.py @@ -1,6 +1,8 @@ ## Copyright 2002 Andrew Loewenstern, All Rights Reserved from const import reactor +import const + import time from pickle import loads, dumps from sha import sha @@ -23,9 +25,6 @@ from bsddb3._db import DBNotFoundError from xmlrpclib import Binary -# don't ping unless it's been at least this many seconds since we've heard from a peer -MAX_PING_INTERVAL = 60 * 15 # fifteen minutes - # this is the main class! @@ -131,7 +130,7 @@ class Khashmir(xmlrpc.XMLRPC): self.findNode(key, _storeValueForKey) - def insertNode(self, n): + def insertNode(self, n, contacted=1): """ insert a node in our local table, pinging oldest contact in bucket, if necessary @@ -140,8 +139,8 @@ class Khashmir(xmlrpc.XMLRPC): a node into the table without it's peer-ID. That means of course the node passed into this method needs to be a properly formed Node object with a valid ID. """ - old = self.table.insertNode(n) - if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id: + old = self.table.insertNode(n, contacted=contacted) + if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: # the bucket is full, check to see if old node is still around and if so, replace it ## these are the callbacks used when we ping the oldest node in a bucket @@ -151,6 +150,8 @@ class Khashmir(xmlrpc.XMLRPC): def _notStaleNodeHandler(sender, old=old): """ called when we get a pong from the old node """ + sender, conn = sender + sender['host'] = conn['host'] sender = Node().initWithDict(sender) if sender.id == old.id: self.table.justSeenNode(old) @@ -166,11 +167,11 @@ class Khashmir(xmlrpc.XMLRPC): df = node.ping(self.node.senderDict()) ## these are the callbacks we use when we issue a PING def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table): + sender = sender[0] if id != 20 * ' ' and id != sender['id'].data: # whoah, got response from different peer than we were expecting pass else: - #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port) sender['host'] = host sender['port'] = port n = Node().initWithDict(sender) @@ -230,7 +231,7 @@ class Khashmir(xmlrpc.XMLRPC): ip = self.crequest.getClientIP() sender['host'] = ip n = Node().initWithDict(sender) - self.insertNode(n) + self.insertNode(n, contacted=0) return self.node.senderDict() def xmlrpc_find_node(self, target, sender): @@ -239,7 +240,7 @@ class Khashmir(xmlrpc.XMLRPC): ip = self.crequest.getClientIP() sender['host'] = ip n = Node().initWithDict(sender) - self.insertNode(n) + self.insertNode(n, contacted=0) return nodes, self.node.senderDict() def xmlrpc_store_value(self, key, value, sender): @@ -260,7 +261,7 @@ class Khashmir(xmlrpc.XMLRPC): ip = self.crequest.getClientIP() sender['host'] = ip n = Node().initWithDict(sender) - self.insertNode(n) + self.insertNode(n, contacted=0) return self.node.senderDict() def xmlrpc_find_value(self, key, sender): @@ -268,7 +269,7 @@ class Khashmir(xmlrpc.XMLRPC): key = key.data sender['host'] = ip n = Node().initWithDict(sender) - self.insertNode(n) + self.insertNode(n, contacted=0) l = self.retrieveValues(key) if len(l) > 0: @@ -384,7 +385,7 @@ def test_find_value(l, quiet=0): try: if(len(values) == 0): if not self.found: - print "find FAILED" + print "find NOT FOUND" else: print "find FOUND" sys.stdout.flush() @@ -402,14 +403,18 @@ def test_find_value(l, quiet=0): d.valueForKey(key, cb(fc).callback) fc.wait() -def test_one(port): +def test_one(host, port): import thread - k = Khashmir('localhost', port) + k = Khashmir(host, port) thread.start_new_thread(k.app.run, ()) return k if __name__ == "__main__": - l = test_build_net() + import sys + n = 8 + if len(sys.argv) > 1: + n = int(sys.argv[1]) + l = test_build_net(peers=n) time.sleep(3) print "finding nodes..." for i in range(10): -- 2.39.2