From: Cameron Dale Date: Tue, 15 Apr 2008 00:28:13 +0000 (-0700) Subject: WIP on sending multiple KRPC requests before timeout. X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=66a54d42206699aeab07c7e5d70609d25651e9eb WIP on sending multiple KRPC requests before timeout. --- diff --git a/apt_p2p_Khashmir/krpc.py b/apt_p2p_Khashmir/krpc.py index 47e7452..d4101ad 100644 --- a/apt_p2p_Khashmir/krpc.py +++ b/apt_p2p_Khashmir/krpc.py @@ -200,6 +200,43 @@ class hostbroker(protocol.DatagramProtocol): conn.stop() protocol.DatagramProtocol.stopProtocol(self) +class KrpcRequest(Deferred): + """An outstanding request to a remote node. + + + """ + + def __init__(self, protocol, newTID, method, data, initDelay): + Deferred.__init__(self) + self.protocol = protocol + self.tid = newTID + self.method = method + self.data = data + self.delay = initDelay + self.later = None + + def send(self): + self.later = reactor.callLater(self.delay, self.timeOut) + self.delay *= 2 + self.protocol._sendData(self.method, self.data) + + def timeOut(self): + """Call the deferred's errback if a timeout occurs.""" + self.protocol._timeOut(self.tid, self.method) + + def callback(self, resp): + self.dropTimeOut() + Deferred.callback(self, resp) + + def errback(self, resp): + self.dropTimeOut() + Deferred.errback(self, resp) + + def dropTimeOut(self): + """Cancel the timeout call when a response is received.""" + if self.later and self.later.active(): + self.later.cancel() + class KRPC: """The KRPC protocol implementation. @@ -318,11 +355,11 @@ class KRPC: elif msg[TYP] == RSP: # Responses get processed by their TID's deferred if self.tids.has_key(msg[TID]): - df = self.tids[msg[TID]] + req = self.tids[msg[TID]] # callback del(self.tids[msg[TID]]) msg[RSP]['_krpc_sender'] = addr - df.callback(msg[RSP]) + req.callback(msg[RSP]) else: # no tid, this transaction timed out already... if self.noisy: @@ -330,10 +367,10 @@ class KRPC: elif msg[TYP] == ERR: # Errors get processed by their TID's deferred's errback if self.tids.has_key(msg[TID]): - df = self.tids[msg[TID]] + req = self.tids[msg[TID]] del(self.tids[msg[TID]]) # callback - df.errback(KrpcError(*msg[ERR])) + req.errback(KrpcError(*msg[ERR])) else: # day late and dollar short, just log it log.msg("Got an error for an unknown request: %r" % (msg[ERR], )) @@ -343,11 +380,11 @@ class KRPC: if self.noisy: log.msg("unknown message type: %r" % msg) if msg[TID] in self.tids: - df = self.tids[msg[TID]] + req = self.tids[msg[TID]] del(self.tids[msg[TID]]) # callback - df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN, - "Received an unknown message type: %r" % msg[TYP])) + req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN, + "Received an unknown message type: %r" % msg[TYP])) def _sendResponse(self, request, addr, tid, msgType, response): """Helper function for sending responses to nodes. @@ -425,55 +462,50 @@ class KRPC: """Send a request to the remote node. @type method: C{string} - @param method: the methiod name to call on the remote node + @param method: the method name to call on the remote node @param args: the arguments to send to the remote node's method """ if self.stopped: raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped") # Create the request message - msg = {TID : newID(), TYP : REQ, REQ : method, ARG : args} + newTID = newID() + msg = {TID : newTID, TYP : REQ, REQ : method, ARG : args} if self.noisy: log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg)) data = bencode(msg) # Create the deferred and save it with the TID - d = Deferred() - self.tids[msg[TID]] = d + req = KrpcRequest(self, newTID, method, data, 10) + self.tids[newTID] = req # Save the conclusion of the action - d.addCallbacks(self.stats.responseAction, self.stats.failedAction, - callbackArgs = (method, ), errbackArgs = (method, )) - - # Schedule a later timeout call - def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr): - """Call the deferred's errback if a timeout occurs.""" - if tids.has_key(id): - df = tids[id] - del(tids[id]) - df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr))) - later = reactor.callLater(KRPC_TIMEOUT, timeOut) - - # Cancel the timeout call if a response is received - def dropTimeOut(dict, later_call = later): - """Cancel the timeout call when a response is received.""" - if later_call.active(): - later_call.cancel() - return dict - d.addBoth(dropTimeOut) + req.addCallbacks(self.stats.responseAction, self.stats.failedAction, + callbackArgs = (method, ), errbackArgs = (method, )) + req.send() + return req + + def _sendData(self, method, data): # Save some stats self.stats.sentAction(method) self.stats.sentBytes(len(data)) self.transport.write(data, self.addr) - return d - + + def _timeOut(self, badTID, method): + """Call the deferred's errback if a timeout occurs.""" + if badTID in self.tids: + req = self.tids[badTID] + del(self.tids[badTID]) + self.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % + (method, self.addr))) + def stop(self): """Timeout all pending requests.""" - for df in self.tids.values(): - df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, - 'connection has been stopped while waiting for response')) + for req in self.tids.values(): + req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, + 'connection has been stopped while waiting for response')) self.tids = {} self.stopped = True