Move the bucket manipulation into the KBucket class.
authorCameron Dale <camrdale@gmail.com>
Thu, 8 May 2008 21:04:57 +0000 (14:04 -0700)
committerCameron Dale <camrdale@gmail.com>
Thu, 8 May 2008 21:06:38 +0000 (14:06 -0700)
apt_p2p_Khashmir/db.py
apt_p2p_Khashmir/khashmir.py
apt_p2p_Khashmir/ktable.py
apt_p2p_Khashmir/stats.py
apt_p2p_Khashmir/util.py

index a796256..d0433e0 100644 (file)
@@ -103,7 +103,7 @@ class DB:
         c = self.conn.cursor()
         c.execute("DELETE FROM nodes WHERE id NOT NULL")
         for bucket in buckets:
-            for node in bucket.l:
+            for node in bucket.nodes:
                 c.execute("INSERT INTO nodes VALUES (?, ?, ?)", (khash(node.id), node.host, node.port))
         self.conn.commit()
         
@@ -209,7 +209,7 @@ class TestDB(unittest.TestCase):
         dummy2.port = 12345
         class bl:
             def __init__(self):
-                self.l = []
+                self.nodes = []
         bl1 = bl()
         bl1.l.append(dummy())
         bl2 = bl()
index 74263a6..c89870e 100644 (file)
@@ -582,10 +582,10 @@ class SimpleTests(unittest.TestCase):
 
     def testAddContact(self):
         self.failUnlessEqual(len(self.a.table.buckets), 1)
-        self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
 
         self.failUnlessEqual(len(self.b.table.buckets), 1)
-        self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
 
         self.a.addContact('127.0.0.1', 4045)
         reactor.iterate()
@@ -594,9 +594,9 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
 
         self.failUnlessEqual(len(self.a.table.buckets), 1)
-        self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
+        self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
         self.failUnlessEqual(len(self.b.table.buckets), 1)
-        self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
+        self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
 
     def testStoreRetrieve(self):
         self.a.addContact('127.0.0.1', 4045)
index e91c0db..24ea93f 100644 (file)
@@ -48,7 +48,7 @@ class KTable:
     def _nodeNum(self, id):
         """Takes different types of input and converts to the node ID number.
 
-        @type id: C{string} of C{int} or L{node.Node}
+        @type id: C{string} or C{int} or L{node.Node}
         @param id: the ID to find nodes that are close to
         @raise TypeError: if id does not properly identify an ID
         """
@@ -66,7 +66,7 @@ class KTable:
     def findNodes(self, id):
         """Find the K nodes in our own local table closest to the ID.
 
-        @type id: C{string} of C{int} or L{node.Node}
+        @type id: C{string} or C{int} or L{node.Node}
         @param id: the ID to find nodes that are close to
         """
 
@@ -75,7 +75,7 @@ class KTable:
             
         # Get the K closest nodes from the appropriate bucket
         i = self._bucketIndexForInt(num)
-        nodes = list(self.buckets[i].l)
+        nodes = self.buckets[i].list()
         
         # Make sure we have enough
         if len(nodes) < K:
@@ -85,9 +85,9 @@ class KTable:
             while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
                 # Add the adjoining buckets' nodes to the list
                 if min >= 0:
-                    nodes = nodes + self.buckets[min].l
+                    nodes = nodes + self.buckets[min].list()
                 if max < len(self.buckets):
-                    nodes = nodes + self.buckets[max].l
+                    nodes = nodes + self.buckets[max].list()
                 min = min - 1
                 max = max + 1
     
@@ -95,26 +95,6 @@ class KTable:
         nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
         return nodes[:K]
         
-    def _splitBucket(self, a):
-        """Split a bucket in two.
-        
-        @type a: L{KBucket}
-        @param a: the bucket to split
-        """
-        # Create a new bucket with half the (upper) range of the current bucket
-        diff = (a.max - a.min) / 2
-        b = KBucket([], a.max - diff, a.max)
-        self.buckets.insert(self.buckets.index(a.min) + 1, b)
-        
-        # Reduce the input bucket's (upper) range 
-        a.max = a.max - diff
-
-        # Transfer nodes to the new bucket
-        for anode in a.l[:]:
-            if anode.num >= a.max:
-                a.l.remove(anode)
-                b.l.append(anode)
-    
     def _mergeBucket(self, i):
         """Merge unneeded buckets after removing a node.
         
@@ -131,21 +111,10 @@ class KTable:
         elif i+1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange:
             otherBucket = i+1
             
-        # Decide if we should do a merge
-        if otherBucket is not None and len(self.buckets[i].l) + len(self.buckets[otherBucket].l) <= K:
-            # Remove one bucket and set the other to cover its range as well
-            b = self.buckets[i]
-            a = self.buckets.pop(otherBucket)
-            b.min = min(b.min, a.min)
-            b.max = max(b.max, a.max)
-
-            # Transfer the nodes to the bucket we're keeping, merging the sorting
-            bi = 0
-            for anode in a.l:
-                while bi < len(b.l) and b.l[bi].lastSeen <= anode.lastSeen:
-                    bi += 1
-                b.l.insert(bi, anode)
-                bi += 1
+        # Try and do a merge
+        if otherBucket is not None and self.buckets[i].merge(self.buckets[otherBucket]):
+            # Merge was successful, remove the old bucket
+            self.buckets.pop(otherBucket)
                 
             # Recurse to check if the neighbour buckets can also be merged
             self._mergeBucket(min(i, otherBucket))
@@ -166,18 +135,17 @@ class KTable:
         removed = False
         i = self._bucketIndexForInt(stale.num)
         try:
-            it = self.buckets[i].l.index(stale.num)
+            self.buckets[i].remove(stale.num)
         except ValueError:
             pass
         else:
-            # Remove the stale node
-            del(self.buckets[i].l[it])
+            # Removed the stale node
             removed = True
             log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port))
         
         # Insert the new node
-        if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K:
-            self.buckets[i].l.append(new)
+        if new and self._bucketIndexForInt(new.num) == i and self.buckets[i].len() < K:
+            self.buckets[i].add(new)
         elif removed:
             self._mergeBucket(i)
     
@@ -209,7 +177,7 @@ class KTable:
 
         # Check to see if node is in the bucket already
         try:
-            it = self.buckets[i].l.index(node.num)
+            self.buckets[i].node(node.num)
         except ValueError:
             pass
         else:
@@ -218,19 +186,19 @@ class KTable:
                 # It responded, so update it
                 node.updateLastSeen()
                 # move node to end of bucket
-                del(self.buckets[i].l[it])
+                self.buckets[i].remove(node.num)
                 # note that we removed the original and replaced it with the new one
                 # utilizing this nodes new contact info
-                self.buckets[i].l.append(node)
+                self.buckets[i].add(node)
                 self.buckets[i].touch()
             return True
         
         # We don't have this node, check to see if the bucket is full
-        if len(self.buckets[i].l) < K:
+        if self.buckets[i].len() < K:
             # Not full, append this node and return
             if contacted:
                 node.updateLastSeen()
-                self.buckets[i].l.append(node)
+                self.buckets[i].add(node)
                 self.buckets[i].touch()
                 log.msg('Added node to routing table: %s/%s' % (node.host, node.port))
                 return True
@@ -239,7 +207,7 @@ class KTable:
         # Bucket is full, check to see if the local node is not in the bucket
         if not (self.buckets[i].min <= self.node < self.buckets[i].max):
             # Local node not in the bucket, can't split it, return the oldest node
-            return self.buckets[i].l[0]
+            return self.buckets[i].oldest()
         
         # Make sure our table isn't FULL, this is really unlikely
         if len(self.buckets) >= (khash.HASH_LENGTH*8):
@@ -247,7 +215,8 @@ class KTable:
             return
             
         # This bucket is full and contains our node, split the bucket
-        self._splitBucket(self.buckets[i])
+        newBucket = self.buckets[i].split()
+        self.buckets.insert(i + 1, newBucket)
         
         # Now that the bucket is split and balanced, try to insert the node again
         return self.insertNode(node)
@@ -258,7 +227,7 @@ class KTable:
         Call this any time you get a message from a node, it will update it
         in the table if it's there.
 
-        @type id: C{string} of C{int} or L{node.Node}
+        @type id: C{string} or C{int} or L{node.Node}
         @param id: the node ID to mark as just having been seen
         @rtype: C{datetime.datetime}
         @return: the old lastSeen time of the node, or None if it's not in the table
@@ -269,20 +238,11 @@ class KTable:
 
         # Check to see if node is in the bucket
         try:
-            it = self.buckets[i].l.index(num)
+            tstamp = self.buckets[i].justSeen(num)
         except ValueError:
             return None
         else:
-            # The node is in the bucket
-            n = self.buckets[i].l[it]
-            tstamp = n.lastSeen
-            n.updateLastSeen()
-            
-            # Move the node to the end and touch the bucket
-            del(self.buckets[i].l[it])
-            self.buckets[i].l.append(n)
             self.buckets[i].touch()
-            
             return tstamp
     
     def invalidateNode(self, n):
@@ -300,20 +260,19 @@ class KTable:
 
         # Check to see if node is in the bucket
         try:
-            it = self.buckets[i].l.index(num)
+            n = self.buckets[i].node(num)
         except ValueError:
             return None
         else:
             # The node is in the bucket
-            n = self.buckets[i].l[it]
             if n.msgFailed() >= self.config['MAX_FAILURES']:
                 self.invalidateNode(n)
                         
 class KBucket:
     """Single bucket of nodes in a kademlia-like routing table.
     
-    @type l: C{list} of L{node.Node}
-    @ivar l: the nodes that are in this bucket
+    @type nodes: C{list} of L{node.Node}
+    @ivar nodes: the nodes that are in this bucket
     @type min: C{long}
     @ivar min: the minimum node ID that can be in this bucket
     @type max: C{long}
@@ -332,15 +291,29 @@ class KBucket:
         @type max: C{long}
         @param max: the maximum node ID that can be in this bucket
         """
-        self.l = contents
+        self.nodes = contents
         self.min = min
         self.max = max
         self.lastAccessed = datetime.now()
         
-    def touch(self):
-        """Update the L{lastAccessed} time."""
-        self.lastAccessed = datetime.now()
+    def __repr__(self):
+        return "<KBucket %d items (%f to %f, range %d)>" % (
+                len(self.nodes), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))
     
+    #{ List-like functions
+    def len(self): return len(self.nodes)
+    def list(self): return list(self.nodes)
+    def node(self, num): return self.nodes[self.nodes.index(num)]
+    def remove(self, num): return self.nodes.pop(self.nodes.index(num))
+    def oldest(self): return self.nodes[0]
+
+    def add(self, node):
+        """Add the node in the correct sorted order."""
+        i = len(self.nodes)
+        while i > 0 and node.lastSeen < self.nodes[i-1].lastSeen:
+            i -= 1
+        self.nodes.insert(i, node)
+        
     def sort(self):
         """Sort the nodes in the bucket by their lastSeen time."""
         def _sort(a, b):
@@ -350,24 +323,76 @@ class KBucket:
             elif a.lastSeen < b.lastSeen:
                 return -1
             return 0
-        self.l.sort(_sort)
+        self.nodes.sort(_sort)
+        
+    #{ Bucket functions
+    def touch(self):
+        """Update the L{lastAccessed} time."""
+        self.lastAccessed = datetime.now()
+    
+    def justSeen(self, num):
+        """Mark a node as having been seen.
+        
+        @param num: the number of the node just seen
+        """
+        i = self.nodes.index(num)
+        
+        # The node is in the bucket
+        n = self.nodes[i]
+        tstamp = n.lastSeen
+        n.updateLastSeen()
+        
+        # Move the node to the end and touch the bucket
+        self.nodes.pop(i)
+        self.nodes.append(n)
+        
+        return tstamp
 
-    def getNodeWithInt(self, num):
-        """Get the node in the bucket with that number.
+    def split(self):
+        """Split a bucket in two.
         
-        @type num: C{long}
-        @param num: the node ID to look for
-        @raise ValueError: if the node ID is not in the bucket
-        @rtype: L{node.Node}
-        @return: the node
+        @rtype: L{KBucket}
+        @return: the new bucket split from this one
         """
-        if num in self.l: return num
-        else: raise ValueError
+        # Create a new bucket with half the (upper) range of the current bucket
+        diff = (self.max - self.min) / 2
+        new = KBucket([], self.max - diff, self.max)
         
-    def __repr__(self):
-        return "<KBucket %d items (%f to %f, range %d)>" % (
-                len(self.l), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))
+        # Reduce the input bucket's (upper) range 
+        self.max = self.max - diff
+
+        # Transfer nodes to the new bucket
+        for node in self.nodes[:]:
+            if node.num >= self.max:
+                self.nodes.remove(node)
+                new.add(node)
+        return new
     
+    def merge(self, old):
+        """Try to merge two buckets into one.
+        
+        @type old: L{KBucket}
+        @param old: the bucket to merge into this one
+        @return: whether a merge was done or not
+        """
+        # Decide if we should do a merge
+        if len(self.nodes) + old.len() > K:
+            return False
+
+        # Set the range to cover the other's as well
+        self.min = min(self.min, old.min)
+        self.max = max(self.max, old.max)
+
+        # Transfer the other's nodes to this bucket, merging the sorting
+        i = 0
+        for node in old.list():
+            while i < len(self.nodes) and self.nodes[i].lastSeen <= node.lastSeen:
+                i += 1
+            self.nodes.insert(i, node)
+            i += 1
+
+        return True
+                
     #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
     def __lt__(self, a):
         if isinstance(a, Node): a = a.num
@@ -398,13 +423,13 @@ class TestKTable(unittest.TestCase):
     def testAddNode(self):
         self.b = Node(khash.newID(), '127.0.0.1', 2003)
         self.t.insertNode(self.b)
-        self.failUnlessEqual(len(self.t.buckets[0].l), 1)
-        self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
+        self.failUnlessEqual(len(self.t.buckets[0].nodes), 1)
+        self.failUnlessEqual(self.t.buckets[0].nodes[0], self.b)
 
     def testRemove(self):
         self.testAddNode()
         self.t.invalidateNode(self.b)
-        self.failUnlessEqual(len(self.t.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.t.buckets[0].nodes), 0)
 
     def testMergeBuckets(self):
         for i in xrange(1000):
@@ -412,7 +437,7 @@ class TestKTable(unittest.TestCase):
             self.t.insertNode(b)
         num = len(self.t.buckets)
         i = self.t._bucketIndexForInt(self.a.num)
-        for b in self.t.buckets[i].l[:]:
+        for b in self.t.buckets[i].nodes[:]:
             self.t.invalidateNode(b)
         self.failUnlessEqual(len(self.t.buckets), num-1)
 
@@ -420,8 +445,8 @@ class TestKTable(unittest.TestCase):
         self.testAddNode()
         for i in range(self.t.config['MAX_FAILURES'] - 1):
             self.t.nodeFailed(self.b)
-            self.failUnlessEqual(len(self.t.buckets[0].l), 1)
-            self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
+            self.failUnlessEqual(len(self.t.buckets[0].nodes), 1)
+            self.failUnlessEqual(self.t.buckets[0].nodes[0], self.b)
             
         self.t.nodeFailed(self.b)
-        self.failUnlessEqual(len(self.t.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.t.buckets[0].nodes), 0)
index 5ed7e6d..6fec0d7 100644 (file)
@@ -72,7 +72,7 @@ class StatsLogger:
         """
         if datetime.now() - self.lastTableUpdate > timedelta(seconds = 15):
             self.lastTableUpdate = datetime.now()
-            self.nodes = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
+            self.nodes = reduce(lambda a, b: a + b.len(), self.table.buckets, 0)
             self.users = K * (2**(len(self.table.buckets) - 1))
         return (self.nodes, self.users)
     
index 966dc72..dba9d8b 100644 (file)
@@ -12,7 +12,7 @@ def bucket_stats(l):
     def count(buckets):
         c = 0
         for bucket in buckets:
-            c = c + len(bucket.l)
+            c = c + bucket.len()
         return c
     for node in l:
         c = count(node.table.buckets)