From 136c82d6c138402e93bdefb499aed967ba059385 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Tue, 15 Apr 2008 15:57:03 -0700 Subject: [PATCH] Retransmit DHT requests before timeout occurs is complete. --- TODO | 41 ---------------- apt_p2p_Khashmir/DHT.py | 20 ++++---- apt_p2p_Khashmir/krpc.py | 101 +++++++++++++++++++++++++++++---------- 3 files changed, 87 insertions(+), 75 deletions(-) diff --git a/TODO b/TODO index 49a33d8..dd9524a 100644 --- a/TODO +++ b/TODO @@ -15,47 +15,6 @@ distributions. They need to be dealt with properly by adding them to the tracking done by the AptPackages module. -Retransmit DHT requests before timeout occurs. - -Currently, only a single transmission to a peer is ever attempted. If -that request is lost, a timeout will occur after 20 seconds, the peer -will be declared unreachable and the action will move on to the next -peer. Instead, try to resend the request periodically using exponential -backoff to make sure that lost packets don't delay the action so much. -For example, send the request, wait 2 seconds and send again, wait 4 -seconds and send again, wait 8 seconds (14 seconds have now passed) and -then declare the host unreachable. The same TID should be used in each -retransmission, so receiving multiple responses should not be a problem -as the extra ones will be ignored. - - -PeerManager needs to download large files from multiple peers. - -The PeerManager currently chooses a peer at random from the list of -possible peers, and downloads the entire file from there. This needs to -change if both a) the file is large (more than 512 KB), and b) there are -multiple peers with the file. The PeerManager should then break up the -large file into multiple pieces of size < 512 KB, and then send requests -to multiple peers for these pieces. - -This can cause a problem with hash checking the returned data, as hashes -for the pieces are not known. Any file that fails a hash check should be -downloaded again, with each piece being downloaded from different peers -than it was previously. The peers are shifted by 1, so that if a peers -previously downloaded piece i, it now downloads piece i+1, and the first -piece is downloaded by the previous downloader of the last piece, or -preferably a previously unused peer. As each piece is downloaded the -running hash of the file should be checked to determine the place at -which the file differs from the previous download. - -If the hash check then passes, then the peer who originally provided the -bad piece can be assessed blame for the error. Otherwise, the peer who -originally provided the piece is probably at fault, since he is now -providing a later piece. This doesn't work if the differing piece is the -first piece, in which case it is downloaded from a 3rd peer, with -consensus revealing the misbehaving peer. - - Consider storing deltas of packages. Instead of downloading full package files when a previous version of diff --git a/apt_p2p_Khashmir/DHT.py b/apt_p2p_Khashmir/DHT.py index f773482..400b10d 100644 --- a/apt_p2p_Khashmir/DHT.py +++ b/apt_p2p_Khashmir/DHT.py @@ -367,14 +367,15 @@ class TestSimpleDHT(unittest.TestCase): d = self.a.join() return d - def test_failed_join(self): + def no_krpc_errors(self, result): from krpc import KrpcError + self.flushLoggedErrors(KrpcError) + return result + + def test_failed_join(self): d = self.b.join() reactor.callLater(30, self.a.join) - def no_errors(result, self = self): - self.flushLoggedErrors(KrpcError) - return result - d.addCallback(no_errors) + d.addCallback(self.no_krpc_errors) return d def node_join(self, result): @@ -382,11 +383,14 @@ class TestSimpleDHT(unittest.TestCase): return d def test_join(self): - self.lastDefer = defer.Deferred() d = self.a.join() d.addCallback(self.node_join) - d.addCallback(self.lastDefer.callback) - return self.lastDefer + return d + + def test_timeout_retransmit(self): + d = self.b.join() + reactor.callLater(4, self.a.join) + return d def test_normKey(self): h = self.a._normKey('12345678901234567890') diff --git a/apt_p2p_Khashmir/krpc.py b/apt_p2p_Khashmir/krpc.py index 97b8908..92d03a7 100644 --- a/apt_p2p_Khashmir/krpc.py +++ b/apt_p2p_Khashmir/krpc.py @@ -3,6 +3,8 @@ """The KRPC communication protocol implementation. +@var KRPC_INITIAL_DELAY: the number of seconds after which to try resending + the request, the resends will wait twice as long each time @var KRPC_TIMEOUT: the number of seconds after which requests timeout @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a UDP packet without fragmentation @@ -38,17 +40,18 @@ """ from bencode import bencode, bdecode -from time import asctime +from datetime import datetime, timedelta from math import ceil -from twisted.internet.defer import Deferred +from twisted.internet import defer from twisted.internet import protocol, reactor from twisted.python import log from twisted.trial import unittest from khash import newID -KRPC_TIMEOUT = 20 +KRPC_INITIAL_DELAY = 2 +KRPC_TIMEOUT = 14 UDP_PACKET_LIMIT = 1472 # Remote node errors @@ -200,45 +203,79 @@ class hostbroker(protocol.DatagramProtocol): conn.stop() protocol.DatagramProtocol.stopProtocol(self) -class KrpcRequest(Deferred): +class KrpcRequest(defer.Deferred): """An outstanding request to a remote node. - + @type protocol: L{KRPC} + @ivar protocol: the protocol to send data with + @ivar tid: the transaction ID of the request + @type method: C{string} + @ivar method: the name of the method to call on the remote node + @type data: C{string} + @ivar data: the message to send to the remote node + @type delay: C{int} + @ivar delay: the last timeout delay sent + @type start: C{datetime} + @ivar start: the time to request was started at + @type later: L{twisted.internet.interfaces.IDelayedCall} + @ivar later: the pending call to timeout the last sent request """ - def __init__(self, protocol, newTID, method, data, initDelay): - Deferred.__init__(self) + def __init__(self, protocol, newTID, method, data): + """Initialize the request, and send it out. + + @type protocol: L{KRPC} + @param protocol: the protocol to send data with + @param newTID: the transaction ID of the request + @type method: C{string} + @param method: the name of the method to call on the remote node + @type data: C{string} + @param data: the message to send to the remote node + """ + defer.Deferred.__init__(self) self.protocol = protocol self.tid = newTID self.method = method self.data = data - self.delay = initDelay + self.delay = KRPC_INITIAL_DELAY + self.start = datetime.now() self.later = None + self.send() def send(self): + """Send the request to the remote node.""" assert not self.later, 'There is already a pending request' self.later = reactor.callLater(self.delay, self.timeOut) - self.delay *= 2 self.protocol.sendData(self.method, self.data) def timeOut(self): - """Call the deferred's errback if a timeout occurs.""" + """Check for a unrecoverable timeout, otherwise resend.""" self.later = None - self.protocol.timeOut(self.tid, self.method) + delay = datetime.now() - self.start + if delay > timedelta(seconds = KRPC_TIMEOUT): + log.msg('%r timed out after %0.2f sec' % + (self.tid, delay.seconds + delay.microseconds/1000000.0)) + self.protocol.timeOut(self.tid, self.method) + elif self.protocol.stopped: + log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid) + else: + self.delay *= 2 + log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay)) + self.send() def callback(self, resp): self.dropTimeOut() - Deferred.callback(self, resp) + defer.Deferred.callback(self, resp) def errback(self, resp): self.dropTimeOut() - Deferred.errback(self, resp) + defer.Deferred.errback(self, resp) def dropTimeOut(self): """Cancel the timeout call when a response is received.""" if self.later and self.later.active(): self.later.cancel() - self.later = None + self.later = None class KRPC: """The KRPC protocol implementation. @@ -364,9 +401,10 @@ class KRPC: msg[RSP]['_krpc_sender'] = addr req.callback(msg[RSP]) else: - # no tid, this transaction timed out already... + # no tid, this transaction was finished already... if self.noisy: - log.msg('timeout: %r' % msg[RSP]['id']) + log.msg('received response from %r for completed request: %r' % + (msg[RSP]['id'], msg[TID])) elif msg[TYP] == ERR: # Errors get processed by their TID's deferred's errback if self.tids.has_key(msg[TID]): @@ -375,9 +413,9 @@ class KRPC: # callback req.errback(KrpcError(*msg[ERR])) else: - # day late and dollar short, just log it - log.msg("Got an error for an unknown request: %r" % (msg[ERR], )) - pass + # no tid, this transaction was finished already... + log.msg('received an error %r from %r for completed request: %r' % + (msg[ERR], msg[RSP]['id'], msg[TID])) else: # Received an unknown message type if self.noisy: @@ -469,7 +507,8 @@ class KRPC: @param args: the arguments to send to the remote node's method """ if self.stopped: - raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped") + return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, + "cannot send, connection has been stopped")) # Create the request message newTID = newID() @@ -478,26 +517,36 @@ class KRPC: log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg)) data = bencode(msg) - # Create the deferred and save it with the TID - req = KrpcRequest(self, newTID, method, data, 10) + # Create the request object and save it with the TID + req = KrpcRequest(self, newTID, method, data) self.tids[newTID] = req # Save the conclusion of the action req.addCallbacks(self.stats.responseAction, self.stats.failedAction, callbackArgs = (method, ), errbackArgs = (method, )) - req.send() return req def sendData(self, method, data): - # Save some stats + """Write a request to the transport and save the stats. + + @type method: C{string} + @param method: the name of the method to call on the remote node + @type data: C{string} + @param data: the message to send to the remote node + """ self.stats.sentAction(method) self.stats.sentBytes(len(data)) self.transport.write(data, self.addr) def timeOut(self, badTID, method): - """Call the deferred's errback if a timeout occurs.""" + """Call the deferred's errback if a timeout occurs. + + @param badTID: the transaction ID of the request + @type method: C{string} + @param method: the name of the method that timed out on the remote node + """ if badTID in self.tids: req = self.tids[badTID] del(self.tids[badTID]) @@ -507,7 +556,7 @@ class KRPC: log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr)) def stop(self): - """Timeout all pending requests.""" + """Cancel all pending requests.""" for req in self.tids.values(): req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response')) -- 2.39.5