conn.stop()
protocol.DatagramProtocol.stopProtocol(self)
+class KrpcRequest(Deferred):
+ """An outstanding request to a remote node.
+
+
+ """
+
+ def __init__(self, protocol, newTID, method, data, initDelay):
+ Deferred.__init__(self)
+ self.protocol = protocol
+ self.tid = newTID
+ self.method = method
+ self.data = data
+ self.delay = initDelay
+ self.later = None
+
+ def send(self):
+ self.later = reactor.callLater(self.delay, self.timeOut)
+ self.delay *= 2
+ self.protocol._sendData(self.method, self.data)
+
+ def timeOut(self):
+ """Call the deferred's errback if a timeout occurs."""
+ self.protocol._timeOut(self.tid, self.method)
+
+ def callback(self, resp):
+ self.dropTimeOut()
+ Deferred.callback(self, resp)
+
+ def errback(self, resp):
+ self.dropTimeOut()
+ Deferred.errback(self, resp)
+
+ def dropTimeOut(self):
+ """Cancel the timeout call when a response is received."""
+ if self.later and self.later.active():
+ self.later.cancel()
+
class KRPC:
"""The KRPC protocol implementation.
elif msg[TYP] == RSP:
# Responses get processed by their TID's deferred
if self.tids.has_key(msg[TID]):
- df = self.tids[msg[TID]]
+ req = self.tids[msg[TID]]
# callback
del(self.tids[msg[TID]])
msg[RSP]['_krpc_sender'] = addr
- df.callback(msg[RSP])
+ req.callback(msg[RSP])
else:
# no tid, this transaction timed out already...
if self.noisy:
elif msg[TYP] == ERR:
# Errors get processed by their TID's deferred's errback
if self.tids.has_key(msg[TID]):
- df = self.tids[msg[TID]]
+ req = self.tids[msg[TID]]
del(self.tids[msg[TID]])
# callback
- df.errback(KrpcError(*msg[ERR]))
+ req.errback(KrpcError(*msg[ERR]))
else:
# day late and dollar short, just log it
log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
if self.noisy:
log.msg("unknown message type: %r" % msg)
if msg[TID] in self.tids:
- df = self.tids[msg[TID]]
+ req = self.tids[msg[TID]]
del(self.tids[msg[TID]])
# callback
- df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
- "Received an unknown message type: %r" % msg[TYP]))
+ req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
+ "Received an unknown message type: %r" % msg[TYP]))
def _sendResponse(self, request, addr, tid, msgType, response):
"""Helper function for sending responses to nodes.
"""Send a request to the remote node.
@type method: C{string}
- @param method: the methiod name to call on the remote node
+ @param method: the method name to call on the remote node
@param args: the arguments to send to the remote node's method
"""
if self.stopped:
raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
# Create the request message
- msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args}
+ newTID = newID()
+ msg = {TID : newTID, TYP : REQ, REQ : method, ARG : args}
if self.noisy:
log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
data = bencode(msg)
# Create the deferred and save it with the TID
- d = Deferred()
- self.tids[msg[TID]] = d
+ req = KrpcRequest(self, newTID, method, data, 10)
+ self.tids[newTID] = req
# Save the conclusion of the action
- d.addCallbacks(self.stats.responseAction, self.stats.failedAction,
- callbackArgs = (method, ), errbackArgs = (method, ))
-
- # Schedule a later timeout call
- def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
- """Call the deferred's errback if a timeout occurs."""
- if tids.has_key(id):
- df = tids[id]
- del(tids[id])
- df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr)))
- later = reactor.callLater(KRPC_TIMEOUT, timeOut)
-
- # Cancel the timeout call if a response is received
- def dropTimeOut(dict, later_call = later):
- """Cancel the timeout call when a response is received."""
- if later_call.active():
- later_call.cancel()
- return dict
- d.addBoth(dropTimeOut)
+ req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
+ callbackArgs = (method, ), errbackArgs = (method, ))
+ req.send()
+ return req
+
+ def _sendData(self, method, data):
# Save some stats
self.stats.sentAction(method)
self.stats.sentBytes(len(data))
self.transport.write(data, self.addr)
- return d
-
+
+ def _timeOut(self, badTID, method):
+ """Call the deferred's errback if a timeout occurs."""
+ if badTID in self.tids:
+ req = self.tids[badTID]
+ del(self.tids[badTID])
+ self.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" %
+ (method, self.addr)))
+
def stop(self):
"""Timeout all pending requests."""
- for df in self.tids.values():
- df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
- 'connection has been stopped while waiting for response'))
+ for req in self.tids.values():
+ req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+ 'connection has been stopped while waiting for response'))
self.tids = {}
self.stopped = True