]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/ktable.py
Only add nodes to the routing table that have responded to a request.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / ktable.py
index 3e60f5c37d0156e6f51c0f9d3543e689012424f4..e91c0db9843cf250fb5ab46bc0226b21150be314 100644 (file)
@@ -1,10 +1,12 @@
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
 
-"""The routing table and buckets for a kademlia-like DHT."""
+"""The routing table and buckets for a kademlia-like DHT.
+
+@var K: the Kademlia "K" constant, this should be an even number
+"""
 
 from datetime import datetime
 from bisect import bisect_left
+from math import log as loge
 
 from twisted.python import log
 from twisted.trial import unittest
@@ -12,6 +14,8 @@ from twisted.trial import unittest
 import khash
 from node import Node, NULL_ID
 
+K = 8
+
 class KTable:
     """Local routing table for a kademlia-like distributed hash table.
     
@@ -35,7 +39,7 @@ class KTable:
         assert node.id != NULL_ID
         self.node = node
         self.config = config
-        self.buckets = [KBucket([], 0L, 2L**self.config['HASH_LENGTH'])]
+        self.buckets = [KBucket([], 0L, 2L**(khash.HASH_LENGTH*8))]
         
     def _bucketIndexForInt(self, num):
         """Find the index of the bucket that should hold the node's ID number."""
@@ -74,11 +78,11 @@ class KTable:
         nodes = list(self.buckets[i].l)
         
         # Make sure we have enough
-        if len(nodes) < self.config['K']:
+        if len(nodes) < K:
             # Look in adjoining buckets for nodes
             min = i - 1
             max = i + 1
-            while len(nodes) < self.config['K'] and (min >= 0 or max < len(self.buckets)):
+            while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
                 # Add the adjoining buckets' nodes to the list
                 if min >= 0:
                     nodes = nodes + self.buckets[min].l
@@ -89,7 +93,7 @@ class KTable:
     
         # Sort the found nodes by proximity to the id and return the closest K
         nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
-        return nodes[:self.config['K']]
+        return nodes[:K]
         
     def _splitBucket(self, a):
         """Split a bucket in two.
@@ -111,6 +115,41 @@ class KTable:
                 a.l.remove(anode)
                 b.l.append(anode)
     
+    def _mergeBucket(self, i):
+        """Merge unneeded buckets after removing a node.
+        
+        @type i: C{int}
+        @param i: the index of the bucket that lost a node
+        """
+        bucketRange = self.buckets[i].max - self.buckets[i].min
+        otherBucket = None
+
+        # Find if either of the neighbor buckets is the same size
+        # (this will only happen if this or the neighbour has our node ID in its range)
+        if i-1 >= 0 and self.buckets[i-1].max - self.buckets[i-1].min == bucketRange:
+            otherBucket = i-1
+        elif i+1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange:
+            otherBucket = i+1
+            
+        # Decide if we should do a merge
+        if otherBucket is not None and len(self.buckets[i].l) + len(self.buckets[otherBucket].l) <= K:
+            # Remove one bucket and set the other to cover its range as well
+            b = self.buckets[i]
+            a = self.buckets.pop(otherBucket)
+            b.min = min(b.min, a.min)
+            b.max = max(b.max, a.max)
+
+            # Transfer the nodes to the bucket we're keeping, merging the sorting
+            bi = 0
+            for anode in a.l:
+                while bi < len(b.l) and b.l[bi].lastSeen <= anode.lastSeen:
+                    bi += 1
+                b.l.insert(bi, anode)
+                bi += 1
+                
+            # Recurse to check if the neighbour buckets can also be merged
+            self._mergeBucket(min(i, otherBucket))
+    
     def replaceStaleNode(self, stale, new = None):
         """Replace a stale node in a bucket with a new one.
         
@@ -124,21 +163,29 @@ class KTable:
             not adding any node in the old node's place)
         """
         # Find the stale node's bucket
+        removed = False
         i = self._bucketIndexForInt(stale.num)
         try:
             it = self.buckets[i].l.index(stale.num)
         except ValueError:
-            return
-    
-        # Remove the stale node and insert the new one
-        del(self.buckets[i].l[it])
-        if new:
+            pass
+        else:
+            # Remove the stale node
+            del(self.buckets[i].l[it])
+            removed = True
+            log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port))
+        
+        # Insert the new node
+        if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K:
             self.buckets[i].l.append(new)
+        elif removed:
+            self._mergeBucket(i)
     
     def insertNode(self, node, contacted = True):
         """Try to insert a node in the routing table.
         
-        This inserts the node, returning None if successful, otherwise returns
+        This inserts the node, returning True if successful, False if the
+        node could have been added if it responds to a ping, otherwise returns
         the oldest node in the bucket if it's full. The caller is then
         responsible for pinging the returned node and calling replaceStaleNode
         if it doesn't respond. contacted means that yes, we contacted THEM and
@@ -149,11 +196,13 @@ class KTable:
         @type contacted: C{boolean}
         @param contacted: whether the new node is known to be good, i.e.
             responded to a request (optional, defaults to True)
-        @rtype: L{node.Node}
-        @return: None if successful (the bucket wasn't full), otherwise returns the oldest node in the bucket
+        @rtype: L{node.Node} or C{boolean}
+        @return: True if successful (the bucket wasn't full), False if the
+            node could have been added if it was contacted, otherwise
+            returns the oldest node in the bucket
         """
         assert node.id != NULL_ID
-        if node.id == self.node.id: return
+        if node.id == self.node.id: return True
 
         # Get the bucket for this node
         i = self._bucketIndexForInt(node.num)
@@ -174,16 +223,18 @@ class KTable:
                 # utilizing this nodes new contact info
                 self.buckets[i].l.append(node)
                 self.buckets[i].touch()
-            return
+            return True
         
         # We don't have this node, check to see if the bucket is full
-        if len(self.buckets[i].l) < self.config['K']:
+        if len(self.buckets[i].l) < K:
             # Not full, append this node and return
             if contacted:
                 node.updateLastSeen()
-            self.buckets[i].l.append(node)
-            self.buckets[i].touch()
-            return
+                self.buckets[i].l.append(node)
+                self.buckets[i].touch()
+                log.msg('Added node to routing table: %s/%s' % (node.host, node.port))
+                return True
+            return False
             
         # Bucket is full, check to see if the local node is not in the bucket
         if not (self.buckets[i].min <= self.node < self.buckets[i].max):
@@ -191,8 +242,8 @@ class KTable:
             return self.buckets[i].l[0]
         
         # Make sure our table isn't FULL, this is really unlikely
-        if len(self.buckets) >= self.config['HASH_LENGTH']:
-            log.err("Hash Table is FULL!  Increase K!")
+        if len(self.buckets) >= (khash.HASH_LENGTH*8):
+            log.err(RuntimeError("Hash Table is FULL! Increase K!"))
             return
             
         # This bucket is full and contains our node, split the bucket
@@ -226,6 +277,12 @@ class KTable:
             n = self.buckets[i].l[it]
             tstamp = n.lastSeen
             n.updateLastSeen()
+            
+            # Move the node to the end and touch the bucket
+            del(self.buckets[i].l[it])
+            self.buckets[i].l.append(n)
+            self.buckets[i].touch()
+            
             return tstamp
     
     def invalidateNode(self, n):
@@ -284,6 +341,17 @@ class KBucket:
         """Update the L{lastAccessed} time."""
         self.lastAccessed = datetime.now()
     
+    def sort(self):
+        """Sort the nodes in the bucket by their lastSeen time."""
+        def _sort(a, b):
+            """Sort nodes by their lastSeen time."""
+            if a.lastSeen > b.lastSeen:
+                return 1
+            elif a.lastSeen < b.lastSeen:
+                return -1
+            return 0
+        self.l.sort(_sort)
+
     def getNodeWithInt(self, num):
         """Get the node in the bucket with that number.
         
@@ -297,7 +365,8 @@ class KBucket:
         else: raise ValueError
         
     def __repr__(self):
-        return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
+        return "<KBucket %d items (%f to %f, range %d)>" % (
+                len(self.l), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))
     
     #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
     def __lt__(self, a):
@@ -324,7 +393,7 @@ class TestKTable(unittest.TestCase):
     
     def setUp(self):
         self.a = Node(khash.newID(), '127.0.0.1', 2002)
-        self.t = KTable(self.a, {'HASH_LENGTH': 160, 'K': 8, 'MAX_FAILURES': 3})
+        self.t = KTable(self.a, {'MAX_FAILURES': 3})
 
     def testAddNode(self):
         self.b = Node(khash.newID(), '127.0.0.1', 2003)
@@ -337,6 +406,16 @@ class TestKTable(unittest.TestCase):
         self.t.invalidateNode(self.b)
         self.failUnlessEqual(len(self.t.buckets[0].l), 0)
 
+    def testMergeBuckets(self):
+        for i in xrange(1000):
+            b = Node(khash.newID(), '127.0.0.1', 2003 + i)
+            self.t.insertNode(b)
+        num = len(self.t.buckets)
+        i = self.t._bucketIndexForInt(self.a.num)
+        for b in self.t.buckets[i].l[:]:
+            self.t.invalidateNode(b)
+        self.failUnlessEqual(len(self.t.buckets), num-1)
+
     def testFail(self):
         self.testAddNode()
         for i in range(self.t.config['MAX_FAILURES'] - 1):