ARG = 'a'
ERR = 'e'
+class ProtocolError(Exception):
+ pass
+
class hostbroker(protocol.DatagramProtocol):
def __init__(self, server):
self.server = server
protocol.DatagramProtocol.makeConnection(self, transport)
tup = transport.getHost()
self.addr = (tup.host, tup.port)
+
+ def stopProtocol(self):
+ for conn in self.connections.values():
+ conn.stop()
+ protocol.DatagramProtocol.stopProtocol(self)
## connection
class KRPC:
self.addr = addr
self.tids = {}
self.mtid = 0
+ self.stopped = False
def datagramReceived(self, str, addr):
+ if self.stopped:
+ if self.noisy:
+ print "stopped, dropping message from", addr, str
# bdecode
try:
msg = bdecode(str)
del(self.tids[msg[TID]])
def sendRequest(self, method, args):
+ if self.stopped:
+ raise ProtocolError, "connection has been stopped"
# make message
# send it
msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args}
str = bencode(msg)
d = Deferred()
self.tids[msg[TID]] = d
- def timeOut(tids = self.tids, id = msg[TID]):
+ def timeOut(tids = self.tids, id = msg[TID], msg = msg):
if tids.has_key(id):
df = tids[id]
del(tids[id])
print ">>>>>> KRPC_ERROR_TIMEOUT"
- df.errback(KRPC_ERROR_TIMEOUT)
+ df.errback(ProtocolError('timeout waiting for %r' % msg))
later = reactor.callLater(KRPC_TIMEOUT, timeOut)
def dropTimeOut(dict, later_call = later):
if later_call.active():
d.addBoth(dropTimeOut)
self.transport.write(str, self.addr)
return d
+
+ def stop(self):
+ """Timeout all pending requests."""
+ for df in self.tids.values():
+ df.errback(ProtocolError('connection has been closed'))
+ self.tids = {}
+ self.stopped = True
def connectionForAddr(host, port):
return host