]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/ktable.py
Final version of abstract.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / ktable.py
index 499c4d852e1e848fb93f30271728e0391a0cf256..201d494d927392bca58157857a11a5154e39daa2 100644 (file)
@@ -1,5 +1,3 @@
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
 
 """The routing table and buckets for a kademlia-like DHT.
 
@@ -50,7 +48,7 @@ class KTable:
     def _nodeNum(self, id):
         """Takes different types of input and converts to the node ID number.
 
-        @type id: C{string} of C{int} or L{node.Node}
+        @type id: C{string} or C{int} or L{node.Node}
         @param id: the ID to find nodes that are close to
         @raise TypeError: if id does not properly identify an ID
         """
@@ -68,7 +66,7 @@ class KTable:
     def findNodes(self, id):
         """Find the K nodes in our own local table closest to the ID.
 
-        @type id: C{string} of C{int} or L{node.Node}
+        @type id: C{string} or C{int} or L{node.Node}
         @param id: the ID to find nodes that are close to
         """
 
@@ -77,7 +75,7 @@ class KTable:
             
         # Get the K closest nodes from the appropriate bucket
         i = self._bucketIndexForInt(num)
-        nodes = list(self.buckets[i].l)
+        nodes = self.buckets[i].list()
         
         # Make sure we have enough
         if len(nodes) < K:
@@ -87,9 +85,9 @@ class KTable:
             while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
                 # Add the adjoining buckets' nodes to the list
                 if min >= 0:
-                    nodes = nodes + self.buckets[min].l
+                    nodes = nodes + self.buckets[min].list()
                 if max < len(self.buckets):
-                    nodes = nodes + self.buckets[max].l
+                    nodes = nodes + self.buckets[max].list()
                 min = min - 1
                 max = max + 1
     
@@ -97,26 +95,18 @@ class KTable:
         nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
         return nodes[:K]
         
-    def _splitBucket(self, a):
-        """Split a bucket in two.
-        
-        @type a: L{KBucket}
-        @param a: the bucket to split
+    def touch(self, id):
+        """Mark a bucket as having been looked up.
+
+        @type id: C{string} or C{int} or L{node.Node}
+        @param id: the ID in the bucket that was accessed
         """
-        # Create a new bucket with half the (upper) range of the current bucket
-        diff = (a.max - a.min) / 2
-        b = KBucket([], a.max - diff, a.max)
-        self.buckets.insert(self.buckets.index(a.min) + 1, b)
+        # Get the bucket number from the input
+        num = self._nodeNum(id)
+        i = self._bucketIndexForInt(num)
         
-        # Reduce the input bucket's (upper) range 
-        a.max = a.max - diff
+        self.buckets[i].touch()
 
-        # Transfer nodes to the new bucket
-        for anode in a.l[:]:
-            if anode.num >= a.max:
-                a.l.remove(anode)
-                b.l.append(anode)
-    
     def _mergeBucket(self, i):
         """Merge unneeded buckets after removing a node.
         
@@ -133,21 +123,10 @@ class KTable:
         elif i+1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange:
             otherBucket = i+1
             
-        # Decide if we should do a merge
-        if otherBucket is not None and len(self.buckets[i].l) + len(self.buckets[otherBucket].l) <= K:
-            # Remove one bucket and set the other to cover its range as well
-            b = self.buckets[i]
-            a = self.buckets.pop(otherBucket)
-            b.min = min(b.min, a.min)
-            b.max = max(b.max, a.max)
-
-            # Transfer the nodes to the bucket we're keeping, merging the sorting
-            bi = 0
-            for anode in a.l:
-                while bi < len(b.l) and b.l[bi].lastSeen <= anode.lastSeen:
-                    bi += 1
-                b.l.insert(bi, anode)
-                bi += 1
+        # Try and do a merge
+        if otherBucket is not None and self.buckets[i].merge(self.buckets[otherBucket]):
+            # Merge was successful, remove the old bucket
+            self.buckets.pop(otherBucket)
                 
             # Recurse to check if the neighbour buckets can also be merged
             self._mergeBucket(min(i, otherBucket))
@@ -168,24 +147,25 @@ class KTable:
         removed = False
         i = self._bucketIndexForInt(stale.num)
         try:
-            it = self.buckets[i].l.index(stale.num)
+            self.buckets[i].remove(stale.num)
         except ValueError:
             pass
         else:
-            # Remove the stale node
-            del(self.buckets[i].l[it])
+            # Removed the stale node
             removed = True
+            log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port))
         
         # Insert the new node
-        if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K:
-            self.buckets[i].l.append(new)
+        if new and self._bucketIndexForInt(new.num) == i and self.buckets[i].len() < K:
+            self.buckets[i].add(new)
         elif removed:
             self._mergeBucket(i)
     
     def insertNode(self, node, contacted = True):
         """Try to insert a node in the routing table.
         
-        This inserts the node, returning None if successful, otherwise returns
+        This inserts the node, returning True if successful, False if the
+        node could have been added if it responds to a ping, otherwise returns
         the oldest node in the bucket if it's full. The caller is then
         responsible for pinging the returned node and calling replaceStaleNode
         if it doesn't respond. contacted means that yes, we contacted THEM and
@@ -196,18 +176,20 @@ class KTable:
         @type contacted: C{boolean}
         @param contacted: whether the new node is known to be good, i.e.
             responded to a request (optional, defaults to True)
-        @rtype: L{node.Node}
-        @return: None if successful (the bucket wasn't full), otherwise returns the oldest node in the bucket
+        @rtype: L{node.Node} or C{boolean}
+        @return: True if successful (the bucket wasn't full), False if the
+            node could have been added if it was contacted, otherwise
+            returns the oldest node in the bucket
         """
         assert node.id != NULL_ID
-        if node.id == self.node.id: return
+        if node.id == self.node.id: return True
 
         # Get the bucket for this node
         i = self._bucketIndexForInt(node.num)
 
         # Check to see if node is in the bucket already
         try:
-            it = self.buckets[i].l.index(node.num)
+            self.buckets[i].node(node.num)
         except ValueError:
             pass
         else:
@@ -216,34 +198,35 @@ class KTable:
                 # It responded, so update it
                 node.updateLastSeen()
                 # move node to end of bucket
-                del(self.buckets[i].l[it])
+                self.buckets[i].remove(node.num)
                 # note that we removed the original and replaced it with the new one
                 # utilizing this nodes new contact info
-                self.buckets[i].l.append(node)
-                self.buckets[i].touch()
-            return
+                self.buckets[i].add(node)
+            return True
         
         # We don't have this node, check to see if the bucket is full
-        if len(self.buckets[i].l) < K:
+        if self.buckets[i].len() < K:
             # Not full, append this node and return
             if contacted:
                 node.updateLastSeen()
-            self.buckets[i].l.append(node)
-            self.buckets[i].touch()
-            return
+                self.buckets[i].add(node)
+                log.msg('Added node to routing table: %s/%s' % (node.host, node.port))
+                return True
+            return False
             
         # Bucket is full, check to see if the local node is not in the bucket
         if not (self.buckets[i].min <= self.node < self.buckets[i].max):
             # Local node not in the bucket, can't split it, return the oldest node
-            return self.buckets[i].l[0]
+            return self.buckets[i].oldest()
         
         # Make sure our table isn't FULL, this is really unlikely
         if len(self.buckets) >= (khash.HASH_LENGTH*8):
-            log.err("Hash Table is FULL!  Increase K!")
+            log.err(RuntimeError("Hash Table is FULL! Increase K!"))
             return
             
         # This bucket is full and contains our node, split the bucket
-        self._splitBucket(self.buckets[i])
+        newBucket = self.buckets[i].split()
+        self.buckets.insert(i + 1, newBucket)
         
         # Now that the bucket is split and balanced, try to insert the node again
         return self.insertNode(node)
@@ -254,7 +237,7 @@ class KTable:
         Call this any time you get a message from a node, it will update it
         in the table if it's there.
 
-        @type id: C{string} of C{int} or L{node.Node}
+        @type id: C{string} or C{int} or L{node.Node}
         @param id: the node ID to mark as just having been seen
         @rtype: C{datetime.datetime}
         @return: the old lastSeen time of the node, or None if it's not in the table
@@ -265,20 +248,10 @@ class KTable:
 
         # Check to see if node is in the bucket
         try:
-            it = self.buckets[i].l.index(num)
+            tstamp = self.buckets[i].justSeen(num)
         except ValueError:
             return None
         else:
-            # The node is in the bucket
-            n = self.buckets[i].l[it]
-            tstamp = n.lastSeen
-            n.updateLastSeen()
-            
-            # Move the node to the end and touch the bucket
-            del(self.buckets[i].l[it])
-            self.buckets[i].l.append(n)
-            self.buckets[i].touch()
-            
             return tstamp
     
     def invalidateNode(self, n):
@@ -289,27 +262,31 @@ class KTable:
         self.replaceStaleNode(n)
     
     def nodeFailed(self, node):
-        """Mark a node as having failed once, and remove it if it has failed too much."""
+        """Mark a node as having failed once, and remove it if it has failed too much.
+        
+        @return: whether the node is in the routing table
+        """
         # Get the bucket number
         num = self._nodeNum(node)
         i = self._bucketIndexForInt(num)
 
         # Check to see if node is in the bucket
         try:
-            it = self.buckets[i].l.index(num)
+            n = self.buckets[i].node(num)
         except ValueError:
-            return None
+            return False
         else:
             # The node is in the bucket
-            n = self.buckets[i].l[it]
             if n.msgFailed() >= self.config['MAX_FAILURES']:
                 self.invalidateNode(n)
+                return False
+            return True
                         
 class KBucket:
     """Single bucket of nodes in a kademlia-like routing table.
     
-    @type l: C{list} of L{node.Node}
-    @ivar l: the nodes that are in this bucket
+    @type nodes: C{list} of L{node.Node}
+    @ivar nodes: the nodes that are in this bucket
     @type min: C{long}
     @ivar min: the minimum node ID that can be in this bucket
     @type max: C{long}
@@ -328,15 +305,29 @@ class KBucket:
         @type max: C{long}
         @param max: the maximum node ID that can be in this bucket
         """
-        self.l = contents
+        self.nodes = contents
         self.min = min
         self.max = max
         self.lastAccessed = datetime.now()
         
-    def touch(self):
-        """Update the L{lastAccessed} time."""
-        self.lastAccessed = datetime.now()
+    def __repr__(self):
+        return "<KBucket %d items (%f to %f, range %d)>" % (
+                len(self.nodes), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))
     
+    #{ List-like functions
+    def len(self): return len(self.nodes)
+    def list(self): return list(self.nodes)
+    def node(self, num): return self.nodes[self.nodes.index(num)]
+    def remove(self, num): return self.nodes.pop(self.nodes.index(num))
+    def oldest(self): return self.nodes[0]
+
+    def add(self, node):
+        """Add the node in the correct sorted order."""
+        i = len(self.nodes)
+        while i > 0 and node.lastSeen < self.nodes[i-1].lastSeen:
+            i -= 1
+        self.nodes.insert(i, node)
+        
     def sort(self):
         """Sort the nodes in the bucket by their lastSeen time."""
         def _sort(a, b):
@@ -346,24 +337,76 @@ class KBucket:
             elif a.lastSeen < b.lastSeen:
                 return -1
             return 0
-        self.l.sort(_sort)
+        self.nodes.sort(_sort)
+        
+    #{ Bucket functions
+    def touch(self):
+        """Update the L{lastAccessed} time."""
+        self.lastAccessed = datetime.now()
+    
+    def justSeen(self, num):
+        """Mark a node as having been seen.
+        
+        @param num: the number of the node just seen
+        """
+        i = self.nodes.index(num)
+        
+        # The node is in the bucket
+        n = self.nodes[i]
+        tstamp = n.lastSeen
+        n.updateLastSeen()
+        
+        # Move the node to the end and touch the bucket
+        self.nodes.pop(i)
+        self.nodes.append(n)
+        
+        return tstamp
 
-    def getNodeWithInt(self, num):
-        """Get the node in the bucket with that number.
+    def split(self):
+        """Split a bucket in two.
         
-        @type num: C{long}
-        @param num: the node ID to look for
-        @raise ValueError: if the node ID is not in the bucket
-        @rtype: L{node.Node}
-        @return: the node
+        @rtype: L{KBucket}
+        @return: the new bucket split from this one
         """
-        if num in self.l: return num
-        else: raise ValueError
+        # Create a new bucket with half the (upper) range of the current bucket
+        diff = (self.max - self.min) / 2
+        new = KBucket([], self.max - diff, self.max)
         
-    def __repr__(self):
-        return "<KBucket %d items (%f to %f, range %d)>" % (
-                len(self.l), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))
+        # Reduce the input bucket's (upper) range 
+        self.max = self.max - diff
+
+        # Transfer nodes to the new bucket
+        for node in self.nodes[:]:
+            if node.num >= self.max:
+                self.nodes.remove(node)
+                new.add(node)
+        return new
     
+    def merge(self, old):
+        """Try to merge two buckets into one.
+        
+        @type old: L{KBucket}
+        @param old: the bucket to merge into this one
+        @return: whether a merge was done or not
+        """
+        # Decide if we should do a merge
+        if len(self.nodes) + old.len() > K:
+            return False
+
+        # Set the range to cover the other's as well
+        self.min = min(self.min, old.min)
+        self.max = max(self.max, old.max)
+
+        # Transfer the other's nodes to this bucket, merging the sorting
+        i = 0
+        for node in old.list():
+            while i < len(self.nodes) and self.nodes[i].lastSeen <= node.lastSeen:
+                i += 1
+            self.nodes.insert(i, node)
+            i += 1
+
+        return True
+                
     #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
     def __lt__(self, a):
         if isinstance(a, Node): a = a.num
@@ -394,13 +437,13 @@ class TestKTable(unittest.TestCase):
     def testAddNode(self):
         self.b = Node(khash.newID(), '127.0.0.1', 2003)
         self.t.insertNode(self.b)
-        self.failUnlessEqual(len(self.t.buckets[0].l), 1)
-        self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
+        self.failUnlessEqual(len(self.t.buckets[0].nodes), 1)
+        self.failUnlessEqual(self.t.buckets[0].nodes[0], self.b)
 
     def testRemove(self):
         self.testAddNode()
         self.t.invalidateNode(self.b)
-        self.failUnlessEqual(len(self.t.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.t.buckets[0].nodes), 0)
 
     def testMergeBuckets(self):
         for i in xrange(1000):
@@ -408,7 +451,7 @@ class TestKTable(unittest.TestCase):
             self.t.insertNode(b)
         num = len(self.t.buckets)
         i = self.t._bucketIndexForInt(self.a.num)
-        for b in self.t.buckets[i].l[:]:
+        for b in self.t.buckets[i].nodes[:]:
             self.t.invalidateNode(b)
         self.failUnlessEqual(len(self.t.buckets), num-1)
 
@@ -416,8 +459,8 @@ class TestKTable(unittest.TestCase):
         self.testAddNode()
         for i in range(self.t.config['MAX_FAILURES'] - 1):
             self.t.nodeFailed(self.b)
-            self.failUnlessEqual(len(self.t.buckets[0].l), 1)
-            self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
+            self.failUnlessEqual(len(self.t.buckets[0].nodes), 1)
+            self.failUnlessEqual(self.t.buckets[0].nodes[0], self.b)
             
         self.t.nodeFailed(self.b)
-        self.failUnlessEqual(len(self.t.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.t.buckets[0].nodes), 0)