]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
Retransmit DHT requests before timeout occurs is complete.
authorCameron Dale <camrdale@gmail.com>
Tue, 15 Apr 2008 22:57:03 +0000 (15:57 -0700)
committerCameron Dale <camrdale@gmail.com>
Tue, 15 Apr 2008 22:57:03 +0000 (15:57 -0700)
TODO
apt_p2p_Khashmir/DHT.py
apt_p2p_Khashmir/krpc.py

diff --git a/TODO b/TODO
index 49a33d8fc8227cc202a32a6e072f68ea6be12283..dd9524a40f3ce417eba51814d8430bb5d34372b1 100644 (file)
--- a/TODO
+++ b/TODO
@@ -15,47 +15,6 @@ distributions. They need to be dealt with properly by
 adding them to the tracking done by the AptPackages module.
 
 
 adding them to the tracking done by the AptPackages module.
 
 
-Retransmit DHT requests before timeout occurs.
-
-Currently, only a single transmission to a peer is ever attempted. If
-that request is lost, a timeout will occur after 20 seconds, the peer
-will be declared unreachable and the action will move on to the next
-peer. Instead, try to resend the request periodically using exponential
-backoff to make sure that lost packets don't delay the action so much.
-For example, send the request, wait 2 seconds and send again, wait 4
-seconds and send again, wait 8 seconds (14 seconds have now passed) and
-then declare the host unreachable. The same TID should be used in each
-retransmission, so receiving multiple responses should not be a problem
-as the extra ones will be ignored. 
-
-
-PeerManager needs to download large files from multiple peers.
-
-The PeerManager currently chooses a peer at random from the list of 
-possible peers, and downloads the entire file from there. This needs to 
-change if both a) the file is large (more than 512 KB), and b) there are
-multiple peers with the file. The PeerManager should then break up the 
-large file into multiple pieces of size < 512 KB, and then send requests 
-to multiple peers for these pieces.
-
-This can cause a problem with hash checking the returned data, as hashes 
-for the pieces are not known. Any file that fails a hash check should be 
-downloaded again, with each piece being downloaded from different peers 
-than it was previously. The peers are shifted by 1, so that if a peers 
-previously downloaded piece i, it now downloads piece i+1, and the first 
-piece is downloaded by the previous downloader of the last piece, or 
-preferably a previously unused peer. As each piece is downloaded the 
-running hash of the file should be checked to determine the place at 
-which the file differs from the previous download.
-
-If the hash check then passes, then the peer who originally provided the 
-bad piece can be assessed blame for the error. Otherwise, the peer who 
-originally provided the piece is probably at fault, since he is now 
-providing a later piece. This doesn't work if the differing piece is the 
-first piece, in which case it is downloaded from a 3rd peer, with 
-consensus revealing the misbehaving peer.
-
-
 Consider storing deltas of packages.
 
 Instead of downloading full package files when a previous version of
 Consider storing deltas of packages.
 
 Instead of downloading full package files when a previous version of
index f773482695428b03e044153e3b2e2b3e8a6be8e1..400b10de1d7098d692e80805c73dba9e8959cb7c 100644 (file)
@@ -367,14 +367,15 @@ class TestSimpleDHT(unittest.TestCase):
         d = self.a.join()
         return d
 
         d = self.a.join()
         return d
 
-    def test_failed_join(self):
+    def no_krpc_errors(self, result):
         from krpc import KrpcError
         from krpc import KrpcError
+        self.flushLoggedErrors(KrpcError)
+        return result
+
+    def test_failed_join(self):
         d = self.b.join()
         reactor.callLater(30, self.a.join)
         d = self.b.join()
         reactor.callLater(30, self.a.join)
-        def no_errors(result, self = self):
-            self.flushLoggedErrors(KrpcError)
-            return result
-        d.addCallback(no_errors)
+        d.addCallback(self.no_krpc_errors)
         return d
         
     def node_join(self, result):
         return d
         
     def node_join(self, result):
@@ -382,11 +383,14 @@ class TestSimpleDHT(unittest.TestCase):
         return d
     
     def test_join(self):
         return d
     
     def test_join(self):
-        self.lastDefer = defer.Deferred()
         d = self.a.join()
         d.addCallback(self.node_join)
         d = self.a.join()
         d.addCallback(self.node_join)
-        d.addCallback(self.lastDefer.callback)
-        return self.lastDefer
+        return d
+
+    def test_timeout_retransmit(self):
+        d = self.b.join()
+        reactor.callLater(4, self.a.join)
+        return d
 
     def test_normKey(self):
         h = self.a._normKey('12345678901234567890')
 
     def test_normKey(self):
         h = self.a._normKey('12345678901234567890')
index 97b8908c2677bdad85653d3f42892427b3e0bcc5..92d03a727a2211b586efc9e49009ef657806aefb 100644 (file)
@@ -3,6 +3,8 @@
 
 """The KRPC communication protocol implementation.
 
 
 """The KRPC communication protocol implementation.
 
+@var KRPC_INITIAL_DELAY: the number of seconds after which to try resending
+    the request, the resends will wait twice as long each time
 @var KRPC_TIMEOUT: the number of seconds after which requests timeout
 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
     UDP packet without fragmentation
 @var KRPC_TIMEOUT: the number of seconds after which requests timeout
 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
     UDP packet without fragmentation
 """
 
 from bencode import bencode, bdecode
 """
 
 from bencode import bencode, bdecode
-from time import asctime
+from datetime import datetime, timedelta
 from math import ceil
 
 from math import ceil
 
-from twisted.internet.defer import Deferred
+from twisted.internet import defer
 from twisted.internet import protocol, reactor
 from twisted.python import log
 from twisted.trial import unittest
 
 from khash import newID
 
 from twisted.internet import protocol, reactor
 from twisted.python import log
 from twisted.trial import unittest
 
 from khash import newID
 
-KRPC_TIMEOUT = 20
+KRPC_INITIAL_DELAY = 2
+KRPC_TIMEOUT = 14
 UDP_PACKET_LIMIT = 1472
 
 # Remote node errors
 UDP_PACKET_LIMIT = 1472
 
 # Remote node errors
@@ -200,45 +203,79 @@ class hostbroker(protocol.DatagramProtocol):
             conn.stop()
         protocol.DatagramProtocol.stopProtocol(self)
 
             conn.stop()
         protocol.DatagramProtocol.stopProtocol(self)
 
-class KrpcRequest(Deferred):
+class KrpcRequest(defer.Deferred):
     """An outstanding request to a remote node.
     
     """An outstanding request to a remote node.
     
-    
+    @type protocol: L{KRPC}
+    @ivar protocol: the protocol to send data with
+    @ivar tid: the transaction ID of the request
+    @type method: C{string}
+    @ivar method: the name of the method to call on the remote node
+    @type data: C{string}
+    @ivar data: the message to send to the remote node
+    @type delay: C{int}
+    @ivar delay: the last timeout delay sent
+    @type start: C{datetime}
+    @ivar start: the time to request was started at
+    @type later: L{twisted.internet.interfaces.IDelayedCall}
+    @ivar later: the pending call to timeout the last sent request
     """
     
     """
     
-    def __init__(self, protocol, newTID, method, data, initDelay):
-        Deferred.__init__(self)
+    def __init__(self, protocol, newTID, method, data):
+        """Initialize the request, and send it out.
+        
+        @type protocol: L{KRPC}
+        @param protocol: the protocol to send data with
+        @param newTID: the transaction ID of the request
+        @type method: C{string}
+        @param method: the name of the method to call on the remote node
+        @type data: C{string}
+        @param data: the message to send to the remote node
+        """
+        defer.Deferred.__init__(self)
         self.protocol = protocol
         self.tid = newTID
         self.method = method
         self.data = data
         self.protocol = protocol
         self.tid = newTID
         self.method = method
         self.data = data
-        self.delay = initDelay
+        self.delay = KRPC_INITIAL_DELAY
+        self.start = datetime.now()
         self.later = None
         self.later = None
+        self.send()
         
     def send(self):
         
     def send(self):
+        """Send the request to the remote node."""
         assert not self.later, 'There is already a pending request'
         self.later = reactor.callLater(self.delay, self.timeOut)
         assert not self.later, 'There is already a pending request'
         self.later = reactor.callLater(self.delay, self.timeOut)
-        self.delay *= 2
         self.protocol.sendData(self.method, self.data)
 
     def timeOut(self):
         self.protocol.sendData(self.method, self.data)
 
     def timeOut(self):
-        """Call the deferred's errback if a timeout occurs."""
+        """Check for a unrecoverable timeout, otherwise resend."""
         self.later = None
         self.later = None
-        self.protocol.timeOut(self.tid, self.method)
+        delay = datetime.now() - self.start
+        if delay > timedelta(seconds = KRPC_TIMEOUT):
+            log.msg('%r timed out after %0.2f sec' %
+                    (self.tid, delay.seconds + delay.microseconds/1000000.0))
+            self.protocol.timeOut(self.tid, self.method)
+        elif self.protocol.stopped:
+            log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
+        else:
+            self.delay *= 2
+            log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
+            self.send()
         
     def callback(self, resp):
         self.dropTimeOut()
         
     def callback(self, resp):
         self.dropTimeOut()
-        Deferred.callback(self, resp)
+        defer.Deferred.callback(self, resp)
         
     def errback(self, resp):
         self.dropTimeOut()
         
     def errback(self, resp):
         self.dropTimeOut()
-        Deferred.errback(self, resp)
+        defer.Deferred.errback(self, resp)
         
     def dropTimeOut(self):
         """Cancel the timeout call when a response is received."""
         if self.later and self.later.active():
             self.later.cancel()
         
     def dropTimeOut(self):
         """Cancel the timeout call when a response is received."""
         if self.later and self.later.active():
             self.later.cancel()
-            self.later = None
+        self.later = None
 
 class KRPC:
     """The KRPC protocol implementation.
 
 class KRPC:
     """The KRPC protocol implementation.
@@ -364,9 +401,10 @@ class KRPC:
                 msg[RSP]['_krpc_sender'] = addr
                 req.callback(msg[RSP])
             else:
                 msg[RSP]['_krpc_sender'] = addr
                 req.callback(msg[RSP])
             else:
-                # no tid, this transaction timed out already...
+                # no tid, this transaction was finished already...
                 if self.noisy:
                 if self.noisy:
-                    log.msg('timeout: %r' % msg[RSP]['id'])
+                    log.msg('received response from %r for completed request: %r' %
+                            (msg[RSP]['id'], msg[TID]))
         elif msg[TYP] == ERR:
             # Errors get processed by their TID's deferred's errback
             if self.tids.has_key(msg[TID]):
         elif msg[TYP] == ERR:
             # Errors get processed by their TID's deferred's errback
             if self.tids.has_key(msg[TID]):
@@ -375,9 +413,9 @@ class KRPC:
                 # callback
                 req.errback(KrpcError(*msg[ERR]))
             else:
                 # callback
                 req.errback(KrpcError(*msg[ERR]))
             else:
-                # day late and dollar short, just log it
-                log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
-                pass
+                # no tid, this transaction was finished already...
+                log.msg('received an error %r from %r for completed request: %r' %
+                        (msg[ERR], msg[RSP]['id'], msg[TID]))
         else:
             # Received an unknown message type
             if self.noisy:
         else:
             # Received an unknown message type
             if self.noisy:
@@ -469,7 +507,8 @@ class KRPC:
         @param args: the arguments to send to the remote node's method
         """
         if self.stopped:
         @param args: the arguments to send to the remote node's method
         """
         if self.stopped:
-            raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
+            return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+                                        "cannot send, connection has been stopped"))
 
         # Create the request message
         newTID = newID()
 
         # Create the request message
         newTID = newID()
@@ -478,26 +517,36 @@ class KRPC:
             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
         data = bencode(msg)
         
             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
         data = bencode(msg)
         
-        # Create the deferred and save it with the TID
-        req = KrpcRequest(self, newTID, method, data, 10)
+        # Create the request object and save it with the TID
+        req = KrpcRequest(self, newTID, method, data)
         self.tids[newTID] = req
         
         # Save the conclusion of the action
         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
                          callbackArgs = (method, ), errbackArgs = (method, ))
 
         self.tids[newTID] = req
         
         # Save the conclusion of the action
         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
                          callbackArgs = (method, ), errbackArgs = (method, ))
 
-        req.send()
         return req
     
     def sendData(self, method, data):
         return req
     
     def sendData(self, method, data):
-        # Save some stats
+        """Write a request to the transport and save the stats.
+        
+        @type method: C{string}
+        @param method: the name of the method to call on the remote node
+        @type data: C{string}
+        @param data: the message to send to the remote node
+        """
         self.stats.sentAction(method)
         self.stats.sentBytes(len(data))
         
         self.transport.write(data, self.addr)
         
     def timeOut(self, badTID, method):
         self.stats.sentAction(method)
         self.stats.sentBytes(len(data))
         
         self.transport.write(data, self.addr)
         
     def timeOut(self, badTID, method):
-        """Call the deferred's errback if a timeout occurs."""
+        """Call the deferred's errback if a timeout occurs.
+        
+        @param badTID: the transaction ID of the request
+        @type method: C{string}
+        @param method: the name of the method that timed out on the remote node
+        """
         if badTID in self.tids:
             req = self.tids[badTID]
             del(self.tids[badTID])
         if badTID in self.tids:
             req = self.tids[badTID]
             del(self.tids[badTID])
@@ -507,7 +556,7 @@ class KRPC:
             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
         
     def stop(self):
             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
         
     def stop(self):
-        """Timeout all pending requests."""
+        """Cancel all pending requests."""
         for req in self.tids.values():
             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
                                   'connection has been stopped while waiting for response'))
         for req in self.tids.values():
             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
                                   'connection has been stopped while waiting for response'))