-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
"""The KRPC communication protocol implementation.
@ivar delay: the last timeout delay sent
@type start: C{datetime}
@ivar start: the time to request was started at
- @type later: L{twisted.internet.interfaces.IDelayedCall}
- @ivar later: the pending call to timeout the last sent request
+ @type laterNextTimeout: L{twisted.internet.interfaces.IDelayedCall}
+ @ivar laterNextTimeout: the pending call to timeout the last sent request
+ @type laterFinalTimeout: L{twisted.internet.interfaces.IDelayedCall}
+ @ivar laterFinalTimeout: the pending call to timeout the entire request
"""
def __init__(self, protocol, newTID, method, data, config):
self.config = config
self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
self.start = datetime.now()
- self.later = None
- self.send()
+ self.laterNextTimeout = None
+ self.laterFinalTimeout = reactor.callLater(self.config.get('KRPC_TIMEOUT', 9), self.finalTimeout)
+ reactor.callLater(0, self.send)
def send(self):
"""Send the request to the remote node."""
- assert not self.later, 'There is already a pending request'
- self.later = reactor.callLater(self.delay, self.timeOut)
- self.protocol.sendData(self.method, self.data)
+ assert not self.laterNextTimeout, 'There is already a pending request'
+ self.laterNextTimeout = reactor.callLater(self.delay, self.nextTimeout)
+ try:
+ self.protocol.sendData(self.method, self.data)
+ except:
+ log.err()
- def timeOut(self):
+ def nextTimeout(self):
"""Check for a unrecoverable timeout, otherwise resend."""
- self.later = None
- delay = datetime.now() - self.start
- if delay > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 14)):
- log.msg('%r timed out after %0.2f sec' %
- (self.tid, delay.seconds + delay.microseconds/1000000.0))
- self.protocol.timeOut(self.tid, self.method)
+ self.laterNextTimeout = None
+ if datetime.now() - self.start > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 9)):
+ self.finalTimeout()
elif self.protocol.stopped:
log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
else:
self.delay *= 2
log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
- self.send()
+ reactor.callLater(0, self.send)
+
+ def finalTimeout(self):
+ """Timeout the request after an unrecoverable timeout."""
+ self.dropTimeOut()
+ delay = datetime.now() - self.start
+ log.msg('%r timed out after %0.2f sec' %
+ (self.tid, delay.seconds + delay.microseconds/1000000.0))
+ self.protocol.timeOut(self.tid, self.method)
def callback(self, resp):
self.dropTimeOut()
def dropTimeOut(self):
"""Cancel the timeout call when a response is received."""
- if self.later and self.later.active():
- self.later.cancel()
- self.later = None
+ if self.laterFinalTimeout and self.laterFinalTimeout.active():
+ self.laterFinalTimeout.cancel()
+ self.laterFinalTimeout = None
+ if self.laterNextTimeout and self.laterNextTimeout.active():
+ self.laterNextTimeout.cancel()
+ self.laterNextTimeout = None
class KRPC:
"""The KRPC protocol implementation.
ret = f(*(), **msg[ARG])
except KrpcError, e:
log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
- log.err(e)
+ if e[0] != KRPC_ERROR_INVALID_TOKEN:
+ log.err(e)
self.stats.errorAction(msg[REQ])
olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
[e[0], e[1]])
@type data: C{string}
@param data: the message to send to the remote node
"""
+ self.transport.write(data, self.addr)
self.stats.sentAction(method)
self.stats.sentBytes(len(data))
- self.transport.write(data, self.addr)
-
def timeOut(self, badTID, method):
"""Call the deferred's errback if a timeout occurs.
from stats import StatsLogger
af = Receiver()
a = hostbroker(af, StatsLogger(None, None),
- {'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
+ {'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
a.protocol = KRPC
p = reactor.listenUDP(port, a)
return af, a, p