"""The KRPC communication protocol implementation.
+@var KRPC_INITIAL_DELAY: the number of seconds after which to try resending
+ the request, the resends will wait twice as long each time
@var KRPC_TIMEOUT: the number of seconds after which requests timeout
@var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
UDP packet without fragmentation
"""
from bencode import bencode, bdecode
-from time import asctime
+from datetime import datetime, timedelta
from math import ceil
-from twisted.internet.defer import Deferred
+from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.python import log
from twisted.trial import unittest
from khash import newID
-KRPC_TIMEOUT = 20
+KRPC_INITIAL_DELAY = 2
+KRPC_TIMEOUT = 14
UDP_PACKET_LIMIT = 1472
# Remote node errors
conn.stop()
protocol.DatagramProtocol.stopProtocol(self)
+class KrpcRequest(defer.Deferred):
+ """An outstanding request to a remote node.
+
+ @type protocol: L{KRPC}
+ @ivar protocol: the protocol to send data with
+ @ivar tid: the transaction ID of the request
+ @type method: C{string}
+ @ivar method: the name of the method to call on the remote node
+ @type data: C{string}
+ @ivar data: the message to send to the remote node
+ @type delay: C{int}
+ @ivar delay: the last timeout delay sent
+ @type start: C{datetime}
+ @ivar start: the time to request was started at
+ @type later: L{twisted.internet.interfaces.IDelayedCall}
+ @ivar later: the pending call to timeout the last sent request
+ """
+
+ def __init__(self, protocol, newTID, method, data):
+ """Initialize the request, and send it out.
+
+ @type protocol: L{KRPC}
+ @param protocol: the protocol to send data with
+ @param newTID: the transaction ID of the request
+ @type method: C{string}
+ @param method: the name of the method to call on the remote node
+ @type data: C{string}
+ @param data: the message to send to the remote node
+ """
+ defer.Deferred.__init__(self)
+ self.protocol = protocol
+ self.tid = newTID
+ self.method = method
+ self.data = data
+ self.delay = KRPC_INITIAL_DELAY
+ self.start = datetime.now()
+ self.later = None
+ self.send()
+
+ def send(self):
+ """Send the request to the remote node."""
+ assert not self.later, 'There is already a pending request'
+ self.later = reactor.callLater(self.delay, self.timeOut)
+ self.protocol.sendData(self.method, self.data)
+
+ def timeOut(self):
+ """Check for a unrecoverable timeout, otherwise resend."""
+ self.later = None
+ delay = datetime.now() - self.start
+ if delay > timedelta(seconds = KRPC_TIMEOUT):
+ log.msg('%r timed out after %0.2f sec' %
+ (self.tid, delay.seconds + delay.microseconds/1000000.0))
+ self.protocol.timeOut(self.tid, self.method)
+ elif self.protocol.stopped:
+ log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
+ else:
+ self.delay *= 2
+ log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
+ self.send()
+
+ def callback(self, resp):
+ self.dropTimeOut()
+ defer.Deferred.callback(self, resp)
+
+ def errback(self, resp):
+ self.dropTimeOut()
+ defer.Deferred.errback(self, resp)
+
+ def dropTimeOut(self):
+ """Cancel the timeout call when a response is received."""
+ if self.later and self.later.active():
+ self.later.cancel()
+ self.later = None
+
class KRPC:
"""The KRPC protocol implementation.
elif msg[TYP] == RSP:
# Responses get processed by their TID's deferred
if self.tids.has_key(msg[TID]):
- df = self.tids[msg[TID]]
+ req = self.tids[msg[TID]]
# callback
del(self.tids[msg[TID]])
msg[RSP]['_krpc_sender'] = addr
- df.callback(msg[RSP])
+ req.callback(msg[RSP])
else:
- # no tid, this transaction timed out already...
+ # no tid, this transaction was finished already...
if self.noisy:
- log.msg('timeout: %r' % msg[RSP]['id'])
+ log.msg('received response from %r for completed request: %r' %
+ (msg[RSP]['id'], msg[TID]))
elif msg[TYP] == ERR:
# Errors get processed by their TID's deferred's errback
if self.tids.has_key(msg[TID]):
- df = self.tids[msg[TID]]
+ req = self.tids[msg[TID]]
del(self.tids[msg[TID]])
# callback
- df.errback(KrpcError(*msg[ERR]))
+ req.errback(KrpcError(*msg[ERR]))
else:
- # day late and dollar short, just log it
- log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
- pass
+ # no tid, this transaction was finished already...
+ log.msg('received an error %r from %r for completed request: %r' %
+ (msg[ERR], msg[RSP]['id'], msg[TID]))
else:
# Received an unknown message type
if self.noisy:
log.msg("unknown message type: %r" % msg)
if msg[TID] in self.tids:
- df = self.tids[msg[TID]]
+ req = self.tids[msg[TID]]
del(self.tids[msg[TID]])
# callback
- df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
- "Received an unknown message type: %r" % msg[TYP]))
+ req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
+ "Received an unknown message type: %r" % msg[TYP]))
def _sendResponse(self, request, addr, tid, msgType, response):
"""Helper function for sending responses to nodes.
"""Send a request to the remote node.
@type method: C{string}
- @param method: the methiod name to call on the remote node
+ @param method: the method name to call on the remote node
@param args: the arguments to send to the remote node's method
"""
if self.stopped:
- raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
+ return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+ "cannot send, connection has been stopped"))
# Create the request message
- msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args}
+ newTID = newID()
+ msg = {TID : newTID, TYP : REQ, REQ : method, ARG : args}
if self.noisy:
log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
data = bencode(msg)
- # Create the deferred and save it with the TID
- d = Deferred()
- self.tids[msg[TID]] = d
+ # Create the request object and save it with the TID
+ req = KrpcRequest(self, newTID, method, data)
+ self.tids[newTID] = req
# Save the conclusion of the action
- d.addCallbacks(self.stats.responseAction, self.stats.failedAction,
- callbackArgs = (method, ), errbackArgs = (method, ))
-
- # Schedule a later timeout call
- def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
- """Call the deferred's errback if a timeout occurs."""
- if tids.has_key(id):
- df = tids[id]
- del(tids[id])
- df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr)))
- later = reactor.callLater(KRPC_TIMEOUT, timeOut)
+ req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
+ callbackArgs = (method, ), errbackArgs = (method, ))
+
+ return req
+
+ def sendData(self, method, data):
+ """Write a request to the transport and save the stats.
- # Cancel the timeout call if a response is received
- def dropTimeOut(dict, later_call = later):
- """Cancel the timeout call when a response is received."""
- if later_call.active():
- later_call.cancel()
- return dict
- d.addBoth(dropTimeOut)
-
- # Save some stats
+ @type method: C{string}
+ @param method: the name of the method to call on the remote node
+ @type data: C{string}
+ @param data: the message to send to the remote node
+ """
self.stats.sentAction(method)
self.stats.sentBytes(len(data))
self.transport.write(data, self.addr)
- return d
-
+
+ def timeOut(self, badTID, method):
+ """Call the deferred's errback if a timeout occurs.
+
+ @param badTID: the transaction ID of the request
+ @type method: C{string}
+ @param method: the name of the method that timed out on the remote node
+ """
+ if badTID in self.tids:
+ req = self.tids[badTID]
+ del(self.tids[badTID])
+ req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" %
+ (method, self.addr)))
+ else:
+ log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
+
def stop(self):
- """Timeout all pending requests."""
- for df in self.tids.values():
- df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
- 'connection has been stopped while waiting for response'))
+ """Cancel all pending requests."""
+ for req in self.tids.values():
+ req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+ 'connection has been stopped while waiting for response'))
self.tids = {}
self.stopped = True
def make(port):
from stats import StatsLogger
af = Receiver()
- a = hostbroker(af, StatsLogger(None, None, {}), {'SPEW': False})
+ a = hostbroker(af, StatsLogger(None, None), {'SPEW': False})
a.protocol = KRPC
p = reactor.listenUDP(port, a)
return af, a, p
def testUnknownMeth(self):
df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
+ df = self.failUnlessFailure(df, KrpcError)
df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
return df
def testMalformedRequest(self):
df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
- df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST)
+ df = self.failUnlessFailure(df, KrpcError)
+ df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
return df
- def gotErr(self, err, should_be):
- self.failUnlessEqual(err.value[0], should_be)
+ def gotErr(self, value, should_be, *errorTypes):
+ self.failUnlessEqual(value[0], should_be)
+ if errorTypes:
+ self.flushLoggedErrors(*errorTypes)
def testLongPackets(self):
df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})