Merge small DHT buckets when a node is removed.
authorCameron Dale <camrdale@gmail.com>
Fri, 21 Mar 2008 01:36:33 +0000 (18:36 -0700)
committerCameron Dale <camrdale@gmail.com>
Fri, 21 Mar 2008 01:36:33 +0000 (18:36 -0700)
apt_p2p_Khashmir/ktable.py

index 1107f00ee4be9b2b19164809eae04172f6e0757e..25a99ec474888db1f1b8cf18b3acf7dae8b10dbc 100644 (file)
@@ -5,6 +5,7 @@
 
 from datetime import datetime
 from bisect import bisect_left
 
 from datetime import datetime
 from bisect import bisect_left
+from math import log as loge
 
 from twisted.python import log
 from twisted.trial import unittest
 
 from twisted.python import log
 from twisted.trial import unittest
@@ -111,6 +112,41 @@ class KTable:
                 a.l.remove(anode)
                 b.l.append(anode)
     
                 a.l.remove(anode)
                 b.l.append(anode)
     
+    def _mergeBucket(self, i):
+        """Merge unneeded buckets after removing a node.
+        
+        @type i: C{int}
+        @param i: the index of the bucket that lost a node
+        """
+        bucketRange = self.buckets[i].max - self.buckets[i].min
+        otherBucket = None
+
+        # Find if either of the neighbor buckets is the same size
+        # (this will only happen if this or the neighbour has our node ID in its range)
+        if i-1 >= 0 and self.buckets[i-1].max - self.buckets[i-1].min == bucketRange:
+            otherBucket = i-1
+        elif i+1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange:
+            otherBucket = i+1
+            
+        # Decide if we should do a merge
+        if otherBucket is not None and len(self.buckets[i].l) + len(self.buckets[otherBucket].l) <= self.config['K']:
+            # Remove one bucket and set the other to cover its range as well
+            b = self.buckets[i]
+            a = self.buckets.pop(otherBucket)
+            b.min = min(b.min, a.min)
+            b.max = max(b.max, a.max)
+
+            # Transfer the nodes to the bucket we're keeping, merging the sorting
+            bi = 0
+            for anode in a.l:
+                while bi < len(b.l) and b.l[bi].lastSeen <= anode.lastSeen:
+                    bi += 1
+                b.l.insert(bi, anode)
+                bi += 1
+                
+            # Recurse to check if the neighbour buckets can also be merged
+            self._mergeBucket(min(i, otherBucket))
+    
     def replaceStaleNode(self, stale, new = None):
         """Replace a stale node in a bucket with a new one.
         
     def replaceStaleNode(self, stale, new = None):
         """Replace a stale node in a bucket with a new one.
         
@@ -124,6 +160,7 @@ class KTable:
             not adding any node in the old node's place)
         """
         # Find the stale node's bucket
             not adding any node in the old node's place)
         """
         # Find the stale node's bucket
+        removed = False
         i = self._bucketIndexForInt(stale.num)
         try:
             it = self.buckets[i].l.index(stale.num)
         i = self._bucketIndexForInt(stale.num)
         try:
             it = self.buckets[i].l.index(stale.num)
@@ -132,10 +169,13 @@ class KTable:
         else:
             # Remove the stale node
             del(self.buckets[i].l[it])
         else:
             # Remove the stale node
             del(self.buckets[i].l[it])
+            removed = True
         
         # Insert the new node
         if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < self.config['K']:
             self.buckets[i].l.append(new)
         
         # Insert the new node
         if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < self.config['K']:
             self.buckets[i].l.append(new)
+        elif removed:
+            self._mergeBucket(i)
     
     def insertNode(self, node, contacted = True):
         """Try to insert a node in the routing table.
     
     def insertNode(self, node, contacted = True):
         """Try to insert a node in the routing table.
@@ -292,6 +332,17 @@ class KBucket:
         """Update the L{lastAccessed} time."""
         self.lastAccessed = datetime.now()
     
         """Update the L{lastAccessed} time."""
         self.lastAccessed = datetime.now()
     
+    def sort(self):
+        """Sort the nodes in the bucket by their lastSeen time."""
+        def _sort(a, b):
+            """Sort nodes by their lastSeen time."""
+            if a.lastSeen > b.lastSeen:
+                return 1
+            elif a.lastSeen < b.lastSeen:
+                return -1
+            return 0
+        self.l.sort(_sort)
+
     def getNodeWithInt(self, num):
         """Get the node in the bucket with that number.
         
     def getNodeWithInt(self, num):
         """Get the node in the bucket with that number.
         
@@ -305,7 +356,8 @@ class KBucket:
         else: raise ValueError
         
     def __repr__(self):
         else: raise ValueError
         
     def __repr__(self):
-        return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
+        return "<KBucket %d items (%f to %f, range %d)>" % (
+                len(self.l), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))
     
     #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
     def __lt__(self, a):
     
     #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
     def __lt__(self, a):
@@ -345,6 +397,16 @@ class TestKTable(unittest.TestCase):
         self.t.invalidateNode(self.b)
         self.failUnlessEqual(len(self.t.buckets[0].l), 0)
 
         self.t.invalidateNode(self.b)
         self.failUnlessEqual(len(self.t.buckets[0].l), 0)
 
+    def testMergeBuckets(self):
+        for i in xrange(1000):
+            b = Node(khash.newID(), '127.0.0.1', 2003 + i)
+            self.t.insertNode(b)
+        num = len(self.t.buckets)
+        i = self.t._bucketIndexForInt(self.a.num)
+        for b in self.t.buckets[i].l[:]:
+            self.t.invalidateNode(b)
+        self.failUnlessEqual(len(self.t.buckets), num-1)
+
     def testFail(self):
         self.testAddNode()
         for i in range(self.t.config['MAX_FAILURES'] - 1):
     def testFail(self):
         self.testAddNode()
         for i in range(self.t.config['MAX_FAILURES'] - 1):