Make sure full timeouts occur even if a resend timeout has not.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
index 3a2e5b310005c5ddd1b0d0d853afae3a43b59884..b151a5527f15b6f210f14e83b565a790cfe750b7 100644 (file)
@@ -1,5 +1,3 @@
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
 
 """The KRPC communication protocol implementation.
 
@@ -214,8 +212,10 @@ class KrpcRequest(defer.Deferred):
     @ivar delay: the last timeout delay sent
     @type start: C{datetime}
     @ivar start: the time to request was started at
-    @type later: L{twisted.internet.interfaces.IDelayedCall}
-    @ivar later: the pending call to timeout the last sent request
+    @type laterNextTimeout: L{twisted.internet.interfaces.IDelayedCall}
+    @ivar laterNextTimeout: the pending call to timeout the last sent request
+    @type laterFinalTimeout: L{twisted.internet.interfaces.IDelayedCall}
+    @ivar laterFinalTimeout: the pending call to timeout the entire request
     """
     
     def __init__(self, protocol, newTID, method, data, config):
@@ -239,26 +239,24 @@ class KrpcRequest(defer.Deferred):
         self.config = config
         self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
         self.start = datetime.now()
-        self.later = None
+        self.laterNextTimeout = None
+        self.laterFinalTimeout = reactor.callLater(self.config.get('KRPC_TIMEOUT', 9), self.finalTimeout)
         reactor.callLater(0, self.send)
         
     def send(self):
         """Send the request to the remote node."""
-        assert not self.later, 'There is already a pending request'
-        self.later = reactor.callLater(self.delay, self.timeOut)
+        assert not self.laterNextTimeout, 'There is already a pending request'
+        self.laterNextTimeout = reactor.callLater(self.delay, self.nextTimeout)
         try:
             self.protocol.sendData(self.method, self.data)
         except:
             log.err()
 
-    def timeOut(self):
+    def nextTimeout(self):
         """Check for a unrecoverable timeout, otherwise resend."""
-        self.later = None
-        delay = datetime.now() - self.start
-        if delay > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 14)):
-            log.msg('%r timed out after %0.2f sec' %
-                    (self.tid, delay.seconds + delay.microseconds/1000000.0))
-            self.protocol.timeOut(self.tid, self.method)
+        self.laterNextTimeout = None
+        if datetime.now() - self.start > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 9)):
+            self.finalTimeout()
         elif self.protocol.stopped:
             log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
         else:
@@ -266,6 +264,14 @@ class KrpcRequest(defer.Deferred):
             log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
             reactor.callLater(0, self.send)
         
+    def finalTimeout(self):
+        """Timeout the request after an unrecoverable timeout."""
+        self.dropTimeOut()
+        delay = datetime.now() - self.start
+        log.msg('%r timed out after %0.2f sec' %
+                (self.tid, delay.seconds + delay.microseconds/1000000.0))
+        self.protocol.timeOut(self.tid, self.method)
+        
     def callback(self, resp):
         self.dropTimeOut()
         defer.Deferred.callback(self, resp)
@@ -276,9 +282,12 @@ class KrpcRequest(defer.Deferred):
         
     def dropTimeOut(self):
         """Cancel the timeout call when a response is received."""
-        if self.later and self.later.active():
-            self.later.cancel()
-        self.later = None
+        if self.laterFinalTimeout and self.laterFinalTimeout.active():
+            self.laterFinalTimeout.cancel()
+        self.laterFinalTimeout = None
+        if self.laterNextTimeout and self.laterNextTimeout.active():
+            self.laterNextTimeout.cancel()
+        self.laterNextTimeout = None
 
 class KRPC:
     """The KRPC protocol implementation.
@@ -368,7 +377,8 @@ class KRPC:
                     ret = f(*(), **msg[ARG])
                 except KrpcError, e:
                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
-                    log.err(e)
+                    if e[0] != KRPC_ERROR_INVALID_TOKEN:
+                        log.err(e)
                     self.stats.errorAction(msg[REQ])
                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                               [e[0], e[1]])
@@ -585,7 +595,7 @@ def make(port):
     from stats import StatsLogger
     af = Receiver()
     a = hostbroker(af, StatsLogger(None, None),
-                   {'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
+                   {'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
     a.protocol = KRPC
     p = reactor.listenUDP(port, a)
     return af, a, p