WIP on sending multiple KRPC requests before timeout.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
index 47e7452f907e42baa755226e0b94db0abf7bd274..d4101ad40b17350fa33c5759c1508e473385972a 100644 (file)
@@ -200,6 +200,43 @@ class hostbroker(protocol.DatagramProtocol):
             conn.stop()
         protocol.DatagramProtocol.stopProtocol(self)
 
+class KrpcRequest(Deferred):
+    """An outstanding request to a remote node.
+    
+    
+    """
+    
+    def __init__(self, protocol, newTID, method, data, initDelay):
+        Deferred.__init__(self)
+        self.protocol = protocol
+        self.tid = newTID
+        self.method = method
+        self.data = data
+        self.delay = initDelay
+        self.later = None
+        
+    def send(self):
+        self.later = reactor.callLater(self.delay, self.timeOut)
+        self.delay *= 2
+        self.protocol._sendData(self.method, self.data)
+
+    def timeOut(self):
+        """Call the deferred's errback if a timeout occurs."""
+        self.protocol._timeOut(self.tid, self.method)
+        
+    def callback(self, resp):
+        self.dropTimeOut()
+        Deferred.callback(self, resp)
+        
+    def errback(self, resp):
+        self.dropTimeOut()
+        Deferred.errback(self, resp)
+        
+    def dropTimeOut(self):
+        """Cancel the timeout call when a response is received."""
+        if self.later and self.later.active():
+            self.later.cancel()
+
 class KRPC:
     """The KRPC protocol implementation.
     
@@ -318,11 +355,11 @@ class KRPC:
         elif msg[TYP] == RSP:
             # Responses get processed by their TID's deferred
             if self.tids.has_key(msg[TID]):
-                df = self.tids[msg[TID]]
+                req = self.tids[msg[TID]]
                 #      callback
                 del(self.tids[msg[TID]])
                 msg[RSP]['_krpc_sender'] = addr
-                df.callback(msg[RSP])
+                req.callback(msg[RSP])
             else:
                 # no tid, this transaction timed out already...
                 if self.noisy:
@@ -330,10 +367,10 @@ class KRPC:
         elif msg[TYP] == ERR:
             # Errors get processed by their TID's deferred's errback
             if self.tids.has_key(msg[TID]):
-                df = self.tids[msg[TID]]
+                req = self.tids[msg[TID]]
                 del(self.tids[msg[TID]])
                 # callback
-                df.errback(KrpcError(*msg[ERR]))
+                req.errback(KrpcError(*msg[ERR]))
             else:
                 # day late and dollar short, just log it
                 log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
@@ -343,11 +380,11 @@ class KRPC:
             if self.noisy:
                 log.msg("unknown message type: %r" % msg)
             if msg[TID] in self.tids:
-                df = self.tids[msg[TID]]
+                req = self.tids[msg[TID]]
                 del(self.tids[msg[TID]])
                 # callback
-                df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
-                                     "Received an unknown message type: %r" % msg[TYP]))
+                req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
+                                      "Received an unknown message type: %r" % msg[TYP]))
                 
     def _sendResponse(self, request, addr, tid, msgType, response):
         """Helper function for sending responses to nodes.
@@ -425,55 +462,50 @@ class KRPC:
         """Send a request to the remote node.
         
         @type method: C{string}
-        @param method: the methiod name to call on the remote node
+        @param method: the method name to call on the remote node
         @param args: the arguments to send to the remote node's method
         """
         if self.stopped:
             raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
 
         # Create the request message
-        msg = {TID : newID(), TYP : REQ,  REQ : method, ARG : args}
+        newTID = newID()
+        msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
         if self.noisy:
             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
         data = bencode(msg)
         
         # Create the deferred and save it with the TID
-        d = Deferred()
-        self.tids[msg[TID]] = d
+        req = KrpcRequest(self, newTID, method, data, 10)
+        self.tids[newTID] = req
         
         # Save the conclusion of the action
-        d.addCallbacks(self.stats.responseAction, self.stats.failedAction,
-                       callbackArgs = (method, ), errbackArgs = (method, ))
-
-        # Schedule a later timeout call
-        def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
-            """Call the deferred's errback if a timeout occurs."""
-            if tids.has_key(id):
-                df = tids[id]
-                del(tids[id])
-                df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr)))
-        later = reactor.callLater(KRPC_TIMEOUT, timeOut)
-        
-        # Cancel the timeout call if a response is received
-        def dropTimeOut(dict, later_call = later):
-            """Cancel the timeout call when a response is received."""
-            if later_call.active():
-                later_call.cancel()
-            return dict
-        d.addBoth(dropTimeOut)
+        req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
+                         callbackArgs = (method, ), errbackArgs = (method, ))
 
+        req.send()
+        return req
+    
+    def _sendData(self, method, data):
         # Save some stats
         self.stats.sentAction(method)
         self.stats.sentBytes(len(data))
         
         self.transport.write(data, self.addr)
-        return d
-    
+        
+    def _timeOut(self, badTID, method):
+        """Call the deferred's errback if a timeout occurs."""
+        if badTID in self.tids:
+            req = self.tids[badTID]
+            del(self.tids[badTID])
+            self.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
+                                                        (method, self.addr)))
+        
     def stop(self):
         """Timeout all pending requests."""
-        for df in self.tids.values():
-            df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
-                                 'connection has been stopped while waiting for response'))
+        for req in self.tids.values():
+            req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+                                  'connection has been stopped while waiting for response'))
         self.tids = {}
         self.stopped = True