From: Cameron Dale Date: Mon, 21 Apr 2008 19:41:24 +0000 (-0700) Subject: Make the DHT timeouts configuration parameters. X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=dbcf7d0324c5ddee23bf170695e173fdff5a2c0e;p=quix0rs-apt-p2p.git Make the DHT timeouts configuration parameters. --- diff --git a/TODO b/TODO index 8f0265b..c6c74bd 100644 --- a/TODO +++ b/TODO @@ -7,7 +7,6 @@ Some last few things to do before release. - remove files from the peer's download cache - update the modtime of files downloaded from peers - also set the Last-Modified header for the return to Apt -- make the DHT timeouts configuration parameters - refresh expired DHT hashes concurrently instead of sequentially Consider what happens when multiple requests for a file are received. diff --git a/apt-p2p.conf b/apt-p2p.conf index 6528935..2d40306 100644 --- a/apt-p2p.conf +++ b/apt-p2p.conf @@ -108,5 +108,16 @@ BUCKET_STALENESS = 1h # expire unrefreshed entries older than this KEY_EXPIRE = 1h +# Timeout KRPC requests to nodes after this time. +KRPC_TIMEOUT = 14s + +# KRPC requests are resent using exponential backoff starting with this delay. +# The request will first be resent after the delay set here. +# The request will be resent again after twice the delay set here. etc. +# e.g. if TIMEOUT is 14 sec., and INITIAL_DELAY is 2 sec., then requests will +# be resent at times 0, 2 (2 sec. later), and 6 (4 sec. later), and then will +# timeout at 14. +KRPC_INITIAL_DELAY = 2s + # whether to spew info about the requests/responses in the protocol SPEW = no diff --git a/apt_p2p/apt_p2p_conf.py b/apt_p2p/apt_p2p_conf.py index f09588f..8494a0b 100644 --- a/apt_p2p/apt_p2p_conf.py +++ b/apt_p2p/apt_p2p_conf.py @@ -123,6 +123,17 @@ DHT_DEFAULTS = { # expire entries older than this 'KEY_EXPIRE': '3h', # 3 hours + # Timeout KRPC requests to nodes after this time. + 'KRPC_TIMEOUT': '14s', + + # KRPC requests are resent using exponential backoff starting with this delay. + # The request will first be resent after the delay set here. + # The request will be resent again after twice the delay set here. etc. + # e.g. if TIMEOUT is 14 sec., and INITIAL_DELAY is 2 sec., then requests will + # be resent at times 0, 2 (2 sec. later), and 6 (4 sec. later), and then will + # timeout at 14. + 'KRPC_INITIAL_DELAY': '2s', + # whether to spew info about the requests/responses in the protocol 'SPEW': 'no', } diff --git a/apt_p2p_Khashmir/DHT.py b/apt_p2p_Khashmir/DHT.py index 1aaf7a2..3b206dd 100644 --- a/apt_p2p_Khashmir/DHT.py +++ b/apt_p2p_Khashmir/DHT.py @@ -336,6 +336,7 @@ class TestSimpleDHT(unittest.TestCase): 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': False, } def setUp(self): @@ -456,6 +457,7 @@ class TestMultiDHT(unittest.TestCase): 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': False, } def setUp(self): diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 580f0f0..7abed57 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -523,6 +523,7 @@ class SimpleTests(unittest.TestCase): 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': False, } def setUp(self): @@ -596,6 +597,7 @@ class MultiTest(unittest.TestCase): 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': False, } def _done(self, val): diff --git a/apt_p2p_Khashmir/krpc.py b/apt_p2p_Khashmir/krpc.py index acc5fbf..9ae5ac7 100644 --- a/apt_p2p_Khashmir/krpc.py +++ b/apt_p2p_Khashmir/krpc.py @@ -3,9 +3,6 @@ """The KRPC communication protocol implementation. -@var KRPC_INITIAL_DELAY: the number of seconds after which to try resending - the request, the resends will wait twice as long each time -@var KRPC_TIMEOUT: the number of seconds after which requests timeout @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a UDP packet without fragmentation @@ -50,8 +47,6 @@ from twisted.trial import unittest from khash import newID -KRPC_INITIAL_DELAY = 2 -KRPC_TIMEOUT = 14 UDP_PACKET_LIMIT = 1472 # Remote node errors @@ -185,7 +180,7 @@ class hostbroker(protocol.DatagramProtocol): # Create a new protocol object if necessary if not self.connections.has_key(addr): - conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW']) + conn = self.protocol(addr, self.server, self.stats, self.transport, self.config) self.connections[addr] = conn else: conn = self.connections[addr] @@ -213,6 +208,8 @@ class KrpcRequest(defer.Deferred): @ivar method: the name of the method to call on the remote node @type data: C{string} @ivar data: the message to send to the remote node + @type config: C{dictionary} + @ivar config: the configuration parameters for the DHT @type delay: C{int} @ivar delay: the last timeout delay sent @type start: C{datetime} @@ -221,7 +218,7 @@ class KrpcRequest(defer.Deferred): @ivar later: the pending call to timeout the last sent request """ - def __init__(self, protocol, newTID, method, data): + def __init__(self, protocol, newTID, method, data, config): """Initialize the request, and send it out. @type protocol: L{KRPC} @@ -231,13 +228,16 @@ class KrpcRequest(defer.Deferred): @param method: the name of the method to call on the remote node @type data: C{string} @param data: the message to send to the remote node + @type config: C{dictionary} + @param config: the configuration parameters for the DHT """ defer.Deferred.__init__(self) self.protocol = protocol self.tid = newTID self.method = method self.data = data - self.delay = KRPC_INITIAL_DELAY + self.config = config + self.delay = self.config.get('KRPC_INITIAL_DELAY', 2) self.start = datetime.now() self.later = None self.send() @@ -252,7 +252,7 @@ class KrpcRequest(defer.Deferred): """Check for a unrecoverable timeout, otherwise resend.""" self.later = None delay = datetime.now() - self.start - if delay > timedelta(seconds = KRPC_TIMEOUT): + if delay > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 14)): log.msg('%r timed out after %0.2f sec' % (self.tid, delay.seconds + delay.microseconds/1000000.0)) self.protocol.timeOut(self.tid, self.method) @@ -287,8 +287,8 @@ class KRPC: @ivar stats: the statistics logger to save transport info @type addr: (C{string}, C{int}) @ivar addr: the IP address and port of the source node - @type noisy: C{boolean} - @ivar noisy: whether to log additional details of the protocol + @type config: C{dictionary} + @ivar config: the configuration parameters for the DHT @type tids: C{dictionary} @ivar tids: the transaction IDs outstanding for requests, keys are the transaction ID of the request, values are the deferreds to call with @@ -297,7 +297,7 @@ class KRPC: @ivar stopped: whether the protocol has been stopped """ - def __init__(self, addr, server, stats, transport, spew = False): + def __init__(self, addr, server, stats, transport, config = {}): """Initialize the protocol. @type addr: (C{string}, C{int}) @@ -307,15 +307,15 @@ class KRPC: @type stats: L{stats.StatsLogger} @param stats: the statistics logger to save transport info @param transport: the transport to use for the protocol - @type spew: C{boolean} - @param spew: whether to log additional details of the protocol - (optional, defaults to False) + @type config: C{dictionary} + @param config: the configuration parameters for the DHT + (optional, defaults to using defaults) """ self.transport = transport self.factory = server self.stats = stats self.addr = addr - self.noisy = spew + self.config = config self.tids = {} self.stopped = False @@ -329,14 +329,14 @@ class KRPC: """ self.stats.receivedBytes(len(data)) if self.stopped: - if self.noisy: + if self.config.get('SPEW', False): log.msg("stopped, dropping message from %r: %s" % (addr, data)) # Bdecode the message try: msg = bdecode(data) except Exception, e: - if self.noisy: + if self.config.get('SPEW', False): log.msg("krpc bdecode error: ") log.err(e) return @@ -349,7 +349,7 @@ class KRPC: log.err(e) return - if self.noisy: + if self.config.get('SPEW', False): log.msg("%d received from %r: %s" % (self.factory.port, addr, msg)) # Process it based on its type @@ -389,9 +389,8 @@ class KRPC: self.stats.receivedAction('unknown') olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR, [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])]) - if self.noisy: - log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port, - ilen, msg[REQ], olen)) + + log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen)) elif msg[TYP] == RSP: # Responses get processed by their TID's deferred if self.tids.has_key(msg[TID]): @@ -402,7 +401,7 @@ class KRPC: req.callback(msg[RSP]) else: # no tid, this transaction was finished already... - if self.noisy: + if self.config.get('SPEW', False): log.msg('received response from %r for completed request: %r' % (msg[RSP]['id'], msg[TID])) elif msg[TYP] == ERR: @@ -418,7 +417,7 @@ class KRPC: (msg[ERR], msg[RSP]['id'], msg[TID])) else: # Received an unknown message type - if self.noisy: + if self.config.get('SPEW', False): log.msg("unknown message type: %r" % msg) if msg[TID] in self.tids: req = self.tids[msg[TID]] @@ -444,7 +443,7 @@ class KRPC: # Create the response message msg = {TID : tid, TYP : msgType, msgType : response} - if self.noisy: + if self.config.get('SPEW', False): log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg)) out = bencode(msg) @@ -513,12 +512,12 @@ class KRPC: # Create the request message newTID = newID() msg = {TID : newTID, TYP : REQ, REQ : method, ARG : args} - if self.noisy: + if self.config.get('SPEW', False): log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg)) data = bencode(msg) # Create the request object and save it with the TID - req = KrpcRequest(self, newTID, method, data) + req = KrpcRequest(self, newTID, method, data, self.config) self.tids[newTID] = req # Save the conclusion of the action @@ -583,7 +582,8 @@ class Receiver(protocol.Factory): def make(port): from stats import StatsLogger af = Receiver() - a = hostbroker(af, StatsLogger(None, None), {'SPEW': False}) + a = hostbroker(af, StatsLogger(None, None), + {'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False}) a.protocol = KRPC p = reactor.listenUDP(port, a) return af, a, p diff --git a/debian/apt-p2p.conf.sgml b/debian/apt-p2p.conf.sgml index cd08a40..21b9d3d 100644 --- a/debian/apt-p2p.conf.sgml +++ b/debian/apt-p2p.conf.sgml @@ -270,6 +270,24 @@ (Default is 1 hour.) + + + + The time to wait before KRPC requests timeout. + (Default is 14 seconds.) + + + + + + The time to start with when resending KRPC requests using exponential backoff. + The request will first be resent after the delay set here. + The request will be resent again after twice the delay set here, and so on. + e.g. if TIMEOUT is 14 sec., and INITIAL_DELAY is 2 sec., then requests will + be resent at times 0, 2 (2 sec. later), and 6 (4 sec. later), and then will + timeout at 14. (Default is 2 seconds.) + + diff --git a/test.py b/test.py index 673d1bc..d83ccf1 100755 --- a/test.py +++ b/test.py @@ -477,6 +477,17 @@ BUCKET_STALENESS = 1h # expire entries older than this KEY_EXPIRE = 1h +# Timeout KRPC requests to nodes after this time. +KRPC_TIMEOUT = 14s + +# KRPC requests are resent using exponential backoff starting with this delay. +# The request will first be resent after the delay set here. +# The request will be resent again after twice the delay set here. etc. +# e.g. if TIMEOUT is 14 sec., and INITIAL_DELAY is 2 sec., then requests will +# be resent at times 0, 2 (2 sec. later), and 6 (4 sec. later), and then will +# timeout at 14. +KRPC_INITIAL_DELAY = 2s + # whether to spew info about the requests/responses in the protocol SPEW = yes """