From bcfb1df73281c19eeeb87f31e498bb1b09780ba6 Mon Sep 17 00:00:00 2001 From: burris Date: Sat, 22 Feb 2003 06:55:43 +0000 Subject: [PATCH] *** empty log message *** --- actions.py | 4 ++-- airhook.py | 45 ++++++++++++++++++++++++++++++++------------- const.py | 2 +- hash.py | 4 ++-- khashmir.py | 10 ++++------ krpc.py | 33 +++++++++++++++++++++------------ test.py | 4 ++-- test_krpc.py | 38 +++++++++++++++++++++++++++++++++++++- 8 files changed, 101 insertions(+), 39 deletions(-) diff --git a/actions.py b/actions.py index 18b9cdd..f5d3a9f 100644 --- a/actions.py +++ b/actions.py @@ -88,7 +88,7 @@ class FindNode(ActionBase): def makeMsgFailed(self, node): def defaultGotNodes(err, self=self, node=node): - print ">>> find failed" + print ">>> find failed %s/%s" % (node.host, node.port) self.table.table.nodeFailed(node) self.outstanding = self.outstanding - 1 self.schedule() @@ -203,7 +203,7 @@ class StoreValue(ActionBase): self.schedule() def storeFailed(self, t, node): - print ">>> store failed" + print ">>> store failed %s/%s" % (node.host, node.port) self.table.nodeFailed(node) self.outstanding -= 1 if self.finished: diff --git a/airhook.py b/airhook.py index d3ae57a..dc17dca 100644 --- a/airhook.py +++ b/airhook.py @@ -41,6 +41,8 @@ class Airhook(protocol.DatagramProtocol): self.connectionForAddr(addr).datagramReceived(datagram) def connectionForAddr(self, addr): + if addr == self.addr: + raise Exception if not self.connections.has_key(addr): conn = self.connection() conn.protocol = self.factory.buildProtocol(addr) @@ -51,10 +53,10 @@ class Airhook(protocol.DatagramProtocol): else: conn = self.connections[addr] return conn -# def makeConnection(self, transport): -# protocol.DatagramProtocol.makeConnection(self, transport) -# tup = transport.getHost() -# self.addr = (tup[1], tup[2]) + def makeConnection(self, transport): + protocol.DatagramProtocol.makeConnection(self, transport) + tup = transport.getHost() + self.addr = (tup[1], tup[2]) class AirhookPacket: def __init__(self, msg): @@ -93,18 +95,14 @@ class AirhookPacket: class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConnectedTransport): def __init__(self): self.outSeq = 0 # highest sequence we have sent, can't be 255 more than obSeq - self.obSeq = 0 # highest sequence confirmed by remote - self.inSeq = 0 # last received sequence self.observed = None # their session id self.sessionID = long(rand(0, 2**32)) # our session id self.lastTransmit = 0 # time we last sent a packet with messages - self.lastReceieved = 0 # time we last received a packet with messages + self.lastReceived = 0 # time we last received a packet with messages self.lastTransmitSeq = -1 # last sequence we sent a packet self.state = pending # one of pending, sent, confirmed - self.omsgq = [] # list of messages to go out - self.imsgq = [] # list of messages coming in self.sendSession = None # send session/observed fields until obSeq > sendSession self.response = 0 # if we know we have a response now (like resending missed packets) self.noisy = 0 @@ -117,6 +115,10 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne self.outMsgNums = [0] * 256 # outgoing message numbers i = outNum % 256 self.next = 0 # next outgoing message number self.scheduled = 0 # a sendNext is scheduled, don't schedule another + self.omsgq = [] # list of messages to go out + self.imsgq = [] # list of messages coming in + self.obSeq = 0 # highest sequence confirmed by remote + self.inSeq = 0 # last received sequence def datagramReceived(self, datagram): if not datagram: @@ -132,12 +134,15 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne if p.observed == self.sessionID: self.observed = p.session self.state = confirmed + self.response = 1 else: - # bogus packet! - return + self.observed = p.session + self.response = 1 elif p.session != None: self.observed = p.session self.response = 1 + else: + self.response = 1 elif self.state == sent: if p.observed != None and p.session != None: if p.observed == self.sessionID: @@ -149,8 +154,18 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne self.observed = p.session elif self.observed != p.session: self.state = pending + self.observed = p.session self.resetConnection() + self.response = 1 + if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection): + self.protocol.resetConnection() self.inSeq = p.seq + self.schedule() + return + elif p.session == None and p.observed == None: + self.response = 1 + self.schedule() + elif self.state == confirmed: if p.session != None or p.observed != None : if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID): @@ -160,7 +175,8 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne self.inSeq = p.seq if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection): self.protocol.resetConnection() - + self.schedule() + return # check to make sure sequence number isn't out of order if (p.seq - self.inSeq) % 2**16 >= 256: return @@ -357,7 +373,10 @@ class StreamConnection(AirhookConnection): def resetConnection(self): AirhookConnection.resetConnection(self) self.resetStream() - + + def loseConnection(self): + pass + def dataCameIn(self): # put 'em together for msg in self.imsgq: diff --git a/const.py b/const.py index 797b2d0..1faab28 100644 --- a/const.py +++ b/const.py @@ -16,7 +16,7 @@ except IOError: NULL_ID = 20 * '\0' # Kademlia "K" constant, this should be an even number -K = 20 +K = 8 # SHA1 is 160 bits long HASH_LENGTH = 160 diff --git a/hash.py b/hash.py index 2a31246..6ac97cb 100644 --- a/hash.py +++ b/hash.py @@ -3,7 +3,6 @@ from sha import sha import whrandom -random = open('/dev/urandom', 'r') # sucks for windoze def intify(hstr): """20 bit hash, big-endian -> long python integer""" @@ -28,7 +27,8 @@ def distance(a, b): def newID(): """returns a new pseudorandom globally unique ID string""" h = sha() - h.update(random.read(20)) + for i in range(20): + h.update(chr(whrandom.randint(0,255))) return h.digest() def newIDInRange(min, max): diff --git a/khashmir.py b/khashmir.py index a738a97..b987856 100644 --- a/khashmir.py +++ b/khashmir.py @@ -353,11 +353,11 @@ from sha import sha from hash import newID -def test_net(peers=24, startport=2001, dbprefix='/tmp/test'): +def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'): import thread l = [] for i in xrange(peers): - a = Khashmir('127.0.0.1', startport + i, db = dbprefix+`i`) + a = Khashmir(host, startport + i, db = dbprefix+`i`) l.append(a) thread.start_new_thread(l[0].app.run, ()) for peer in l[1:]: @@ -479,17 +479,15 @@ def test_find_value(l, quiet=0): self.found = 0 self.port = port def callback(self, values): - try: if(len(values) == 0): if not self.found: - print "find %s NOT FOUND" % self.port + print "find %s NOT FOUND" % self.port else: print "find %s FOUND" % self.port + self.flag.set() else: if self.val in values: self.found = 1 - finally: - self.flag.set() b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0) fa.wait() diff --git a/krpc.py b/krpc.py index b2700e1..12fb4d3 100644 --- a/krpc.py +++ b/krpc.py @@ -7,7 +7,7 @@ import time import hash -KRPC_TIMEOUT = 30 +KRPC_TIMEOUT = 60 KRPC_ERROR = 1 KRPC_ERROR_METHOD_UNKNOWN = 2 @@ -19,6 +19,12 @@ class KRPC(basic.NetstringReceiver): def __init__(self): self.tids = {} + + def dataRecieved(self, data): + basic.NetstringReceiver(self, data) + if self.brokenPeer: + self.resetConnection() + def resetConnection(self): self.brokenPeer = 0 self._readerState = basic.LENGTH @@ -45,26 +51,26 @@ class KRPC(basic.NetstringReceiver): ret = apply(f, (), msg['arg']) except Exception, e: ## send error - str = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`}) - olen = len(str) - self.sendString(str) + out = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`}) + olen = len(out) + self.sendString(out) else: if ret: # make response - str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret}) + out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret}) else: - str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}}) + out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}}) # send response - olen = len(str) - self.sendString(str) + olen = len(out) + self.sendString(out) else: if self.noisy: print "don't know about method %s" % msg['req'] # unknown method - str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN}) - olen = len(str) - self.sendString(str) + out = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN}) + olen = len(out) + self.sendString(out) if self.noisy: print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.transport.addr, self.factory.node.port, ilen, msg['req'], olen) @@ -76,7 +82,9 @@ class KRPC(basic.NetstringReceiver): # callback del(self.tids[msg['tid']]) df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr}) - # no tid, this transaction timed out already... + else: + print 'timeout ' + `msg['rsp']['sender']` + # no tid, this transaction timed out already... elif msg['typ'] == 'err': # if error # lookup tid @@ -85,6 +93,7 @@ class KRPC(basic.NetstringReceiver): df.errback(msg['err']) del(self.tids[msg['tid']]) else: + print "unknown message type " + `msg` # unknown message type df = self.tids[msg['tid']] # callback diff --git a/test.py b/test.py index 0dd3f41..4cb499e 100644 --- a/test.py +++ b/test.py @@ -5,6 +5,6 @@ import hash, node, knode import actions import btemplate import test_airhook - -tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'actions', 'ktable', 'test_airhook']) +import test_krpc +tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'actions', 'ktable', 'test_airhook', 'test_krpc']) result = unittest.TextTestRunner().run(tests) diff --git a/test_krpc.py b/test_krpc.py index 40c6f8d..147b8bb 100644 --- a/test_krpc.py +++ b/test_krpc.py @@ -91,7 +91,6 @@ class EchoTest(TestCase): self.b = listenAirhookStream(4043, self.bf) def testEcho(self): - self.noisy = 1 df = self.a.connectionForAddr(('127.0.0.1', 4043)).protocol.sendRequest('echo', {'msg' : "This is a test."}) df.addCallback(self.gotMsg) reactor.iterate() @@ -105,6 +104,39 @@ class EchoTest(TestCase): msg = dict['rsp'] self.msg = msg +class ManyEchoTest(TestCase): + def setUp(self): + self.noisy = 0 + self.msg = None + + self.af = Receiver() + self.bf = Receiver() + self.a = listenAirhookStream(4588, self.af) + self.b = listenAirhookStream(4589, self.bf) + + def testManyEcho(self): + df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is a test.") + for i in xrange(100): + self.msg = None + df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is a test.") + + def gotMsg(self, dict): + _krpc_sender = dict['_krpc_sender'] + msg = dict['rsp'] + self.msg = msg + class MultiEchoTest(TestCase): def setUp(self): self.noisy = 0 @@ -174,6 +206,7 @@ class EchoResetTest(TestCase): reactor.iterate() self.assertEqual(self.msg, "This is another test.") + del(self.a.connections[('127.0.0.1', 4079)]) df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is yet another test."}) df.addCallback(self.gotMsg) reactor.iterate() @@ -182,6 +215,9 @@ class EchoResetTest(TestCase): reactor.iterate() self.assertEqual(self.msg, "This is yet another test.") + def testLotsofEchoReset(self): + for i in range(100): + self.testEchoReset() def gotMsg(self, dict): _krpc_sender = dict['_krpc_sender'] msg = dict['rsp'] -- 2.39.5