From df9831500326100920070d964c659cffb67a95ab Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Thu, 20 Mar 2008 18:36:33 -0700 Subject: [PATCH] Merge small DHT buckets when a node is removed. --- apt_p2p_Khashmir/ktable.py | 64 +++++++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/apt_p2p_Khashmir/ktable.py b/apt_p2p_Khashmir/ktable.py index 1107f00..25a99ec 100644 --- a/apt_p2p_Khashmir/ktable.py +++ b/apt_p2p_Khashmir/ktable.py @@ -5,6 +5,7 @@ from datetime import datetime from bisect import bisect_left +from math import log as loge from twisted.python import log from twisted.trial import unittest @@ -111,6 +112,41 @@ class KTable: a.l.remove(anode) b.l.append(anode) + def _mergeBucket(self, i): + """Merge unneeded buckets after removing a node. + + @type i: C{int} + @param i: the index of the bucket that lost a node + """ + bucketRange = self.buckets[i].max - self.buckets[i].min + otherBucket = None + + # Find if either of the neighbor buckets is the same size + # (this will only happen if this or the neighbour has our node ID in its range) + if i-1 >= 0 and self.buckets[i-1].max - self.buckets[i-1].min == bucketRange: + otherBucket = i-1 + elif i+1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange: + otherBucket = i+1 + + # Decide if we should do a merge + if otherBucket is not None and len(self.buckets[i].l) + len(self.buckets[otherBucket].l) <= self.config['K']: + # Remove one bucket and set the other to cover its range as well + b = self.buckets[i] + a = self.buckets.pop(otherBucket) + b.min = min(b.min, a.min) + b.max = max(b.max, a.max) + + # Transfer the nodes to the bucket we're keeping, merging the sorting + bi = 0 + for anode in a.l: + while bi < len(b.l) and b.l[bi].lastSeen <= anode.lastSeen: + bi += 1 + b.l.insert(bi, anode) + bi += 1 + + # Recurse to check if the neighbour buckets can also be merged + self._mergeBucket(min(i, otherBucket)) + def replaceStaleNode(self, stale, new = None): """Replace a stale node in a bucket with a new one. @@ -124,6 +160,7 @@ class KTable: not adding any node in the old node's place) """ # Find the stale node's bucket + removed = False i = self._bucketIndexForInt(stale.num) try: it = self.buckets[i].l.index(stale.num) @@ -132,10 +169,13 @@ class KTable: else: # Remove the stale node del(self.buckets[i].l[it]) + removed = True # Insert the new node if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < self.config['K']: self.buckets[i].l.append(new) + elif removed: + self._mergeBucket(i) def insertNode(self, node, contacted = True): """Try to insert a node in the routing table. @@ -292,6 +332,17 @@ class KBucket: """Update the L{lastAccessed} time.""" self.lastAccessed = datetime.now() + def sort(self): + """Sort the nodes in the bucket by their lastSeen time.""" + def _sort(a, b): + """Sort nodes by their lastSeen time.""" + if a.lastSeen > b.lastSeen: + return 1 + elif a.lastSeen < b.lastSeen: + return -1 + return 0 + self.l.sort(_sort) + def getNodeWithInt(self, num): """Get the node in the bucket with that number. @@ -305,7 +356,8 @@ class KBucket: else: raise ValueError def __repr__(self): - return "" % (len(self.l), self.min, self.max) + return "" % ( + len(self.l), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2)) #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long def __lt__(self, a): @@ -345,6 +397,16 @@ class TestKTable(unittest.TestCase): self.t.invalidateNode(self.b) self.failUnlessEqual(len(self.t.buckets[0].l), 0) + def testMergeBuckets(self): + for i in xrange(1000): + b = Node(khash.newID(), '127.0.0.1', 2003 + i) + self.t.insertNode(b) + num = len(self.t.buckets) + i = self.t._bucketIndexForInt(self.a.num) + for b in self.t.buckets[i].l[:]: + self.t.invalidateNode(b) + self.failUnlessEqual(len(self.t.buckets), num-1) + def testFail(self): self.testAddNode() for i in range(self.t.config['MAX_FAILURES'] - 1): -- 2.39.5