]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/krpc.py
Improve the creation of nodes and move all to the main khashmir class.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / krpc.py
index 8a6009250e619f88606629d21c1d142ce61bad31..2c1f7e83b972c1a4e8fb3cdfcacb64de4b87f2fe 100644 (file)
@@ -7,8 +7,8 @@ import sys
 from traceback import format_exception
 
 from twisted.internet.defer import Deferred
-from twisted.internet import protocol
-from twisted.internet import reactor
+from twisted.internet import protocol, reactor
+from twisted.trial import unittest
 
 KRPC_TIMEOUT = 20
 
@@ -25,6 +25,9 @@ TYP = 'y'
 ARG = 'a'
 ERR = 'e'
 
+class ProtocolError(Exception):
+    pass
+
 class hostbroker(protocol.DatagramProtocol):       
     def __init__(self, server):
         self.server = server
@@ -53,6 +56,11 @@ class hostbroker(protocol.DatagramProtocol):
         protocol.DatagramProtocol.makeConnection(self, transport)
         tup = transport.getHost()
         self.addr = (tup.host, tup.port)
+        
+    def stopProtocol(self):
+        for conn in self.connections.values():
+            conn.stop()
+        protocol.DatagramProtocol.stopProtocol(self)
 
 ## connection
 class KRPC:
@@ -63,8 +71,12 @@ class KRPC:
         self.addr = addr
         self.tids = {}
         self.mtid = 0
+        self.stopped = False
 
     def datagramReceived(self, str, addr):
+        if self.stopped:
+            if self.noisy:
+                print "stopped, dropping message from", addr, str
         # bdecode
         try:
             msg = bdecode(str)
@@ -72,8 +84,8 @@ class KRPC:
             if self.noisy:
                 print "response decode error: " + `e`
         else:
-            #if self.noisy:
-            #    print msg
+            if self.noisy:
+                print msg
             # look at msg type
             if msg[TYP]  == REQ:
                 ilen = len(str)
@@ -140,6 +152,8 @@ class KRPC:
                 del(self.tids[msg[TID]])
                 
     def sendRequest(self, method, args):
+        if self.stopped:
+            raise ProtocolError, "connection has been stopped"
         # make message
         # send it
         msg = {TID : chr(self.mtid), TYP : REQ,  REQ : method, ARG : args}
@@ -147,13 +161,118 @@ class KRPC:
         str = bencode(msg)
         d = Deferred()
         self.tids[msg[TID]] = d
-        def timeOut(tids = self.tids, id = msg[TID]):
+        def timeOut(tids = self.tids, id = msg[TID], msg = msg):
             if tids.has_key(id):
                 df = tids[id]
                 del(tids[id])
                 print ">>>>>> KRPC_ERROR_TIMEOUT"
-                df.errback(KRPC_ERROR_TIMEOUT)
-        reactor.callLater(KRPC_TIMEOUT, timeOut)
+                df.errback(ProtocolError('timeout waiting for %r' % msg))
+        later = reactor.callLater(KRPC_TIMEOUT, timeOut)
+        def dropTimeOut(dict, later_call = later):
+            if later_call.active():
+                later_call.cancel()
+            return dict
+        d.addBoth(dropTimeOut)
         self.transport.write(str, self.addr)
         return d
+    
+    def stop(self):
+        """Timeout all pending requests."""
+        for df in self.tids.values():
+            df.errback(ProtocolError('connection has been closed'))
+        self.tids = {}
+        self.stopped = True
  
+def connectionForAddr(host, port):
+    return host
+    
+class Receiver(protocol.Factory):
+    protocol = KRPC
+    def __init__(self):
+        self.buf = []
+    def krpc_store(self, msg, _krpc_sender):
+        self.buf += [msg]
+    def krpc_echo(self, msg, _krpc_sender):
+        return msg
+
+def make(port):
+    af = Receiver()
+    a = hostbroker(af)
+    a.protocol = KRPC
+    p = reactor.listenUDP(port, a)
+    return af, a, p
+    
+class KRPCTests(unittest.TestCase):
+    def setUp(self):
+        KRPC.noisy = 0
+        self.af, self.a, self.ap = make(1180)
+        self.bf, self.b, self.bp = make(1181)
+
+    def tearDown(self):
+        self.ap.stopListening()
+        self.bp.stopListening()
+
+    def bufEquals(self, result, value):
+        self.assertEqual(self.bf.buf, value)
+
+    def testSimpleMessage(self):
+        d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
+        d.addCallback(self.bufEquals, ["This is a test."])
+        return d
+
+    def testMessageBlast(self):
+        for i in range(100):
+            d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
+        d.addCallback(self.bufEquals, ["This is a test."] * 100)
+        return d
+
+    def testEcho(self):
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+        df.addCallback(self.gotMsg, "This is a test.")
+        return df
+
+    def gotMsg(self, dict, should_be):
+        _krpc_sender = dict['_krpc_sender']
+        msg = dict['rsp']
+        self.assertEqual(msg, should_be)
+
+    def testManyEcho(self):
+        for i in xrange(100):
+            df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+            df.addCallback(self.gotMsg, "This is a test.")
+        return df
+
+    def testMultiEcho(self):
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+        df.addCallback(self.gotMsg, "This is a test.")
+
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
+        df.addCallback(self.gotMsg, "This is another test.")
+
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
+        df.addCallback(self.gotMsg, "This is yet another test.")
+        
+        return df
+
+    def testEchoReset(self):
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
+        df.addCallback(self.gotMsg, "This is a test.")
+
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
+        df.addCallback(self.gotMsg, "This is another test.")
+        df.addCallback(self.echoReset)
+        return df
+    
+    def echoReset(self, dict):
+        del(self.a.connections[('127.0.0.1', 1181)])
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
+        df.addCallback(self.gotMsg, "This is yet another test.")
+        return df
+
+    def testUnknownMeth(self):
+        df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
+        df.addErrback(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
+        return df
+
+    def gotErr(self, err, should_be):
+        self.assertEqual(err.value, should_be)