adding them to the tracking done by the AptPackages module.
-Retransmit DHT requests before timeout occurs.
-
-Currently, only a single transmission to a peer is ever attempted. If
-that request is lost, a timeout will occur after 20 seconds, the peer
-will be declared unreachable and the action will move on to the next
-peer. Instead, try to resend the request periodically using exponential
-backoff to make sure that lost packets don't delay the action so much.
-For example, send the request, wait 2 seconds and send again, wait 4
-seconds and send again, wait 8 seconds (14 seconds have now passed) and
-then declare the host unreachable. The same TID should be used in each
-retransmission, so receiving multiple responses should not be a problem
-as the extra ones will be ignored.
-
-
-PeerManager needs to download large files from multiple peers.
-
-The PeerManager currently chooses a peer at random from the list of
-possible peers, and downloads the entire file from there. This needs to
-change if both a) the file is large (more than 512 KB), and b) there are
-multiple peers with the file. The PeerManager should then break up the
-large file into multiple pieces of size < 512 KB, and then send requests
-to multiple peers for these pieces.
-
-This can cause a problem with hash checking the returned data, as hashes
-for the pieces are not known. Any file that fails a hash check should be
-downloaded again, with each piece being downloaded from different peers
-than it was previously. The peers are shifted by 1, so that if a peers
-previously downloaded piece i, it now downloads piece i+1, and the first
-piece is downloaded by the previous downloader of the last piece, or
-preferably a previously unused peer. As each piece is downloaded the
-running hash of the file should be checked to determine the place at
-which the file differs from the previous download.
-
-If the hash check then passes, then the peer who originally provided the
-bad piece can be assessed blame for the error. Otherwise, the peer who
-originally provided the piece is probably at fault, since he is now
-providing a later piece. This doesn't work if the differing piece is the
-first piece, in which case it is downloaded from a 3rd peer, with
-consensus revealing the misbehaving peer.
-
-
Consider storing deltas of packages.
Instead of downloading full package files when a previous version of
"""The KRPC communication protocol implementation.
+@var KRPC_INITIAL_DELAY: the number of seconds after which to try resending
+ the request, the resends will wait twice as long each time
@var KRPC_TIMEOUT: the number of seconds after which requests timeout
@var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
UDP packet without fragmentation
"""
from bencode import bencode, bdecode
-from time import asctime
+from datetime import datetime, timedelta
from math import ceil
-from twisted.internet.defer import Deferred
+from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.python import log
from twisted.trial import unittest
from khash import newID
-KRPC_TIMEOUT = 20
+KRPC_INITIAL_DELAY = 2
+KRPC_TIMEOUT = 14
UDP_PACKET_LIMIT = 1472
# Remote node errors
conn.stop()
protocol.DatagramProtocol.stopProtocol(self)
-class KrpcRequest(Deferred):
+class KrpcRequest(defer.Deferred):
"""An outstanding request to a remote node.
-
+ @type protocol: L{KRPC}
+ @ivar protocol: the protocol to send data with
+ @ivar tid: the transaction ID of the request
+ @type method: C{string}
+ @ivar method: the name of the method to call on the remote node
+ @type data: C{string}
+ @ivar data: the message to send to the remote node
+ @type delay: C{int}
+ @ivar delay: the last timeout delay sent
+ @type start: C{datetime}
+ @ivar start: the time to request was started at
+ @type later: L{twisted.internet.interfaces.IDelayedCall}
+ @ivar later: the pending call to timeout the last sent request
"""
- def __init__(self, protocol, newTID, method, data, initDelay):
- Deferred.__init__(self)
+ def __init__(self, protocol, newTID, method, data):
+ """Initialize the request, and send it out.
+
+ @type protocol: L{KRPC}
+ @param protocol: the protocol to send data with
+ @param newTID: the transaction ID of the request
+ @type method: C{string}
+ @param method: the name of the method to call on the remote node
+ @type data: C{string}
+ @param data: the message to send to the remote node
+ """
+ defer.Deferred.__init__(self)
self.protocol = protocol
self.tid = newTID
self.method = method
self.data = data
- self.delay = initDelay
+ self.delay = KRPC_INITIAL_DELAY
+ self.start = datetime.now()
self.later = None
+ self.send()
def send(self):
+ """Send the request to the remote node."""
assert not self.later, 'There is already a pending request'
self.later = reactor.callLater(self.delay, self.timeOut)
- self.delay *= 2
self.protocol.sendData(self.method, self.data)
def timeOut(self):
- """Call the deferred's errback if a timeout occurs."""
+ """Check for a unrecoverable timeout, otherwise resend."""
self.later = None
- self.protocol.timeOut(self.tid, self.method)
+ delay = datetime.now() - self.start
+ if delay > timedelta(seconds = KRPC_TIMEOUT):
+ log.msg('%r timed out after %0.2f sec' %
+ (self.tid, delay.seconds + delay.microseconds/1000000.0))
+ self.protocol.timeOut(self.tid, self.method)
+ elif self.protocol.stopped:
+ log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
+ else:
+ self.delay *= 2
+ log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
+ self.send()
def callback(self, resp):
self.dropTimeOut()
- Deferred.callback(self, resp)
+ defer.Deferred.callback(self, resp)
def errback(self, resp):
self.dropTimeOut()
- Deferred.errback(self, resp)
+ defer.Deferred.errback(self, resp)
def dropTimeOut(self):
"""Cancel the timeout call when a response is received."""
if self.later and self.later.active():
self.later.cancel()
- self.later = None
+ self.later = None
class KRPC:
"""The KRPC protocol implementation.
msg[RSP]['_krpc_sender'] = addr
req.callback(msg[RSP])
else:
- # no tid, this transaction timed out already...
+ # no tid, this transaction was finished already...
if self.noisy:
- log.msg('timeout: %r' % msg[RSP]['id'])
+ log.msg('received response from %r for completed request: %r' %
+ (msg[RSP]['id'], msg[TID]))
elif msg[TYP] == ERR:
# Errors get processed by their TID's deferred's errback
if self.tids.has_key(msg[TID]):
# callback
req.errback(KrpcError(*msg[ERR]))
else:
- # day late and dollar short, just log it
- log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
- pass
+ # no tid, this transaction was finished already...
+ log.msg('received an error %r from %r for completed request: %r' %
+ (msg[ERR], msg[RSP]['id'], msg[TID]))
else:
# Received an unknown message type
if self.noisy:
@param args: the arguments to send to the remote node's method
"""
if self.stopped:
- raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
+ return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+ "cannot send, connection has been stopped"))
# Create the request message
newTID = newID()
log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
data = bencode(msg)
- # Create the deferred and save it with the TID
- req = KrpcRequest(self, newTID, method, data, 10)
+ # Create the request object and save it with the TID
+ req = KrpcRequest(self, newTID, method, data)
self.tids[newTID] = req
# Save the conclusion of the action
req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
callbackArgs = (method, ), errbackArgs = (method, ))
- req.send()
return req
def sendData(self, method, data):
- # Save some stats
+ """Write a request to the transport and save the stats.
+
+ @type method: C{string}
+ @param method: the name of the method to call on the remote node
+ @type data: C{string}
+ @param data: the message to send to the remote node
+ """
self.stats.sentAction(method)
self.stats.sentBytes(len(data))
self.transport.write(data, self.addr)
def timeOut(self, badTID, method):
- """Call the deferred's errback if a timeout occurs."""
+ """Call the deferred's errback if a timeout occurs.
+
+ @param badTID: the transaction ID of the request
+ @type method: C{string}
+ @param method: the name of the method that timed out on the remote node
+ """
if badTID in self.tids:
req = self.tids[badTID]
del(self.tids[badTID])
log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
def stop(self):
- """Timeout all pending requests."""
+ """Cancel all pending requests."""
for req in self.tids.values():
req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
'connection has been stopped while waiting for response'))