From: Cameron Dale Date: Wed, 9 Jan 2008 03:08:21 +0000 (-0800) Subject: Improve the stopping of the krpc protocol so no timeouts are left. X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=60d34d6f90d03b5c8c714e11c487c152a4aed98e;p=quix0rs-apt-p2p.git Improve the stopping of the krpc protocol so no timeouts are left. --- diff --git a/apt_dht_Khashmir/krpc.py b/apt_dht_Khashmir/krpc.py index 8972c6c..2c1f7e8 100644 --- a/apt_dht_Khashmir/krpc.py +++ b/apt_dht_Khashmir/krpc.py @@ -25,6 +25,9 @@ TYP = 'y' ARG = 'a' ERR = 'e' +class ProtocolError(Exception): + pass + class hostbroker(protocol.DatagramProtocol): def __init__(self, server): self.server = server @@ -53,6 +56,11 @@ class hostbroker(protocol.DatagramProtocol): protocol.DatagramProtocol.makeConnection(self, transport) tup = transport.getHost() self.addr = (tup.host, tup.port) + + def stopProtocol(self): + for conn in self.connections.values(): + conn.stop() + protocol.DatagramProtocol.stopProtocol(self) ## connection class KRPC: @@ -63,8 +71,12 @@ class KRPC: self.addr = addr self.tids = {} self.mtid = 0 + self.stopped = False def datagramReceived(self, str, addr): + if self.stopped: + if self.noisy: + print "stopped, dropping message from", addr, str # bdecode try: msg = bdecode(str) @@ -140,6 +152,8 @@ class KRPC: del(self.tids[msg[TID]]) def sendRequest(self, method, args): + if self.stopped: + raise ProtocolError, "connection has been stopped" # make message # send it msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args} @@ -147,12 +161,12 @@ class KRPC: str = bencode(msg) d = Deferred() self.tids[msg[TID]] = d - def timeOut(tids = self.tids, id = msg[TID]): + def timeOut(tids = self.tids, id = msg[TID], msg = msg): if tids.has_key(id): df = tids[id] del(tids[id]) print ">>>>>> KRPC_ERROR_TIMEOUT" - df.errback(KRPC_ERROR_TIMEOUT) + df.errback(ProtocolError('timeout waiting for %r' % msg)) later = reactor.callLater(KRPC_TIMEOUT, timeOut) def dropTimeOut(dict, later_call = later): if later_call.active(): @@ -161,6 +175,13 @@ class KRPC: d.addBoth(dropTimeOut) self.transport.write(str, self.addr) return d + + def stop(self): + """Timeout all pending requests.""" + for df in self.tids.values(): + df.errback(ProtocolError('connection has been closed')) + self.tids = {} + self.stopped = True def connectionForAddr(host, port): return host