Retransmit DHT requests before timeout occurs is complete.
authorCameron Dale <camrdale@gmail.com>
Tue, 15 Apr 2008 22:57:03 +0000 (15:57 -0700)
committerCameron Dale <camrdale@gmail.com>
Tue, 15 Apr 2008 22:57:03 +0000 (15:57 -0700)
TODO
apt_p2p_Khashmir/DHT.py
apt_p2p_Khashmir/krpc.py

diff --git a/TODO b/TODO
index 49a33d8fc8227cc202a32a6e072f68ea6be12283..dd9524a40f3ce417eba51814d8430bb5d34372b1 100644 (file)
--- a/TODO
+++ b/TODO
@@ -15,47 +15,6 @@ distributions. They need to be dealt with properly by
 adding them to the tracking done by the AptPackages module.
 
 
-Retransmit DHT requests before timeout occurs.
-
-Currently, only a single transmission to a peer is ever attempted. If
-that request is lost, a timeout will occur after 20 seconds, the peer
-will be declared unreachable and the action will move on to the next
-peer. Instead, try to resend the request periodically using exponential
-backoff to make sure that lost packets don't delay the action so much.
-For example, send the request, wait 2 seconds and send again, wait 4
-seconds and send again, wait 8 seconds (14 seconds have now passed) and
-then declare the host unreachable. The same TID should be used in each
-retransmission, so receiving multiple responses should not be a problem
-as the extra ones will be ignored. 
-
-
-PeerManager needs to download large files from multiple peers.
-
-The PeerManager currently chooses a peer at random from the list of 
-possible peers, and downloads the entire file from there. This needs to 
-change if both a) the file is large (more than 512 KB), and b) there are
-multiple peers with the file. The PeerManager should then break up the 
-large file into multiple pieces of size < 512 KB, and then send requests 
-to multiple peers for these pieces.
-
-This can cause a problem with hash checking the returned data, as hashes 
-for the pieces are not known. Any file that fails a hash check should be 
-downloaded again, with each piece being downloaded from different peers 
-than it was previously. The peers are shifted by 1, so that if a peers 
-previously downloaded piece i, it now downloads piece i+1, and the first 
-piece is downloaded by the previous downloader of the last piece, or 
-preferably a previously unused peer. As each piece is downloaded the 
-running hash of the file should be checked to determine the place at 
-which the file differs from the previous download.
-
-If the hash check then passes, then the peer who originally provided the 
-bad piece can be assessed blame for the error. Otherwise, the peer who 
-originally provided the piece is probably at fault, since he is now 
-providing a later piece. This doesn't work if the differing piece is the 
-first piece, in which case it is downloaded from a 3rd peer, with 
-consensus revealing the misbehaving peer.
-
-
 Consider storing deltas of packages.
 
 Instead of downloading full package files when a previous version of
index f773482695428b03e044153e3b2e2b3e8a6be8e1..400b10de1d7098d692e80805c73dba9e8959cb7c 100644 (file)
@@ -367,14 +367,15 @@ class TestSimpleDHT(unittest.TestCase):
         d = self.a.join()
         return d
 
-    def test_failed_join(self):
+    def no_krpc_errors(self, result):
         from krpc import KrpcError
+        self.flushLoggedErrors(KrpcError)
+        return result
+
+    def test_failed_join(self):
         d = self.b.join()
         reactor.callLater(30, self.a.join)
-        def no_errors(result, self = self):
-            self.flushLoggedErrors(KrpcError)
-            return result
-        d.addCallback(no_errors)
+        d.addCallback(self.no_krpc_errors)
         return d
         
     def node_join(self, result):
@@ -382,11 +383,14 @@ class TestSimpleDHT(unittest.TestCase):
         return d
     
     def test_join(self):
-        self.lastDefer = defer.Deferred()
         d = self.a.join()
         d.addCallback(self.node_join)
-        d.addCallback(self.lastDefer.callback)
-        return self.lastDefer
+        return d
+
+    def test_timeout_retransmit(self):
+        d = self.b.join()
+        reactor.callLater(4, self.a.join)
+        return d
 
     def test_normKey(self):
         h = self.a._normKey('12345678901234567890')
index 97b8908c2677bdad85653d3f42892427b3e0bcc5..92d03a727a2211b586efc9e49009ef657806aefb 100644 (file)
@@ -3,6 +3,8 @@
 
 """The KRPC communication protocol implementation.
 
+@var KRPC_INITIAL_DELAY: the number of seconds after which to try resending
+    the request, the resends will wait twice as long each time
 @var KRPC_TIMEOUT: the number of seconds after which requests timeout
 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
     UDP packet without fragmentation
 """
 
 from bencode import bencode, bdecode
-from time import asctime
+from datetime import datetime, timedelta
 from math import ceil
 
-from twisted.internet.defer import Deferred
+from twisted.internet import defer
 from twisted.internet import protocol, reactor
 from twisted.python import log
 from twisted.trial import unittest
 
 from khash import newID
 
-KRPC_TIMEOUT = 20
+KRPC_INITIAL_DELAY = 2
+KRPC_TIMEOUT = 14
 UDP_PACKET_LIMIT = 1472
 
 # Remote node errors
@@ -200,45 +203,79 @@ class hostbroker(protocol.DatagramProtocol):
             conn.stop()
         protocol.DatagramProtocol.stopProtocol(self)
 
-class KrpcRequest(Deferred):
+class KrpcRequest(defer.Deferred):
     """An outstanding request to a remote node.
     
-    
+    @type protocol: L{KRPC}
+    @ivar protocol: the protocol to send data with
+    @ivar tid: the transaction ID of the request
+    @type method: C{string}
+    @ivar method: the name of the method to call on the remote node
+    @type data: C{string}
+    @ivar data: the message to send to the remote node
+    @type delay: C{int}
+    @ivar delay: the last timeout delay sent
+    @type start: C{datetime}
+    @ivar start: the time to request was started at
+    @type later: L{twisted.internet.interfaces.IDelayedCall}
+    @ivar later: the pending call to timeout the last sent request
     """
     
-    def __init__(self, protocol, newTID, method, data, initDelay):
-        Deferred.__init__(self)
+    def __init__(self, protocol, newTID, method, data):
+        """Initialize the request, and send it out.
+        
+        @type protocol: L{KRPC}
+        @param protocol: the protocol to send data with
+        @param newTID: the transaction ID of the request
+        @type method: C{string}
+        @param method: the name of the method to call on the remote node
+        @type data: C{string}
+        @param data: the message to send to the remote node
+        """
+        defer.Deferred.__init__(self)
         self.protocol = protocol
         self.tid = newTID
         self.method = method
         self.data = data
-        self.delay = initDelay
+        self.delay = KRPC_INITIAL_DELAY
+        self.start = datetime.now()
         self.later = None
+        self.send()
         
     def send(self):
+        """Send the request to the remote node."""
         assert not self.later, 'There is already a pending request'
         self.later = reactor.callLater(self.delay, self.timeOut)
-        self.delay *= 2
         self.protocol.sendData(self.method, self.data)
 
     def timeOut(self):
-        """Call the deferred's errback if a timeout occurs."""
+        """Check for a unrecoverable timeout, otherwise resend."""
         self.later = None
-        self.protocol.timeOut(self.tid, self.method)
+        delay = datetime.now() - self.start
+        if delay > timedelta(seconds = KRPC_TIMEOUT):
+            log.msg('%r timed out after %0.2f sec' %
+                    (self.tid, delay.seconds + delay.microseconds/1000000.0))
+            self.protocol.timeOut(self.tid, self.method)
+        elif self.protocol.stopped:
+            log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
+        else:
+            self.delay *= 2
+            log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
+            self.send()
         
     def callback(self, resp):
         self.dropTimeOut()
-        Deferred.callback(self, resp)
+        defer.Deferred.callback(self, resp)
         
     def errback(self, resp):
         self.dropTimeOut()
-        Deferred.errback(self, resp)
+        defer.Deferred.errback(self, resp)
         
     def dropTimeOut(self):
         """Cancel the timeout call when a response is received."""
         if self.later and self.later.active():
             self.later.cancel()
-            self.later = None
+        self.later = None
 
 class KRPC:
     """The KRPC protocol implementation.
@@ -364,9 +401,10 @@ class KRPC:
                 msg[RSP]['_krpc_sender'] = addr
                 req.callback(msg[RSP])
             else:
-                # no tid, this transaction timed out already...
+                # no tid, this transaction was finished already...
                 if self.noisy:
-                    log.msg('timeout: %r' % msg[RSP]['id'])
+                    log.msg('received response from %r for completed request: %r' %
+                            (msg[RSP]['id'], msg[TID]))
         elif msg[TYP] == ERR:
             # Errors get processed by their TID's deferred's errback
             if self.tids.has_key(msg[TID]):
@@ -375,9 +413,9 @@ class KRPC:
                 # callback
                 req.errback(KrpcError(*msg[ERR]))
             else:
-                # day late and dollar short, just log it
-                log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
-                pass
+                # no tid, this transaction was finished already...
+                log.msg('received an error %r from %r for completed request: %r' %
+                        (msg[ERR], msg[RSP]['id'], msg[TID]))
         else:
             # Received an unknown message type
             if self.noisy:
@@ -469,7 +507,8 @@ class KRPC:
         @param args: the arguments to send to the remote node's method
         """
         if self.stopped:
-            raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
+            return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+                                        "cannot send, connection has been stopped"))
 
         # Create the request message
         newTID = newID()
@@ -478,26 +517,36 @@ class KRPC:
             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
         data = bencode(msg)
         
-        # Create the deferred and save it with the TID
-        req = KrpcRequest(self, newTID, method, data, 10)
+        # Create the request object and save it with the TID
+        req = KrpcRequest(self, newTID, method, data)
         self.tids[newTID] = req
         
         # Save the conclusion of the action
         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
                          callbackArgs = (method, ), errbackArgs = (method, ))
 
-        req.send()
         return req
     
     def sendData(self, method, data):
-        # Save some stats
+        """Write a request to the transport and save the stats.
+        
+        @type method: C{string}
+        @param method: the name of the method to call on the remote node
+        @type data: C{string}
+        @param data: the message to send to the remote node
+        """
         self.stats.sentAction(method)
         self.stats.sentBytes(len(data))
         
         self.transport.write(data, self.addr)
         
     def timeOut(self, badTID, method):
-        """Call the deferred's errback if a timeout occurs."""
+        """Call the deferred's errback if a timeout occurs.
+        
+        @param badTID: the transaction ID of the request
+        @type method: C{string}
+        @param method: the name of the method that timed out on the remote node
+        """
         if badTID in self.tids:
             req = self.tids[badTID]
             del(self.tids[badTID])
@@ -507,7 +556,7 @@ class KRPC:
             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
         
     def stop(self):
-        """Timeout all pending requests."""
+        """Cancel all pending requests."""
         for req in self.tids.values():
             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
                                   'connection has been stopped while waiting for response'))