Make sure full timeouts occur even if a resend timeout has not.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
index 47e7452f907e42baa755226e0b94db0abf7bd274..b151a5527f15b6f210f14e83b565a790cfe750b7 100644 (file)
@@ -1,9 +1,6 @@
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
 
 """The KRPC communication protocol implementation.
 
 
 """The KRPC communication protocol implementation.
 
-@var KRPC_TIMEOUT: the number of seconds after which requests timeout
 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
     UDP packet without fragmentation
 
 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
     UDP packet without fragmentation
 
 """
 
 from bencode import bencode, bdecode
 """
 
 from bencode import bencode, bdecode
-from time import asctime
+from datetime import datetime, timedelta
 from math import ceil
 
 from math import ceil
 
-from twisted.internet.defer import Deferred
+from twisted.internet import defer
 from twisted.internet import protocol, reactor
 from twisted.python import log
 from twisted.trial import unittest
 
 from khash import newID
 
 from twisted.internet import protocol, reactor
 from twisted.python import log
 from twisted.trial import unittest
 
 from khash import newID
 
-KRPC_TIMEOUT = 20
 UDP_PACKET_LIMIT = 1472
 
 # Remote node errors
 UDP_PACKET_LIMIT = 1472
 
 # Remote node errors
@@ -182,7 +178,7 @@ class hostbroker(protocol.DatagramProtocol):
         
         # Create a new protocol object if necessary
         if not self.connections.has_key(addr):
         
         # Create a new protocol object if necessary
         if not self.connections.has_key(addr):
-            conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
+            conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
             self.connections[addr] = conn
         else:
             conn = self.connections[addr]
             self.connections[addr] = conn
         else:
             conn = self.connections[addr]
@@ -200,6 +196,99 @@ class hostbroker(protocol.DatagramProtocol):
             conn.stop()
         protocol.DatagramProtocol.stopProtocol(self)
 
             conn.stop()
         protocol.DatagramProtocol.stopProtocol(self)
 
+class KrpcRequest(defer.Deferred):
+    """An outstanding request to a remote node.
+    
+    @type protocol: L{KRPC}
+    @ivar protocol: the protocol to send data with
+    @ivar tid: the transaction ID of the request
+    @type method: C{string}
+    @ivar method: the name of the method to call on the remote node
+    @type data: C{string}
+    @ivar data: the message to send to the remote node
+    @type config: C{dictionary}
+    @ivar config: the configuration parameters for the DHT
+    @type delay: C{int}
+    @ivar delay: the last timeout delay sent
+    @type start: C{datetime}
+    @ivar start: the time to request was started at
+    @type laterNextTimeout: L{twisted.internet.interfaces.IDelayedCall}
+    @ivar laterNextTimeout: the pending call to timeout the last sent request
+    @type laterFinalTimeout: L{twisted.internet.interfaces.IDelayedCall}
+    @ivar laterFinalTimeout: the pending call to timeout the entire request
+    """
+    
+    def __init__(self, protocol, newTID, method, data, config):
+        """Initialize the request, and send it out.
+        
+        @type protocol: L{KRPC}
+        @param protocol: the protocol to send data with
+        @param newTID: the transaction ID of the request
+        @type method: C{string}
+        @param method: the name of the method to call on the remote node
+        @type data: C{string}
+        @param data: the message to send to the remote node
+        @type config: C{dictionary}
+        @param config: the configuration parameters for the DHT
+        """
+        defer.Deferred.__init__(self)
+        self.protocol = protocol
+        self.tid = newTID
+        self.method = method
+        self.data = data
+        self.config = config
+        self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
+        self.start = datetime.now()
+        self.laterNextTimeout = None
+        self.laterFinalTimeout = reactor.callLater(self.config.get('KRPC_TIMEOUT', 9), self.finalTimeout)
+        reactor.callLater(0, self.send)
+        
+    def send(self):
+        """Send the request to the remote node."""
+        assert not self.laterNextTimeout, 'There is already a pending request'
+        self.laterNextTimeout = reactor.callLater(self.delay, self.nextTimeout)
+        try:
+            self.protocol.sendData(self.method, self.data)
+        except:
+            log.err()
+
+    def nextTimeout(self):
+        """Check for a unrecoverable timeout, otherwise resend."""
+        self.laterNextTimeout = None
+        if datetime.now() - self.start > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 9)):
+            self.finalTimeout()
+        elif self.protocol.stopped:
+            log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
+        else:
+            self.delay *= 2
+            log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
+            reactor.callLater(0, self.send)
+        
+    def finalTimeout(self):
+        """Timeout the request after an unrecoverable timeout."""
+        self.dropTimeOut()
+        delay = datetime.now() - self.start
+        log.msg('%r timed out after %0.2f sec' %
+                (self.tid, delay.seconds + delay.microseconds/1000000.0))
+        self.protocol.timeOut(self.tid, self.method)
+        
+    def callback(self, resp):
+        self.dropTimeOut()
+        defer.Deferred.callback(self, resp)
+        
+    def errback(self, resp):
+        self.dropTimeOut()
+        defer.Deferred.errback(self, resp)
+        
+    def dropTimeOut(self):
+        """Cancel the timeout call when a response is received."""
+        if self.laterFinalTimeout and self.laterFinalTimeout.active():
+            self.laterFinalTimeout.cancel()
+        self.laterFinalTimeout = None
+        if self.laterNextTimeout and self.laterNextTimeout.active():
+            self.laterNextTimeout.cancel()
+        self.laterNextTimeout = None
+
 class KRPC:
     """The KRPC protocol implementation.
     
 class KRPC:
     """The KRPC protocol implementation.
     
@@ -210,8 +299,8 @@ class KRPC:
     @ivar stats: the statistics logger to save transport info
     @type addr: (C{string}, C{int})
     @ivar addr: the IP address and port of the source node
     @ivar stats: the statistics logger to save transport info
     @type addr: (C{string}, C{int})
     @ivar addr: the IP address and port of the source node
-    @type noisy: C{boolean}
-    @ivar noisy: whether to log additional details of the protocol
+    @type config: C{dictionary}
+    @ivar config: the configuration parameters for the DHT
     @type tids: C{dictionary}
     @ivar tids: the transaction IDs outstanding for requests, keys are the
         transaction ID of the request, values are the deferreds to call with
     @type tids: C{dictionary}
     @ivar tids: the transaction IDs outstanding for requests, keys are the
         transaction ID of the request, values are the deferreds to call with
@@ -220,7 +309,7 @@ class KRPC:
     @ivar stopped: whether the protocol has been stopped
     """
     
     @ivar stopped: whether the protocol has been stopped
     """
     
-    def __init__(self, addr, server, stats, transport, spew = False):
+    def __init__(self, addr, server, stats, transport, config = {}):
         """Initialize the protocol.
         
         @type addr: (C{string}, C{int})
         """Initialize the protocol.
         
         @type addr: (C{string}, C{int})
@@ -230,15 +319,15 @@ class KRPC:
         @type stats: L{stats.StatsLogger}
         @param stats: the statistics logger to save transport info
         @param transport: the transport to use for the protocol
         @type stats: L{stats.StatsLogger}
         @param stats: the statistics logger to save transport info
         @param transport: the transport to use for the protocol
-        @type spew: C{boolean}
-        @param spew: whether to log additional details of the protocol
-            (optional, defaults to False)
+        @type config: C{dictionary}
+        @param config: the configuration parameters for the DHT
+            (optional, defaults to using defaults)
         """
         self.transport = transport
         self.factory = server
         self.stats = stats
         self.addr = addr
         """
         self.transport = transport
         self.factory = server
         self.stats = stats
         self.addr = addr
-        self.noisy = spew
+        self.config = config
         self.tids = {}
         self.stopped = False
 
         self.tids = {}
         self.stopped = False
 
@@ -252,14 +341,14 @@ class KRPC:
         """
         self.stats.receivedBytes(len(data))
         if self.stopped:
         """
         self.stats.receivedBytes(len(data))
         if self.stopped:
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
 
         # Bdecode the message
         try:
             msg = bdecode(data)
         except Exception, e:
                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
 
         # Bdecode the message
         try:
             msg = bdecode(data)
         except Exception, e:
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("krpc bdecode error: ")
                 log.err(e)
             return
                 log.msg("krpc bdecode error: ")
                 log.err(e)
             return
@@ -272,7 +361,7 @@ class KRPC:
             log.err(e)
             return
 
             log.err(e)
             return
 
-        if self.noisy:
+        if self.config.get('SPEW', False):
             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
 
         # Process it based on its type
             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
 
         # Process it based on its type
@@ -288,7 +377,8 @@ class KRPC:
                     ret = f(*(), **msg[ARG])
                 except KrpcError, e:
                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
                     ret = f(*(), **msg[ARG])
                 except KrpcError, e:
                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
-                    log.err(e)
+                    if e[0] != KRPC_ERROR_INVALID_TOKEN:
+                        log.err(e)
                     self.stats.errorAction(msg[REQ])
                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                               [e[0], e[1]])
                     self.stats.errorAction(msg[REQ])
                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                               [e[0], e[1]])
@@ -312,42 +402,42 @@ class KRPC:
                 self.stats.receivedAction('unknown')
                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
                 self.stats.receivedAction('unknown')
                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
-            if self.noisy:
-                log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
-                                                  ilen, msg[REQ], olen))
+
+            log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
         elif msg[TYP] == RSP:
             # Responses get processed by their TID's deferred
             if self.tids.has_key(msg[TID]):
         elif msg[TYP] == RSP:
             # Responses get processed by their TID's deferred
             if self.tids.has_key(msg[TID]):
-                df = self.tids[msg[TID]]
+                req = self.tids[msg[TID]]
                 #      callback
                 del(self.tids[msg[TID]])
                 msg[RSP]['_krpc_sender'] = addr
                 #      callback
                 del(self.tids[msg[TID]])
                 msg[RSP]['_krpc_sender'] = addr
-                df.callback(msg[RSP])
+                req.callback(msg[RSP])
             else:
             else:
-                # no tid, this transaction timed out already...
-                if self.noisy:
-                    log.msg('timeout: %r' % msg[RSP]['id'])
+                # no tid, this transaction was finished already...
+                if self.config.get('SPEW', False):
+                    log.msg('received response from %r for completed request: %r' %
+                            (msg[RSP]['id'], msg[TID]))
         elif msg[TYP] == ERR:
             # Errors get processed by their TID's deferred's errback
             if self.tids.has_key(msg[TID]):
         elif msg[TYP] == ERR:
             # Errors get processed by their TID's deferred's errback
             if self.tids.has_key(msg[TID]):
-                df = self.tids[msg[TID]]
+                req = self.tids[msg[TID]]
                 del(self.tids[msg[TID]])
                 # callback
                 del(self.tids[msg[TID]])
                 # callback
-                df.errback(KrpcError(*msg[ERR]))
+                req.errback(KrpcError(*msg[ERR]))
             else:
             else:
-                # day late and dollar short, just log it
-                log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
-                pass
+                # no tid, this transaction was finished already...
+                log.msg('received an error %r from %r for completed request: %r' %
+                        (msg[ERR], msg[RSP]['id'], msg[TID]))
         else:
             # Received an unknown message type
         else:
             # Received an unknown message type
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("unknown message type: %r" % msg)
             if msg[TID] in self.tids:
                 log.msg("unknown message type: %r" % msg)
             if msg[TID] in self.tids:
-                df = self.tids[msg[TID]]
+                req = self.tids[msg[TID]]
                 del(self.tids[msg[TID]])
                 # callback
                 del(self.tids[msg[TID]])
                 # callback
-                df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
-                                     "Received an unknown message type: %r" % msg[TYP]))
+                req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
+                                      "Received an unknown message type: %r" % msg[TYP]))
                 
     def _sendResponse(self, request, addr, tid, msgType, response):
         """Helper function for sending responses to nodes.
                 
     def _sendResponse(self, request, addr, tid, msgType, response):
         """Helper function for sending responses to nodes.
@@ -366,7 +456,7 @@ class KRPC:
             # Create the response message
             msg = {TID : tid, TYP : msgType, msgType : response}
     
             # Create the response message
             msg = {TID : tid, TYP : msgType, msgType : response}
     
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
     
             out = bencode(msg)
                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
     
             out = bencode(msg)
@@ -425,55 +515,63 @@ class KRPC:
         """Send a request to the remote node.
         
         @type method: C{string}
         """Send a request to the remote node.
         
         @type method: C{string}
-        @param method: the methiod name to call on the remote node
+        @param method: the method name to call on the remote node
         @param args: the arguments to send to the remote node's method
         """
         if self.stopped:
         @param args: the arguments to send to the remote node's method
         """
         if self.stopped:
-            raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
+            return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+                                        "cannot send, connection has been stopped"))
 
         # Create the request message
 
         # Create the request message
-        msg = {TID : newID(), TYP : REQ,  REQ : method, ARG : args}
-        if self.noisy:
+        newTID = newID()
+        msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
+        if self.config.get('SPEW', False):
             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
         data = bencode(msg)
         
             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
         data = bencode(msg)
         
-        # Create the deferred and save it with the TID
-        d = Deferred()
-        self.tids[msg[TID]] = d
+        # Create the request object and save it with the TID
+        req = KrpcRequest(self, newTID, method, data, self.config)
+        self.tids[newTID] = req
         
         # Save the conclusion of the action
         
         # Save the conclusion of the action
-        d.addCallbacks(self.stats.responseAction, self.stats.failedAction,
-                       callbackArgs = (method, ), errbackArgs = (method, ))
-
-        # Schedule a later timeout call
-        def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
-            """Call the deferred's errback if a timeout occurs."""
-            if tids.has_key(id):
-                df = tids[id]
-                del(tids[id])
-                df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr)))
-        later = reactor.callLater(KRPC_TIMEOUT, timeOut)
+        req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
+                         callbackArgs = (method, datetime.now()),
+                         errbackArgs = (method, datetime.now()))
+
+        return req
+    
+    def sendData(self, method, data):
+        """Write a request to the transport and save the stats.
         
         
-        # Cancel the timeout call if a response is received
-        def dropTimeOut(dict, later_call = later):
-            """Cancel the timeout call when a response is received."""
-            if later_call.active():
-                later_call.cancel()
-            return dict
-        d.addBoth(dropTimeOut)
-
-        # Save some stats
+        @type method: C{string}
+        @param method: the name of the method to call on the remote node
+        @type data: C{string}
+        @param data: the message to send to the remote node
+        """
+        self.transport.write(data, self.addr)
         self.stats.sentAction(method)
         self.stats.sentBytes(len(data))
         
         self.stats.sentAction(method)
         self.stats.sentBytes(len(data))
         
-        self.transport.write(data, self.addr)
-        return d
-    
+    def timeOut(self, badTID, method):
+        """Call the deferred's errback if a timeout occurs.
+        
+        @param badTID: the transaction ID of the request
+        @type method: C{string}
+        @param method: the name of the method that timed out on the remote node
+        """
+        if badTID in self.tids:
+            req = self.tids[badTID]
+            del(self.tids[badTID])
+            req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
+                                                      (method, self.addr)))
+        else:
+            log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
+        
     def stop(self):
     def stop(self):
-        """Timeout all pending requests."""
-        for df in self.tids.values():
-            df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
-                                 'connection has been stopped while waiting for response'))
+        """Cancel all pending requests."""
+        for req in self.tids.values():
+            req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+                                  'connection has been stopped while waiting for response'))
         self.tids = {}
         self.stopped = True
 
         self.tids = {}
         self.stopped = True
 
@@ -496,7 +594,8 @@ class Receiver(protocol.Factory):
 def make(port):
     from stats import StatsLogger
     af = Receiver()
 def make(port):
     from stats import StatsLogger
     af = Receiver()
-    a = hostbroker(af, StatsLogger(None, None, {}), {'SPEW': False})
+    a = hostbroker(af, StatsLogger(None, None),
+                   {'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
     a.protocol = KRPC
     p = reactor.listenUDP(port, a)
     return af, a, p
     a.protocol = KRPC
     p = reactor.listenUDP(port, a)
     return af, a, p