"""The KRPC communication protocol implementation.
+@var KRPC_INITIAL_DELAY: the number of seconds after which to try resending
+ the request, the resends will wait twice as long each time
@var KRPC_TIMEOUT: the number of seconds after which requests timeout
@var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
UDP packet without fragmentation
"""
from bencode import bencode, bdecode
-from time import asctime
+from datetime import datetime, timedelta
from math import ceil
-from twisted.internet.defer import Deferred
+from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.python import log
from twisted.trial import unittest
from khash import newID
-KRPC_TIMEOUT = 20
+KRPC_INITIAL_DELAY = 2
+KRPC_TIMEOUT = 14
UDP_PACKET_LIMIT = 1472
# Remote node errors
conn.stop()
protocol.DatagramProtocol.stopProtocol(self)
-class KrpcRequest(Deferred):
+class KrpcRequest(defer.Deferred):
"""An outstanding request to a remote node.
-
+ @type protocol: L{KRPC}
+ @ivar protocol: the protocol to send data with
+ @ivar tid: the transaction ID of the request
+ @type method: C{string}
+ @ivar method: the name of the method to call on the remote node
+ @type data: C{string}
+ @ivar data: the message to send to the remote node
+ @type delay: C{int}
+ @ivar delay: the last timeout delay sent
+ @type start: C{datetime}
+ @ivar start: the time to request was started at
+ @type later: L{twisted.internet.interfaces.IDelayedCall}
+ @ivar later: the pending call to timeout the last sent request
"""
- def __init__(self, protocol, newTID, method, data, initDelay):
- Deferred.__init__(self)
+ def __init__(self, protocol, newTID, method, data):
+ """Initialize the request, and send it out.
+
+ @type protocol: L{KRPC}
+ @param protocol: the protocol to send data with
+ @param newTID: the transaction ID of the request
+ @type method: C{string}
+ @param method: the name of the method to call on the remote node
+ @type data: C{string}
+ @param data: the message to send to the remote node
+ """
+ defer.Deferred.__init__(self)
self.protocol = protocol
self.tid = newTID
self.method = method
self.data = data
- self.delay = initDelay
+ self.delay = KRPC_INITIAL_DELAY
+ self.start = datetime.now()
self.later = None
+ self.send()
def send(self):
+ """Send the request to the remote node."""
assert not self.later, 'There is already a pending request'
self.later = reactor.callLater(self.delay, self.timeOut)
- self.delay *= 2
self.protocol.sendData(self.method, self.data)
def timeOut(self):
- """Call the deferred's errback if a timeout occurs."""
+ """Check for a unrecoverable timeout, otherwise resend."""
self.later = None
- self.protocol.timeOut(self.tid, self.method)
+ delay = datetime.now() - self.start
+ if delay > timedelta(seconds = KRPC_TIMEOUT):
+ log.msg('%r timed out after %0.2f sec' %
+ (self.tid, delay.seconds + delay.microseconds/1000000.0))
+ self.protocol.timeOut(self.tid, self.method)
+ elif self.protocol.stopped:
+ log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
+ else:
+ self.delay *= 2
+ log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
+ self.send()
def callback(self, resp):
self.dropTimeOut()
- Deferred.callback(self, resp)
+ defer.Deferred.callback(self, resp)
def errback(self, resp):
self.dropTimeOut()
- Deferred.errback(self, resp)
+ defer.Deferred.errback(self, resp)
def dropTimeOut(self):
"""Cancel the timeout call when a response is received."""
if self.later and self.later.active():
self.later.cancel()
- self.later = None
+ self.later = None
class KRPC:
"""The KRPC protocol implementation.
msg[RSP]['_krpc_sender'] = addr
req.callback(msg[RSP])
else:
- # no tid, this transaction timed out already...
+ # no tid, this transaction was finished already...
if self.noisy:
- log.msg('timeout: %r' % msg[RSP]['id'])
+ log.msg('received response from %r for completed request: %r' %
+ (msg[RSP]['id'], msg[TID]))
elif msg[TYP] == ERR:
# Errors get processed by their TID's deferred's errback
if self.tids.has_key(msg[TID]):
# callback
req.errback(KrpcError(*msg[ERR]))
else:
- # day late and dollar short, just log it
- log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
- pass
+ # no tid, this transaction was finished already...
+ log.msg('received an error %r from %r for completed request: %r' %
+ (msg[ERR], msg[RSP]['id'], msg[TID]))
else:
# Received an unknown message type
if self.noisy:
@param args: the arguments to send to the remote node's method
"""
if self.stopped:
- raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
+ return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+ "cannot send, connection has been stopped"))
# Create the request message
newTID = newID()
log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
data = bencode(msg)
- # Create the deferred and save it with the TID
- req = KrpcRequest(self, newTID, method, data, 10)
+ # Create the request object and save it with the TID
+ req = KrpcRequest(self, newTID, method, data)
self.tids[newTID] = req
# Save the conclusion of the action
req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
callbackArgs = (method, ), errbackArgs = (method, ))
- req.send()
return req
def sendData(self, method, data):
- # Save some stats
+ """Write a request to the transport and save the stats.
+
+ @type method: C{string}
+ @param method: the name of the method to call on the remote node
+ @type data: C{string}
+ @param data: the message to send to the remote node
+ """
self.stats.sentAction(method)
self.stats.sentBytes(len(data))
self.transport.write(data, self.addr)
def timeOut(self, badTID, method):
- """Call the deferred's errback if a timeout occurs."""
+ """Call the deferred's errback if a timeout occurs.
+
+ @param badTID: the transaction ID of the request
+ @type method: C{string}
+ @param method: the name of the method that timed out on the remote node
+ """
if badTID in self.tids:
req = self.tids[badTID]
del(self.tids[badTID])
log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
def stop(self):
- """Timeout all pending requests."""
+ """Cancel all pending requests."""
for req in self.tids.values():
req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
'connection has been stopped while waiting for response'))