Make the DHT timeouts configuration parameters.
authorCameron Dale <camrdale@gmail.com>
Mon, 21 Apr 2008 19:41:24 +0000 (12:41 -0700)
committerCameron Dale <camrdale@gmail.com>
Mon, 21 Apr 2008 19:41:24 +0000 (12:41 -0700)
TODO
apt-p2p.conf
apt_p2p/apt_p2p_conf.py
apt_p2p_Khashmir/DHT.py
apt_p2p_Khashmir/khashmir.py
apt_p2p_Khashmir/krpc.py
debian/apt-p2p.conf.sgml
test.py

diff --git a/TODO b/TODO
index 8f0265b..c6c74bd 100644 (file)
--- a/TODO
+++ b/TODO
@@ -7,7 +7,6 @@ Some last few things to do before release.
 - remove files from the peer's download cache
 - update the modtime of files downloaded from peers
   - also set the Last-Modified header for the return to Apt
-- make the DHT timeouts configuration parameters
 - refresh expired DHT hashes concurrently instead of sequentially
 
 Consider what happens when multiple requests for a file are received.
index 6528935..2d40306 100644 (file)
@@ -108,5 +108,16 @@ BUCKET_STALENESS = 1h
 # expire unrefreshed entries older than this
 KEY_EXPIRE = 1h
 
+# Timeout KRPC requests to nodes after this time.
+KRPC_TIMEOUT = 14s
+
+# KRPC requests are resent using exponential backoff starting with this delay.
+# The request will first be resent after the delay set here.
+# The request will be resent again after twice the delay set here. etc.
+# e.g. if TIMEOUT is 14 sec., and INITIAL_DELAY is 2 sec., then requests will
+# be resent at times 0, 2 (2 sec. later), and 6 (4 sec. later), and then will
+# timeout at 14.
+KRPC_INITIAL_DELAY = 2s
+
 # whether to spew info about the requests/responses in the protocol
 SPEW = no
index f09588f..8494a0b 100644 (file)
@@ -123,6 +123,17 @@ DHT_DEFAULTS = {
     # expire entries older than this
     'KEY_EXPIRE': '3h', # 3 hours
     
+    # Timeout KRPC requests to nodes after this time.
+    'KRPC_TIMEOUT': '14s',
+    
+    # KRPC requests are resent using exponential backoff starting with this delay.
+    # The request will first be resent after the delay set here.
+    # The request will be resent again after twice the delay set here. etc.
+    # e.g. if TIMEOUT is 14 sec., and INITIAL_DELAY is 2 sec., then requests will
+    # be resent at times 0, 2 (2 sec. later), and 6 (4 sec. later), and then will
+    # timeout at 14.
+    'KRPC_INITIAL_DELAY': '2s',
+
     # whether to spew info about the requests/responses in the protocol
     'SPEW': 'no',
 }
index 1aaf7a2..3b206dd 100644 (file)
@@ -336,6 +336,7 @@ class TestSimpleDHT(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
@@ -456,6 +457,7 @@ class TestMultiDHT(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
index 580f0f0..7abed57 100644 (file)
@@ -523,6 +523,7 @@ class SimpleTests(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def setUp(self):
@@ -596,6 +597,7 @@ class MultiTest(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                     'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
+                    'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
                     'KEY_EXPIRE': 3600, 'SPEW': False, }
 
     def _done(self, val):
index acc5fbf..9ae5ac7 100644 (file)
@@ -3,9 +3,6 @@
 
 """The KRPC communication protocol implementation.
 
-@var KRPC_INITIAL_DELAY: the number of seconds after which to try resending
-    the request, the resends will wait twice as long each time
-@var KRPC_TIMEOUT: the number of seconds after which requests timeout
 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
     UDP packet without fragmentation
 
@@ -50,8 +47,6 @@ from twisted.trial import unittest
 
 from khash import newID
 
-KRPC_INITIAL_DELAY = 2
-KRPC_TIMEOUT = 14
 UDP_PACKET_LIMIT = 1472
 
 # Remote node errors
@@ -185,7 +180,7 @@ class hostbroker(protocol.DatagramProtocol):
         
         # Create a new protocol object if necessary
         if not self.connections.has_key(addr):
-            conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
+            conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
             self.connections[addr] = conn
         else:
             conn = self.connections[addr]
@@ -213,6 +208,8 @@ class KrpcRequest(defer.Deferred):
     @ivar method: the name of the method to call on the remote node
     @type data: C{string}
     @ivar data: the message to send to the remote node
+    @type config: C{dictionary}
+    @ivar config: the configuration parameters for the DHT
     @type delay: C{int}
     @ivar delay: the last timeout delay sent
     @type start: C{datetime}
@@ -221,7 +218,7 @@ class KrpcRequest(defer.Deferred):
     @ivar later: the pending call to timeout the last sent request
     """
     
-    def __init__(self, protocol, newTID, method, data):
+    def __init__(self, protocol, newTID, method, data, config):
         """Initialize the request, and send it out.
         
         @type protocol: L{KRPC}
@@ -231,13 +228,16 @@ class KrpcRequest(defer.Deferred):
         @param method: the name of the method to call on the remote node
         @type data: C{string}
         @param data: the message to send to the remote node
+        @type config: C{dictionary}
+        @param config: the configuration parameters for the DHT
         """
         defer.Deferred.__init__(self)
         self.protocol = protocol
         self.tid = newTID
         self.method = method
         self.data = data
-        self.delay = KRPC_INITIAL_DELAY
+        self.config = config
+        self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
         self.start = datetime.now()
         self.later = None
         self.send()
@@ -252,7 +252,7 @@ class KrpcRequest(defer.Deferred):
         """Check for a unrecoverable timeout, otherwise resend."""
         self.later = None
         delay = datetime.now() - self.start
-        if delay > timedelta(seconds = KRPC_TIMEOUT):
+        if delay > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 14)):
             log.msg('%r timed out after %0.2f sec' %
                     (self.tid, delay.seconds + delay.microseconds/1000000.0))
             self.protocol.timeOut(self.tid, self.method)
@@ -287,8 +287,8 @@ class KRPC:
     @ivar stats: the statistics logger to save transport info
     @type addr: (C{string}, C{int})
     @ivar addr: the IP address and port of the source node
-    @type noisy: C{boolean}
-    @ivar noisy: whether to log additional details of the protocol
+    @type config: C{dictionary}
+    @ivar config: the configuration parameters for the DHT
     @type tids: C{dictionary}
     @ivar tids: the transaction IDs outstanding for requests, keys are the
         transaction ID of the request, values are the deferreds to call with
@@ -297,7 +297,7 @@ class KRPC:
     @ivar stopped: whether the protocol has been stopped
     """
     
-    def __init__(self, addr, server, stats, transport, spew = False):
+    def __init__(self, addr, server, stats, transport, config = {}):
         """Initialize the protocol.
         
         @type addr: (C{string}, C{int})
@@ -307,15 +307,15 @@ class KRPC:
         @type stats: L{stats.StatsLogger}
         @param stats: the statistics logger to save transport info
         @param transport: the transport to use for the protocol
-        @type spew: C{boolean}
-        @param spew: whether to log additional details of the protocol
-            (optional, defaults to False)
+        @type config: C{dictionary}
+        @param config: the configuration parameters for the DHT
+            (optional, defaults to using defaults)
         """
         self.transport = transport
         self.factory = server
         self.stats = stats
         self.addr = addr
-        self.noisy = spew
+        self.config = config
         self.tids = {}
         self.stopped = False
 
@@ -329,14 +329,14 @@ class KRPC:
         """
         self.stats.receivedBytes(len(data))
         if self.stopped:
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
 
         # Bdecode the message
         try:
             msg = bdecode(data)
         except Exception, e:
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("krpc bdecode error: ")
                 log.err(e)
             return
@@ -349,7 +349,7 @@ class KRPC:
             log.err(e)
             return
 
-        if self.noisy:
+        if self.config.get('SPEW', False):
             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
 
         # Process it based on its type
@@ -389,9 +389,8 @@ class KRPC:
                 self.stats.receivedAction('unknown')
                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
-            if self.noisy:
-                log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
-                                                  ilen, msg[REQ], olen))
+
+            log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
         elif msg[TYP] == RSP:
             # Responses get processed by their TID's deferred
             if self.tids.has_key(msg[TID]):
@@ -402,7 +401,7 @@ class KRPC:
                 req.callback(msg[RSP])
             else:
                 # no tid, this transaction was finished already...
-                if self.noisy:
+                if self.config.get('SPEW', False):
                     log.msg('received response from %r for completed request: %r' %
                             (msg[RSP]['id'], msg[TID]))
         elif msg[TYP] == ERR:
@@ -418,7 +417,7 @@ class KRPC:
                         (msg[ERR], msg[RSP]['id'], msg[TID]))
         else:
             # Received an unknown message type
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("unknown message type: %r" % msg)
             if msg[TID] in self.tids:
                 req = self.tids[msg[TID]]
@@ -444,7 +443,7 @@ class KRPC:
             # Create the response message
             msg = {TID : tid, TYP : msgType, msgType : response}
     
-            if self.noisy:
+            if self.config.get('SPEW', False):
                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
     
             out = bencode(msg)
@@ -513,12 +512,12 @@ class KRPC:
         # Create the request message
         newTID = newID()
         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
-        if self.noisy:
+        if self.config.get('SPEW', False):
             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
         data = bencode(msg)
         
         # Create the request object and save it with the TID
-        req = KrpcRequest(self, newTID, method, data)
+        req = KrpcRequest(self, newTID, method, data, self.config)
         self.tids[newTID] = req
         
         # Save the conclusion of the action
@@ -583,7 +582,8 @@ class Receiver(protocol.Factory):
 def make(port):
     from stats import StatsLogger
     af = Receiver()
-    a = hostbroker(af, StatsLogger(None, None), {'SPEW': False})
+    a = hostbroker(af, StatsLogger(None, None),
+                   {'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
     a.protocol = KRPC
     p = reactor.listenUDP(port, a)
     return af, a, p
index cd08a40..21b9d3d 100644 (file)
            </listitem>
          </varlistentry>
          <varlistentry>
+           <term><option>KRPC_TIMEOUT = <replaceable>time</replaceable></option></term>
+            <listitem>
+             <para>The <replaceable>time</replaceable> to wait before KRPC requests timeout.
+                 (Default is 14 seconds.)</para>
+           </listitem>
+         </varlistentry>
+         <varlistentry>
+           <term><option>KRPC_INITIAL_DELAY = <replaceable>time</replaceable></option></term>
+            <listitem>
+             <para>The <replaceable>time</replaceable> to start with when resending KRPC requests using exponential backoff.
+                 The request will first be resent after the delay set here.
+                 The request will be resent again after twice the delay set here, and so on.
+                 e.g. if TIMEOUT is 14 sec., and INITIAL_DELAY is 2 sec., then requests will
+                 be resent at times 0, 2 (2 sec. later), and 6 (4 sec. later), and then will
+                 timeout at 14. (Default is 2 seconds.)</para>
+           </listitem>
+         </varlistentry>
+         <varlistentry>
            <term><option>SPEW = <replaceable>boolean</replaceable></option></term>
             <listitem>
              <para>Whether to log lots of info about the requests and responses in the protocol.
diff --git a/test.py b/test.py
index 673d1bc..d83ccf1 100755 (executable)
--- a/test.py
+++ b/test.py
@@ -477,6 +477,17 @@ BUCKET_STALENESS = 1h
 # expire entries older than this
 KEY_EXPIRE = 1h
 
+# Timeout KRPC requests to nodes after this time.
+KRPC_TIMEOUT = 14s
+
+# KRPC requests are resent using exponential backoff starting with this delay.
+# The request will first be resent after the delay set here.
+# The request will be resent again after twice the delay set here. etc.
+# e.g. if TIMEOUT is 14 sec., and INITIAL_DELAY is 2 sec., then requests will
+# be resent at times 0, 2 (2 sec. later), and 6 (4 sec. later), and then will
+# timeout at 14.
+KRPC_INITIAL_DELAY = 2s
+
 # whether to spew info about the requests/responses in the protocol
 SPEW = yes
 """