]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
Improve the stopping of the krpc protocol so no timeouts are left.
authorCameron Dale <camrdale@gmail.com>
Wed, 9 Jan 2008 03:08:21 +0000 (19:08 -0800)
committerCameron Dale <camrdale@gmail.com>
Wed, 9 Jan 2008 03:08:21 +0000 (19:08 -0800)
apt_dht_Khashmir/krpc.py

index 8972c6c2102322acd36d4bf96a4cc7972ec1e838..2c1f7e83b972c1a4e8fb3cdfcacb64de4b87f2fe 100644 (file)
@@ -25,6 +25,9 @@ TYP = 'y'
 ARG = 'a'
 ERR = 'e'
 
+class ProtocolError(Exception):
+    pass
+
 class hostbroker(protocol.DatagramProtocol):       
     def __init__(self, server):
         self.server = server
@@ -53,6 +56,11 @@ class hostbroker(protocol.DatagramProtocol):
         protocol.DatagramProtocol.makeConnection(self, transport)
         tup = transport.getHost()
         self.addr = (tup.host, tup.port)
+        
+    def stopProtocol(self):
+        for conn in self.connections.values():
+            conn.stop()
+        protocol.DatagramProtocol.stopProtocol(self)
 
 ## connection
 class KRPC:
@@ -63,8 +71,12 @@ class KRPC:
         self.addr = addr
         self.tids = {}
         self.mtid = 0
+        self.stopped = False
 
     def datagramReceived(self, str, addr):
+        if self.stopped:
+            if self.noisy:
+                print "stopped, dropping message from", addr, str
         # bdecode
         try:
             msg = bdecode(str)
@@ -140,6 +152,8 @@ class KRPC:
                 del(self.tids[msg[TID]])
                 
     def sendRequest(self, method, args):
+        if self.stopped:
+            raise ProtocolError, "connection has been stopped"
         # make message
         # send it
         msg = {TID : chr(self.mtid), TYP : REQ,  REQ : method, ARG : args}
@@ -147,12 +161,12 @@ class KRPC:
         str = bencode(msg)
         d = Deferred()
         self.tids[msg[TID]] = d
-        def timeOut(tids = self.tids, id = msg[TID]):
+        def timeOut(tids = self.tids, id = msg[TID], msg = msg):
             if tids.has_key(id):
                 df = tids[id]
                 del(tids[id])
                 print ">>>>>> KRPC_ERROR_TIMEOUT"
-                df.errback(KRPC_ERROR_TIMEOUT)
+                df.errback(ProtocolError('timeout waiting for %r' % msg))
         later = reactor.callLater(KRPC_TIMEOUT, timeOut)
         def dropTimeOut(dict, later_call = later):
             if later_call.active():
@@ -161,6 +175,13 @@ class KRPC:
         d.addBoth(dropTimeOut)
         self.transport.write(str, self.addr)
         return d
+    
+    def stop(self):
+        """Timeout all pending requests."""
+        for df in self.tids.values():
+            df.errback(ProtocolError('connection has been closed'))
+        self.tids = {}
+        self.stopped = True
  
 def connectionForAddr(host, port):
     return host