def makeMsgFailed(self, node):
def defaultGotNodes(err, self=self, node=node):
- print ">>> find failed"
+ print ">>> find failed %s/%s" % (node.host, node.port)
self.table.table.nodeFailed(node)
self.outstanding = self.outstanding - 1
self.schedule()
self.schedule()
def storeFailed(self, t, node):
- print ">>> store failed"
+ print ">>> store failed %s/%s" % (node.host, node.port)
self.table.nodeFailed(node)
self.outstanding -= 1
if self.finished:
self.connectionForAddr(addr).datagramReceived(datagram)
def connectionForAddr(self, addr):
+ if addr == self.addr:
+ raise Exception
if not self.connections.has_key(addr):
conn = self.connection()
conn.protocol = self.factory.buildProtocol(addr)
else:
conn = self.connections[addr]
return conn
-# def makeConnection(self, transport):
-# protocol.DatagramProtocol.makeConnection(self, transport)
-# tup = transport.getHost()
-# self.addr = (tup[1], tup[2])
+ def makeConnection(self, transport):
+ protocol.DatagramProtocol.makeConnection(self, transport)
+ tup = transport.getHost()
+ self.addr = (tup[1], tup[2])
class AirhookPacket:
def __init__(self, msg):
class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConnectedTransport):
def __init__(self):
self.outSeq = 0 # highest sequence we have sent, can't be 255 more than obSeq
- self.obSeq = 0 # highest sequence confirmed by remote
- self.inSeq = 0 # last received sequence
self.observed = None # their session id
self.sessionID = long(rand(0, 2**32)) # our session id
self.lastTransmit = 0 # time we last sent a packet with messages
- self.lastReceieved = 0 # time we last received a packet with messages
+ self.lastReceived = 0 # time we last received a packet with messages
self.lastTransmitSeq = -1 # last sequence we sent a packet
self.state = pending # one of pending, sent, confirmed
- self.omsgq = [] # list of messages to go out
- self.imsgq = [] # list of messages coming in
self.sendSession = None # send session/observed fields until obSeq > sendSession
self.response = 0 # if we know we have a response now (like resending missed packets)
self.noisy = 0
self.outMsgNums = [0] * 256 # outgoing message numbers i = outNum % 256
self.next = 0 # next outgoing message number
self.scheduled = 0 # a sendNext is scheduled, don't schedule another
+ self.omsgq = [] # list of messages to go out
+ self.imsgq = [] # list of messages coming in
+ self.obSeq = 0 # highest sequence confirmed by remote
+ self.inSeq = 0 # last received sequence
def datagramReceived(self, datagram):
if not datagram:
if p.observed == self.sessionID:
self.observed = p.session
self.state = confirmed
+ self.response = 1
else:
- # bogus packet!
- return
+ self.observed = p.session
+ self.response = 1
elif p.session != None:
self.observed = p.session
self.response = 1
+ else:
+ self.response = 1
elif self.state == sent:
if p.observed != None and p.session != None:
if p.observed == self.sessionID:
self.observed = p.session
elif self.observed != p.session:
self.state = pending
+ self.observed = p.session
self.resetConnection()
+ self.response = 1
+ if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
+ self.protocol.resetConnection()
self.inSeq = p.seq
+ self.schedule()
+ return
+ elif p.session == None and p.observed == None:
+ self.response = 1
+ self.schedule()
+
elif self.state == confirmed:
if p.session != None or p.observed != None :
if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID):
self.inSeq = p.seq
if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
self.protocol.resetConnection()
-
+ self.schedule()
+ return
# check to make sure sequence number isn't out of order
if (p.seq - self.inSeq) % 2**16 >= 256:
return
def resetConnection(self):
AirhookConnection.resetConnection(self)
self.resetStream()
-
+
+ def loseConnection(self):
+ pass
+
def dataCameIn(self):
# put 'em together
for msg in self.imsgq:
NULL_ID = 20 * '\0'
# Kademlia "K" constant, this should be an even number
-K = 20
+K = 8
# SHA1 is 160 bits long
HASH_LENGTH = 160
from sha import sha
import whrandom
-random = open('/dev/urandom', 'r') # sucks for windoze
def intify(hstr):
"""20 bit hash, big-endian -> long python integer"""
def newID():
"""returns a new pseudorandom globally unique ID string"""
h = sha()
- h.update(random.read(20))
+ for i in range(20):
+ h.update(chr(whrandom.randint(0,255)))
return h.digest()
def newIDInRange(min, max):
from hash import newID
-def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
+def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'):
import thread
l = []
for i in xrange(peers):
- a = Khashmir('127.0.0.1', startport + i, db = dbprefix+`i`)
+ a = Khashmir(host, startport + i, db = dbprefix+`i`)
l.append(a)
thread.start_new_thread(l[0].app.run, ())
for peer in l[1:]:
self.found = 0
self.port = port
def callback(self, values):
- try:
if(len(values) == 0):
if not self.found:
- print "find %s NOT FOUND" % self.port
+ print "find %s NOT FOUND" % self.port
else:
print "find %s FOUND" % self.port
+ self.flag.set()
else:
if self.val in values:
self.found = 1
- finally:
- self.flag.set()
b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
fa.wait()
import hash
-KRPC_TIMEOUT = 30
+KRPC_TIMEOUT = 60
KRPC_ERROR = 1
KRPC_ERROR_METHOD_UNKNOWN = 2
def __init__(self):
self.tids = {}
+
+ def dataRecieved(self, data):
+ basic.NetstringReceiver(self, data)
+ if self.brokenPeer:
+ self.resetConnection()
+
def resetConnection(self):
self.brokenPeer = 0
self._readerState = basic.LENGTH
ret = apply(f, (), msg['arg'])
except Exception, e:
## send error
- str = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
- olen = len(str)
- self.sendString(str)
+ out = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
+ olen = len(out)
+ self.sendString(out)
else:
if ret:
# make response
- str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
+ out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
else:
- str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
+ out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
# send response
- olen = len(str)
- self.sendString(str)
+ olen = len(out)
+ self.sendString(out)
else:
if self.noisy:
print "don't know about method %s" % msg['req']
# unknown method
- str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
- olen = len(str)
- self.sendString(str)
+ out = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
+ olen = len(out)
+ self.sendString(out)
if self.noisy:
print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.transport.addr, self.factory.node.port,
ilen, msg['req'], olen)
# callback
del(self.tids[msg['tid']])
df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr})
- # no tid, this transaction timed out already...
+ else:
+ print 'timeout ' + `msg['rsp']['sender']`
+ # no tid, this transaction timed out already...
elif msg['typ'] == 'err':
# if error
# lookup tid
df.errback(msg['err'])
del(self.tids[msg['tid']])
else:
+ print "unknown message type " + `msg`
# unknown message type
df = self.tids[msg['tid']]
# callback
import actions
import btemplate
import test_airhook
-
-tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'actions', 'ktable', 'test_airhook'])
+import test_krpc
+tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'actions', 'ktable', 'test_airhook', 'test_krpc'])
result = unittest.TextTestRunner().run(tests)
self.b = listenAirhookStream(4043, self.bf)
def testEcho(self):
- self.noisy = 1
df = self.a.connectionForAddr(('127.0.0.1', 4043)).protocol.sendRequest('echo', {'msg' : "This is a test."})
df.addCallback(self.gotMsg)
reactor.iterate()
msg = dict['rsp']
self.msg = msg
+class ManyEchoTest(TestCase):
+ def setUp(self):
+ self.noisy = 0
+ self.msg = None
+
+ self.af = Receiver()
+ self.bf = Receiver()
+ self.a = listenAirhookStream(4588, self.af)
+ self.b = listenAirhookStream(4589, self.bf)
+
+ def testManyEcho(self):
+ df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.msg, "This is a test.")
+ for i in xrange(100):
+ self.msg = None
+ df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.msg, "This is a test.")
+
+ def gotMsg(self, dict):
+ _krpc_sender = dict['_krpc_sender']
+ msg = dict['rsp']
+ self.msg = msg
+
class MultiEchoTest(TestCase):
def setUp(self):
self.noisy = 0
reactor.iterate()
self.assertEqual(self.msg, "This is another test.")
+ del(self.a.connections[('127.0.0.1', 4079)])
df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is yet another test."})
df.addCallback(self.gotMsg)
reactor.iterate()
reactor.iterate()
self.assertEqual(self.msg, "This is yet another test.")
+ def testLotsofEchoReset(self):
+ for i in range(100):
+ self.testEchoReset()
def gotMsg(self, dict):
_krpc_sender = dict['_krpc_sender']
msg = dict['rsp']