fixed connection resets in airhook, resets are propogated to the protocol via resetConnection()
class FindNode(ActionBase):
""" find node action merits it's own class as it is a long running stateful process """
def handleGotNodes(self, dict):
+ _krpc_sender = dict['_krpc_sender']
+ dict = dict['rsp']
l = dict["nodes"]
sender = dict["sender"]
+ sender['port'] = _krpc_sender[1]
sender = Node().initWithDict(sender)
sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
class GetValue(FindNode):
""" get value task """
def handleGotNodes(self, dict):
+ _krpc_sender = dict['_krpc_sender']
+ dict = dict['rsp']
sender = dict["sender"]
+ sender['port'] = _krpc_sender[1]
sender = Node().initWithDict(sender)
sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
self.table.table.insertNode(sender)
return y
else:
return None
+ z = len(dict['values'])
v = filter(None, map(x, dict['values']))
if(len(v)):
reactor.callFromThread(self.callback, v)
s = "delete from kv where time < '%s';" % self.cut
c.execute(s)
reactor.callLater(const.KE_DELAY, self.doExpire)
-
\ No newline at end of file
self.lastTransmitSeq = -1 # last sequence we sent a packet
self.state = pending # one of pending, sent, confirmed
- self.outMsgs = [None] * 256 # outgoing messages (seq sent, message), index = message number
self.omsgq = [] # list of messages to go out
self.imsgq = [] # list of messages coming in
self.sendSession = None # send session/observed fields until obSeq > sendSession
self.response = 0 # if we know we have a response now (like resending missed packets)
self.noisy = 0
- self.scheduled = 0 # a sendNext is scheduled, don't schedule another
- self.resetMessages()
+ self.resetConnection()
- def resetMessages(self):
+ def resetConnection(self):
self.weMissed = []
+ self.outMsgs = [None] * 256 # outgoing messages (seq sent, message), index = message number
self.inMsg = 0 # next incoming message number
self.outMsgNums = [0] * 256 # outgoing message numbers i = outNum % 256
self.next = 0 # next outgoing message number
+ self.scheduled = 0 # a sendNext is scheduled, don't schedule another
def datagramReceived(self, datagram):
if not datagram:
self.observed = p.session
elif self.observed != p.session:
self.state = pending
- self.resetMessages()
+ self.resetConnection()
self.inSeq = p.seq
elif self.state == confirmed:
if p.session != None or p.observed != None :
if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID):
self.state = pending
self.observed = p.session
- self.resetMessages()
+ self.resetConnection()
self.inSeq = p.seq
+ if hasattr(self.protocol, "resetConnection") and callable(self.protocol.resetConnection):
+ self.protocol.resetConnection()
# check to make sure sequence number isn't out of order
if (p.seq - self.inSeq) % 2**16 >= 256:
"""
def __init__(self):
AirhookConnection.__init__(self)
+ self.resetStream()
+
+ def resetStream(self):
self.oseq = 0
self.iseq = 0
self.q = []
+ def resetConnection(self):
+ AirhookConnection.resetConnection(self)
+ self.resetStream()
+
def dataCameIn(self):
# put 'em together
for msg in self.imsgq:
from twisted.internet import main
main.installReactor(reactor)
+
try:
import twisted.names.client
reactor.installResolver(twisted.names.client.theResolver)
except IOError:
print "no resolv.conf!"
-
+
# magic id to use before we know a peer's id
NULL_ID = 20 * '\0'
class KNode(Node):
def checkSender(self, dict):
try:
- senderid = dict['sender']['id']
+ senderid = dict['rsp']['sender']['id']
except KeyError:
+ print ">>>> No peer id in response"
raise Exception, "No peer id in response."
else:
if self.id != NULL_ID and senderid != self.id:
+ print "Got response from different node than expected."
raise Exception, "Got response from different node than expected."
return dict
def __init__(self):
self.tids = {}
+ def resetConnection(self):
+ self.brokenPeer = 0
+ self._readerState = basic.LENGTH
+ self._readerLength = 0
+
def stringReceived(self, str):
# bdecode
try:
msg = bdecode(str)
except Exception, e:
- print "response decode error: " + `e`
+ if self.naisy:
+ print "response decode error: " + `e`
self.d.errback()
else:
# look at msg type
# make response
str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
else:
- str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : []})
+ str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
# send response
olen = len(str)
self.sendString(str)
else:
+ if self.noisy:
+ print "don't know about method %s" % msg['req']
# unknown method
str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
olen = len(str)
if self.tids.has_key(msg['tid']):
df = self.tids[msg['tid']]
# callback
- df.callback(msg['rsp'])
del(self.tids[msg['tid']])
- # no tid, perhaps this transaction timed out already...
+ df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr})
+ # no tid, this transaction timed out already...
elif msg['typ'] == 'err':
# if error
# lookup tid
# send it
msg = {'tid' : hash.newID(), 'typ' : 'req', 'req' : method, 'arg' : args}
str = bencode(msg)
- self.sendString(str)
d = Deferred()
self.tids[msg['tid']] = d
-
def timeOut(tids = self.tids, id = msg['tid']):
if tids.has_key(id):
df = tids[id]
del(tids[id])
+ print ">>>>>> KRPC_ERROR_TIMEOUT"
df.errback(KRPC_ERROR_TIMEOUT)
reactor.callLater(KRPC_TIMEOUT, timeOut)
+ self.sendString(str)
return d
-
\ No newline at end of file
+
self.assertEqual(a.outMsgNums[(a.outSeq-1) % 256], 254)
def testConnectionReset(self):
+ self.testTwoWayBlast()
+ self.b.protocol.q = []
a = self.a
b = self.b
msg = swap(a, noisy=self.noisy)
self.assertEqual(len(b.protocol.q), 2)
self.assertEqual(b.protocol.q[1], "TESTING2")
+ def testRecipientReset(self):
+ self.testTwoWayBlast()
+ self.b.protocol.q = []
+ self.noisy = 1
+ a = self.a
+ b = self.b
+ msg = swap(a, noisy=self.noisy)
+ b.datagramReceived(msg)
+
+ msg = swap(b, noisy=self.noisy)
+ a.datagramReceived(msg)
+
+ a.omsgq.append("TESTING")
+ msg = swap(a, noisy=self.noisy)
+ b.datagramReceived(msg)
+
+ msg = swap(b, noisy=self.noisy)
+ a.datagramReceived(msg)
+
+ self.assertEqual(b.protocol.q[0], "TESTING")
+ self.assertEqual(b.state, confirmed)
+
+ self.b = AirhookConnection()
+ self.b.makeConnection(DummyTransport())
+ self.b.protocol = Receiver()
+ self.b.addr = ('127.0.0.1', 4444)
+ b = self.b
+
+ msg = swap(a, noisy=self.noisy)
+ b.datagramReceived(msg)
+
+ msg = swap(b, noisy=self.noisy)
+ a.datagramReceived(msg)
+
+ a.omsgq.append("TESTING2")
+ self.assertEqual(len(b.protocol.q), 0)
+ msg = swap(a, noisy=self.noisy)
+ b.datagramReceived(msg)
+
+ msg = swap(b, noisy=self.noisy)
+ a.datagramReceived(msg)
+
+ msg = swap(a, noisy=self.noisy)
+ b.datagramReceived(msg)
+
+ msg = swap(b, noisy=self.noisy)
+ a.datagramReceived(msg)
+
+ self.assertEqual(len(b.protocol.q), 1)
+ self.assertEqual(b.protocol.q[0], "TESTING2")
+
class StreamTests(unittest.TestCase):
def setUp(self):
reactor.iterate()
self.assertEqual(self.ac.protocol.buf, msg)
-
\ No newline at end of file
+
self.b = listenAirhookStream(4051, self.bf)
def testSimpleMessage(self):
- self.noisy = 1
+ self.noisy = 0
self.a.connectionForAddr(('127.0.0.1', 4051)).protocol.sendRequest('store', {'msg' : "This is a test."})
reactor.iterate()
reactor.iterate()
reactor.iterate()
self.assertEqual(self.bf.buf, ["This is a test."])
+class BlastTest(TestCase):
+ def setUp(self):
+ self.noisy = 0
+
+ self.af = Receiver()
+ self.bf = Receiver()
+ self.a = listenAirhookStream(4060, self.af)
+ self.b = listenAirhookStream(4061, self.bf)
+
+ def testMessageBlast(self):
+ self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."})
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.bf.buf, ["This is a test."])
+ self.bf.buf = []
+
+ for i in range(100):
+ self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."})
+ reactor.iterate()
+ #self.bf.buf = []
+ self.assertEqual(self.bf.buf, ["This is a test."] * 100)
+
class EchoTest(TestCase):
def setUp(self):
self.noisy = 0
reactor.iterate()
self.assertEqual(self.msg, "This is a test.")
- def gotMsg(self, msg):
+ def gotMsg(self, dict):
+ _krpc_sender = dict['_krpc_sender']
+ msg = dict['rsp']
self.msg = msg
class MultiEchoTest(TestCase):
reactor.iterate()
self.assertEqual(self.msg, "This is yet another test.")
- def gotMsg(self, msg):
+ def gotMsg(self, dict):
+ _krpc_sender = dict['_krpc_sender']
+ msg = dict['rsp']
+ self.msg = msg
+
+class EchoResetTest(TestCase):
+ def setUp(self):
+ self.noisy = 0
+ self.msg = None
+
+ self.af = Receiver()
+ self.bf = Receiver()
+ self.a = listenAirhookStream(4078, self.af)
+ self.b = listenAirhookStream(4079, self.bf)
+
+ def testEchoReset(self):
+ self.noisy = 1
+ df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.msg, "This is a test.")
+
+ df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is another test."})
+ df.addCallback(self.gotMsg)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.msg, "This is another test.")
+
+ df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is yet another test."})
+ df.addCallback(self.gotMsg)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.msg, "This is yet another test.")
+
+ def gotMsg(self, dict):
+ _krpc_sender = dict['_krpc_sender']
+ msg = dict['rsp']
self.msg = msg
class UnknownMethErrTest(TestCase):
def gotErr(self, err):
self.err = err.value
+