+class KrpcRequest(defer.Deferred):
+ """An outstanding request to a remote node.
+
+ @type protocol: L{KRPC}
+ @ivar protocol: the protocol to send data with
+ @ivar tid: the transaction ID of the request
+ @type method: C{string}
+ @ivar method: the name of the method to call on the remote node
+ @type data: C{string}
+ @ivar data: the message to send to the remote node
+ @type config: C{dictionary}
+ @ivar config: the configuration parameters for the DHT
+ @type delay: C{int}
+ @ivar delay: the last timeout delay sent
+ @type start: C{datetime}
+ @ivar start: the time to request was started at
+ @type laterNextTimeout: L{twisted.internet.interfaces.IDelayedCall}
+ @ivar laterNextTimeout: the pending call to timeout the last sent request
+ @type laterFinalTimeout: L{twisted.internet.interfaces.IDelayedCall}
+ @ivar laterFinalTimeout: the pending call to timeout the entire request
+ """
+
+ def __init__(self, protocol, newTID, method, data, config):
+ """Initialize the request, and send it out.
+
+ @type protocol: L{KRPC}
+ @param protocol: the protocol to send data with
+ @param newTID: the transaction ID of the request
+ @type method: C{string}
+ @param method: the name of the method to call on the remote node
+ @type data: C{string}
+ @param data: the message to send to the remote node
+ @type config: C{dictionary}
+ @param config: the configuration parameters for the DHT
+ """
+ defer.Deferred.__init__(self)
+ self.protocol = protocol
+ self.tid = newTID
+ self.method = method
+ self.data = data
+ self.config = config
+ self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
+ self.start = datetime.now()
+ self.laterNextTimeout = None
+ self.laterFinalTimeout = reactor.callLater(self.config.get('KRPC_TIMEOUT', 9), self.finalTimeout)
+ reactor.callLater(0, self.send)
+
+ def send(self):
+ """Send the request to the remote node."""
+ assert not self.laterNextTimeout, 'There is already a pending request'
+ self.laterNextTimeout = reactor.callLater(self.delay, self.nextTimeout)
+ try:
+ self.protocol.sendData(self.method, self.data)
+ except:
+ log.err()
+
+ def nextTimeout(self):
+ """Check for a unrecoverable timeout, otherwise resend."""
+ self.laterNextTimeout = None
+ if datetime.now() - self.start > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 9)):
+ self.finalTimeout()
+ elif self.protocol.stopped:
+ log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
+ else:
+ self.delay *= 2
+ log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
+ reactor.callLater(0, self.send)
+
+ def finalTimeout(self):
+ """Timeout the request after an unrecoverable timeout."""
+ self.dropTimeOut()
+ delay = datetime.now() - self.start
+ log.msg('%r timed out after %0.2f sec' %
+ (self.tid, delay.seconds + delay.microseconds/1000000.0))
+ self.protocol.timeOut(self.tid, self.method)
+
+ def callback(self, resp):
+ self.dropTimeOut()
+ defer.Deferred.callback(self, resp)
+
+ def errback(self, resp):
+ self.dropTimeOut()
+ defer.Deferred.errback(self, resp)
+
+ def dropTimeOut(self):
+ """Cancel the timeout call when a response is received."""
+ if self.laterFinalTimeout and self.laterFinalTimeout.active():
+ self.laterFinalTimeout.cancel()
+ self.laterFinalTimeout = None
+ if self.laterNextTimeout and self.laterNextTimeout.active():
+ self.laterNextTimeout.cancel()
+ self.laterNextTimeout = None
+