]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/krpc.py
Updated and added a lot of unittests.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
index a4fbacc0a1fb1a6ed3b1744d8900c32ee36594ef..47e7452f907e42baa755226e0b94db0abf7bd274 100644 (file)
@@ -127,6 +127,8 @@ class hostbroker(protocol.DatagramProtocol):
     
     @type server: L{khashmir.Khashmir}
     @ivar server: the main Khashmir program
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics logger to save transport info
     @type config: C{dictionary}
     @ivar config: the configuration parameters for the DHT
     @type connections: C{dictionary}
@@ -139,15 +141,18 @@ class hostbroker(protocol.DatagramProtocol):
     @ivar addr: the IP address and port of this node
     """
     
-    def __init__(self, server, config):
+    def __init__(self, server, stats, config):
         """Initialize the factory.
         
         @type server: L{khashmir.Khashmir}
         @param server: the main DHT program
+        @type stats: L{stats.StatsLogger}
+        @param stats: the statistics logger to save transport info
         @type config: C{dictionary}
         @param config: the configuration parameters for the DHT
         """
         self.server = server
+        self.stats = stats
         self.config = config
         # this should be changed to storage that drops old entries
         self.connections = {}
@@ -177,7 +182,7 @@ class hostbroker(protocol.DatagramProtocol):
         
         # Create a new protocol object if necessary
         if not self.connections.has_key(addr):
-            conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
+            conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
             self.connections[addr] = conn
         else:
             conn = self.connections[addr]
@@ -201,6 +206,8 @@ class KRPC:
     @ivar transport: the transport to use for the protocol
     @type factory: L{khashmir.Khashmir}
     @ivar factory: the main Khashmir program
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics logger to save transport info
     @type addr: (C{string}, C{int})
     @ivar addr: the IP address and port of the source node
     @type noisy: C{boolean}
@@ -213,13 +220,15 @@ class KRPC:
     @ivar stopped: whether the protocol has been stopped
     """
     
-    def __init__(self, addr, server, transport, spew = False):
+    def __init__(self, addr, server, stats, transport, spew = False):
         """Initialize the protocol.
         
         @type addr: (C{string}, C{int})
         @param addr: the IP address and port of the source node
         @type server: L{khashmir.Khashmir}
         @param server: the main Khashmir program
+        @type stats: L{stats.StatsLogger}
+        @param stats: the statistics logger to save transport info
         @param transport: the transport to use for the protocol
         @type spew: C{boolean}
         @param spew: whether to log additional details of the protocol
@@ -227,6 +236,7 @@ class KRPC:
         """
         self.transport = transport
         self.factory = server
+        self.stats = stats
         self.addr = addr
         self.noisy = spew
         self.tids = {}
@@ -240,6 +250,7 @@ class KRPC:
         @type addr: (C{string}, C{int})
         @param addr: source IP address and port of datagram.
         """
+        self.stats.receivedBytes(len(data))
         if self.stopped:
             if self.noisy:
                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
@@ -272,28 +283,34 @@ class KRPC:
             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
             msg[ARG]['_krpc_sender'] =  self.addr
             if f and callable(f):
+                self.stats.receivedAction(msg[REQ])
                 try:
                     ret = f(*(), **msg[ARG])
                 except KrpcError, e:
                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
                     log.err(e)
-                    olen = self._sendResponse(addr, msg[TID], ERR, [e[0], e[1]])
+                    self.stats.errorAction(msg[REQ])
+                    olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
+                                              [e[0], e[1]])
                 except TypeError, e:
                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
                     log.err(e)
-                    olen = self._sendResponse(addr, msg[TID], ERR,
+                    self.stats.errorAction(msg[REQ])
+                    olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
                 except Exception, e:
                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
                     log.err(e)
-                    olen = self._sendResponse(addr, msg[TID], ERR,
+                    self.stats.errorAction(msg[REQ])
+                    olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
                 else:
-                    olen = self._sendResponse(addr, msg[TID], RSP, ret)
+                    olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
             else:
                 # Request for unknown method
                 log.msg("ERROR: don't know about method %s" % msg[REQ])
-                olen = self._sendResponse(addr, msg[TID], ERR,
+                self.stats.receivedAction('unknown')
+                olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
             if self.noisy:
                 log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
@@ -304,7 +321,8 @@ class KRPC:
                 df = self.tids[msg[TID]]
                 #      callback
                 del(self.tids[msg[TID]])
-                df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
+                msg[RSP]['_krpc_sender'] = addr
+                df.callback(msg[RSP])
             else:
                 # no tid, this transaction timed out already...
                 if self.noisy:
@@ -331,9 +349,10 @@ class KRPC:
                 df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
                                      "Received an unknown message type: %r" % msg[TYP]))
                 
-    def _sendResponse(self, addr, tid, msgType, response):
+    def _sendResponse(self, request, addr, tid, msgType, response):
         """Helper function for sending responses to nodes.
-        
+
+        @param request: the name of the requested method
         @type addr: (C{string}, C{int})
         @param addr: source IP address and port of datagram.
         @param tid: the transaction ID of the request
@@ -388,14 +407,17 @@ class KRPC:
                 else:
                     # Too long a response, send an error
                     log.msg('Could not send response, too long: %d bytes' % len(out))
+                    self.stats.errorAction(request)
                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
                     out = bencode(msg)
 
         except Exception, e:
             # Unknown error, send an error message
+            self.stats.errorAction(request)
             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
             out = bencode(msg)
                     
+        self.stats.sentBytes(len(out))
         self.transport.write(out, addr)
         return len(out)
     
@@ -418,6 +440,10 @@ class KRPC:
         # Create the deferred and save it with the TID
         d = Deferred()
         self.tids[msg[TID]] = d
+        
+        # Save the conclusion of the action
+        d.addCallbacks(self.stats.responseAction, self.stats.failedAction,
+                       callbackArgs = (method, ), errbackArgs = (method, ))
 
         # Schedule a later timeout call
         def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
@@ -435,6 +461,10 @@ class KRPC:
                 later_call.cancel()
             return dict
         d.addBoth(dropTimeOut)
+
+        # Save some stats
+        self.stats.sentAction(method)
+        self.stats.sentBytes(len(data))
         
         self.transport.write(data, self.addr)
         return d
@@ -442,7 +472,8 @@ class KRPC:
     def stop(self):
         """Timeout all pending requests."""
         for df in self.tids.values():
-            df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response'))
+            df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+                                 'connection has been stopped while waiting for response'))
         self.tids = {}
         self.stopped = True
 
@@ -463,8 +494,9 @@ class Receiver(protocol.Factory):
         return {'values': ['1'*length]*num}
 
 def make(port):
+    from stats import StatsLogger
     af = Receiver()
-    a = hostbroker(af, {'SPEW': False})
+    a = hostbroker(af, StatsLogger(None, None, {}), {'SPEW': False})
     a.protocol = KRPC
     p = reactor.listenUDP(port, a)
     return af, a, p
@@ -501,8 +533,7 @@ class KRPCTests(unittest.TestCase):
 
     def gotMsg(self, dict, should_be):
         _krpc_sender = dict['_krpc_sender']
-        msg = dict['rsp']
-        self.failUnlessEqual(msg['msg'], should_be)
+        self.failUnlessEqual(dict['msg'], should_be)
 
     def testManyEcho(self):
         for i in xrange(100):
@@ -539,16 +570,20 @@ class KRPCTests(unittest.TestCase):
 
     def testUnknownMeth(self):
         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
+        df = self.failUnlessFailure(df, KrpcError)
         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
         return df
 
     def testMalformedRequest(self):
         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
-        df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST)
+        df = self.failUnlessFailure(df, KrpcError)
+        df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
         return df
 
-    def gotErr(self, err, should_be):
-        self.failUnlessEqual(err.value[0], should_be)
+    def gotErr(self, value, should_be, *errorTypes):
+        self.failUnlessEqual(value[0], should_be)
+        if errorTypes:
+            self.flushLoggedErrors(*errorTypes)
         
     def testLongPackets(self):
         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})