]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/krpc.py
Make sure full timeouts occur even if a resend timeout has not.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
index 97b8908c2677bdad85653d3f42892427b3e0bcc5..b151a5527f15b6f210f14e83b565a790cfe750b7 100644 (file)
@@ -1,9 +1,6 @@
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
 
 """The KRPC communication protocol implementation.
 
 
 """The KRPC communication protocol implementation.
 
-@var KRPC_TIMEOUT: the number of seconds after which requests timeout
 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
     UDP packet without fragmentation
 
 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
     UDP packet without fragmentation
 
 """
 
 from bencode import bencode, bdecode
 """
 
 from bencode import bencode, bdecode
-from time import asctime
+from datetime import datetime, timedelta
 from math import ceil
 
 from math import ceil
 
-from twisted.internet.defer import Deferred
+from twisted.internet import defer
 from twisted.internet import protocol, reactor
 from twisted.python import log
 from twisted.trial import unittest
 
 from khash import newID
 
 from twisted.internet import protocol, reactor
 from twisted.python import log
 from twisted.trial import unittest
 
 from khash import newID
 
-KRPC_TIMEOUT = 20
 UDP_PACKET_LIMIT = 1472
 
 # Remote node errors
 UDP_PACKET_LIMIT = 1472
 
 # Remote node errors
@@ -182,7 +178,7 @@ class hostbroker(protocol.DatagramProtocol):
         
         # Create a new protocol object if necessary
         if not self.connections.has_key(addr):
         
         # Create a new protocol object if necessary
         if not self.connections.has_key(addr):
-            conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
+            conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
             self.connections[addr] = conn
         else:
             conn = self.connections[addr]
             self.connections[addr] = conn
         else:
             conn = self.connections[addr]
@@ -200,45 +196,98 @@ class hostbroker(protocol.DatagramProtocol):
             conn.stop()
         protocol.DatagramProtocol.stopProtocol(self)
 
             conn.stop()
         protocol.DatagramProtocol.stopProtocol(self)
 
-class KrpcRequest(Deferred):
+class KrpcRequest(defer.Deferred):
     """An outstanding request to a remote node.
     
     """An outstanding request to a remote node.
     
-    
+    @type protocol: L{KRPC}
+    @ivar protocol: the protocol to send data with
+    @ivar tid: the transaction ID of the request
+    @type method: C{string}
+    @ivar method: the name of the method to call on the remote node
+    @type data: C{string}
+    @ivar data: the message to send to the remote node
+    @type config: C{dictionary}
+    @ivar config: the configuration parameters for the DHT
+    @type delay: C{int}
+    @ivar delay: the last timeout delay sent
+    @type start: C{datetime}
+    @ivar start: the time to request was started at
+    @type laterNextTimeout: L{twisted.internet.interfaces.IDelayedCall}
+    @ivar laterNextTimeout: the pending call to timeout the last sent request
+    @type laterFinalTimeout: L{twisted.internet.interfaces.IDelayedCall}
+    @ivar laterFinalTimeout: the pending call to timeout the entire request
     """
     
     """
     
-    def __init__(self, protocol, newTID, method, data, initDelay):
-        Deferred.__init__(self)
+    def __init__(self, protocol, newTID, method, data, config):
+        """Initialize the request, and send it out.
+        
+        @type protocol: L{KRPC}
+        @param protocol: the protocol to send data with
+        @param newTID: the transaction ID of the request
+        @type method: C{string}
+        @param method: the name of the method to call on the remote node
+        @type data: C{string}
+        @param data: the message to send to the remote node
+        @type config: C{dictionary}
+        @param config: the configuration parameters for the DHT
+        """
+        defer.Deferred.__init__(self)
         self.protocol = protocol
         self.tid = newTID
         self.method = method
         self.data = data
         self.protocol = protocol
         self.tid = newTID
         self.method = method
         self.data = data
-        self.delay = initDelay
-        self.later = None
+        self.config = config
+        self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
+        self.start = datetime.now()
+        self.laterNextTimeout = None
+        self.laterFinalTimeout = reactor.callLater(self.config.get('KRPC_TIMEOUT', 9), self.finalTimeout)
+        reactor.callLater(0, self.send)
         
     def send(self):
         
     def send(self):
-        assert not self.later, 'There is already a pending request'
-        self.later = reactor.callLater(self.delay, self.timeOut)
-        self.delay *= 2
-        self.protocol.sendData(self.method, self.data)
-
-    def timeOut(self):
-        """Call the deferred's errback if a timeout occurs."""
-        self.later = None
+        """Send the request to the remote node."""
+        assert not self.laterNextTimeout, 'There is already a pending request'
+        self.laterNextTimeout = reactor.callLater(self.delay, self.nextTimeout)
+        try:
+            self.protocol.sendData(self.method, self.data)
+        except:
+            log.err()
+
+    def nextTimeout(self):
+        """Check for a unrecoverable timeout, otherwise resend."""
+        self.laterNextTimeout = None
+        if datetime.now() - self.start > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 9)):
+            self.finalTimeout()
+        elif self.protocol.stopped:
+            log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
+        else:
+            self.delay *= 2
+            log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
+            reactor.callLater(0, self.send)
+        
+    def finalTimeout(self):
+        """Timeout the request after an unrecoverable timeout."""
+        self.dropTimeOut()
+        delay = datetime.now() - self.start
+        log.msg('%r timed out after %0.2f sec' %
+                (self.tid, delay.seconds + delay.microseconds/1000000.0))
         self.protocol.timeOut(self.tid, self.method)
         
     def callback(self, resp):
         self.dropTimeOut()
         self.protocol.timeOut(self.tid, self.method)
         
     def callback(self, resp):
         self.dropTimeOut()
-        Deferred.callback(self, resp)
+        defer.Deferred.callback(self, resp)
         
     def errback(self, resp):
         self.dropTimeOut()
         
     def errback(self, resp):
         self.dropTimeOut()
-        Deferred.errback(self, resp)
+        defer.Deferred.errback(self, resp)
         
     def dropTimeOut(self):
         """Cancel the timeout call when a response is received."""
         
     def dropTimeOut(self):
         """Cancel the timeout call when a response is received."""
-        if self.later and self.later.active():
-            self.later.cancel()
-            self.later = None
+        if self.laterFinalTimeout and self.laterFinalTimeout.active():
+            self.laterFinalTimeout.cancel()
+        self.laterFinalTimeout = None
+        if self.laterNextTimeout and self.laterNextTimeout.active():
+            self.laterNextTimeout.cancel()
+        self.laterNextTimeout = None
 
 class KRPC:
     """The KRPC protocol implementation.
 
 class KRPC:
     """The KRPC protocol implementation.
@@ -250,8 +299,8 @@ class KRPC:
     @ivar stats: the statistics logger to save transport info
     @type addr: (C{string}, C{int})
     @ivar addr: the IP address and port of the source node
     @ivar stats: the statistics logger to save transport info
     @type addr: (C{string}, C{int})
     @ivar addr: the IP address and port of the source node
-    @type noisy: C{boolean}
-    @ivar noisy: whether to log additional details of the protocol
+    @type config: C{dictionary}
+    @ivar config: the configuration parameters for the DHT
     @type tids: C{dictionary}
     @ivar tids: the transaction IDs outstanding for requests, keys are the
         transaction ID of the request, values are the deferreds to call with
     @type tids: C{dictionary}
     @ivar tids: the transaction IDs outstanding for requests, keys are the
         transaction ID of the request, values are the deferreds to call with
@@ -260,7 +309,7 @@ class KRPC:
     @ivar stopped: whether the protocol has been stopped
     """
     
     @ivar stopped: whether the protocol has been stopped
     """
     
-    def __init__(self, addr, server, stats, transport, spew = False):
+    def __init__(self, addr, server, stats, transport, config = {}):
         """Initialize the protocol.
         
         @type addr: (C{string}, C{int})
         """Initialize the protocol.
         
         @type addr: (C{string}, C{int})
@@ -270,15 +319,15 @@ class KRPC:
         @type stats: L{stats.StatsLogger}
         @param stats: the statistics logger to save transport info
         @param transport: the transport to use for the protocol
         @type stats: L{stats.StatsLogger}
         @param stats: the statistics logger to save transport info
         @param transport: the transport to use for the protocol
-        @type spew: C{boolean}
-        @param spew: whether to log additional details of the protocol
-            (optional, defaults to False)
+        @type config: C{dictionary}
+        @param config: the configuration parameters for the DHT
+            (optional, defaults to using defaults)
         """
         self.transport = transport
         self.factory = server
         self.stats = stats
         self.addr = addr
         """
         self.transport = transport
         self.factory = server
         self.stats = stats
         self.addr = addr
-        self.noisy = spew
+        self.config = config
         self.tids = {}
         self.stopped = False
 
         self.tids = {}
         self.stopped = False
 
@@ -292,14 +341,14 @@ class KRPC:
         """
         self.stats.receivedBytes(len(data))
         if self.stopped:
         """
         self.stats.receivedBytes(len(data))
         if self.stopped:
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
 
         # Bdecode the message
         try:
             msg = bdecode(data)
         except Exception, e:
                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
 
         # Bdecode the message
         try:
             msg = bdecode(data)
         except Exception, e:
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("krpc bdecode error: ")
                 log.err(e)
             return
                 log.msg("krpc bdecode error: ")
                 log.err(e)
             return
@@ -312,7 +361,7 @@ class KRPC:
             log.err(e)
             return
 
             log.err(e)
             return
 
-        if self.noisy:
+        if self.config.get('SPEW', False):
             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
 
         # Process it based on its type
             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
 
         # Process it based on its type
@@ -328,7 +377,8 @@ class KRPC:
                     ret = f(*(), **msg[ARG])
                 except KrpcError, e:
                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
                     ret = f(*(), **msg[ARG])
                 except KrpcError, e:
                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
-                    log.err(e)
+                    if e[0] != KRPC_ERROR_INVALID_TOKEN:
+                        log.err(e)
                     self.stats.errorAction(msg[REQ])
                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                               [e[0], e[1]])
                     self.stats.errorAction(msg[REQ])
                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                               [e[0], e[1]])
@@ -352,9 +402,8 @@ class KRPC:
                 self.stats.receivedAction('unknown')
                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
                 self.stats.receivedAction('unknown')
                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
-            if self.noisy:
-                log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
-                                                  ilen, msg[REQ], olen))
+
+            log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
         elif msg[TYP] == RSP:
             # Responses get processed by their TID's deferred
             if self.tids.has_key(msg[TID]):
         elif msg[TYP] == RSP:
             # Responses get processed by their TID's deferred
             if self.tids.has_key(msg[TID]):
@@ -364,9 +413,10 @@ class KRPC:
                 msg[RSP]['_krpc_sender'] = addr
                 req.callback(msg[RSP])
             else:
                 msg[RSP]['_krpc_sender'] = addr
                 req.callback(msg[RSP])
             else:
-                # no tid, this transaction timed out already...
-                if self.noisy:
-                    log.msg('timeout: %r' % msg[RSP]['id'])
+                # no tid, this transaction was finished already...
+                if self.config.get('SPEW', False):
+                    log.msg('received response from %r for completed request: %r' %
+                            (msg[RSP]['id'], msg[TID]))
         elif msg[TYP] == ERR:
             # Errors get processed by their TID's deferred's errback
             if self.tids.has_key(msg[TID]):
         elif msg[TYP] == ERR:
             # Errors get processed by their TID's deferred's errback
             if self.tids.has_key(msg[TID]):
@@ -375,12 +425,12 @@ class KRPC:
                 # callback
                 req.errback(KrpcError(*msg[ERR]))
             else:
                 # callback
                 req.errback(KrpcError(*msg[ERR]))
             else:
-                # day late and dollar short, just log it
-                log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
-                pass
+                # no tid, this transaction was finished already...
+                log.msg('received an error %r from %r for completed request: %r' %
+                        (msg[ERR], msg[RSP]['id'], msg[TID]))
         else:
             # Received an unknown message type
         else:
             # Received an unknown message type
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("unknown message type: %r" % msg)
             if msg[TID] in self.tids:
                 req = self.tids[msg[TID]]
                 log.msg("unknown message type: %r" % msg)
             if msg[TID] in self.tids:
                 req = self.tids[msg[TID]]
@@ -406,7 +456,7 @@ class KRPC:
             # Create the response message
             msg = {TID : tid, TYP : msgType, msgType : response}
     
             # Create the response message
             msg = {TID : tid, TYP : msgType, msgType : response}
     
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
     
             out = bencode(msg)
                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
     
             out = bencode(msg)
@@ -469,35 +519,46 @@ class KRPC:
         @param args: the arguments to send to the remote node's method
         """
         if self.stopped:
         @param args: the arguments to send to the remote node's method
         """
         if self.stopped:
-            raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
+            return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+                                        "cannot send, connection has been stopped"))
 
         # Create the request message
         newTID = newID()
         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
 
         # Create the request message
         newTID = newID()
         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
-        if self.noisy:
+        if self.config.get('SPEW', False):
             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
         data = bencode(msg)
         
             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
         data = bencode(msg)
         
-        # Create the deferred and save it with the TID
-        req = KrpcRequest(self, newTID, method, data, 10)
+        # Create the request object and save it with the TID
+        req = KrpcRequest(self, newTID, method, data, self.config)
         self.tids[newTID] = req
         
         # Save the conclusion of the action
         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
         self.tids[newTID] = req
         
         # Save the conclusion of the action
         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
-                         callbackArgs = (method, ), errbackArgs = (method, ))
+                         callbackArgs = (method, datetime.now()),
+                         errbackArgs = (method, datetime.now()))
 
 
-        req.send()
         return req
     
     def sendData(self, method, data):
         return req
     
     def sendData(self, method, data):
-        # Save some stats
-        self.stats.sentAction(method)
-        self.stats.sentBytes(len(data))
+        """Write a request to the transport and save the stats.
         
         
+        @type method: C{string}
+        @param method: the name of the method to call on the remote node
+        @type data: C{string}
+        @param data: the message to send to the remote node
+        """
         self.transport.write(data, self.addr)
         self.transport.write(data, self.addr)
+        self.stats.sentAction(method)
+        self.stats.sentBytes(len(data))
         
     def timeOut(self, badTID, method):
         
     def timeOut(self, badTID, method):
-        """Call the deferred's errback if a timeout occurs."""
+        """Call the deferred's errback if a timeout occurs.
+        
+        @param badTID: the transaction ID of the request
+        @type method: C{string}
+        @param method: the name of the method that timed out on the remote node
+        """
         if badTID in self.tids:
             req = self.tids[badTID]
             del(self.tids[badTID])
         if badTID in self.tids:
             req = self.tids[badTID]
             del(self.tids[badTID])
@@ -507,7 +568,7 @@ class KRPC:
             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
         
     def stop(self):
             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
         
     def stop(self):
-        """Timeout all pending requests."""
+        """Cancel all pending requests."""
         for req in self.tids.values():
             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
                                   'connection has been stopped while waiting for response'))
         for req in self.tids.values():
             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
                                   'connection has been stopped while waiting for response'))
@@ -533,7 +594,8 @@ class Receiver(protocol.Factory):
 def make(port):
     from stats import StatsLogger
     af = Receiver()
 def make(port):
     from stats import StatsLogger
     af = Receiver()
-    a = hostbroker(af, StatsLogger(None, None, {}), {'SPEW': False})
+    a = hostbroker(af, StatsLogger(None, None),
+                   {'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
     a.protocol = KRPC
     p = reactor.listenUDP(port, a)
     return af, a, p
     a.protocol = KRPC
     p = reactor.listenUDP(port, a)
     return af, a, p