X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;ds=sidebyside;f=apt_p2p_Khashmir%2Fktable.py;h=e91c0db9843cf250fb5ab46bc0226b21150be314;hb=194d18a4f4f1c615eecc7d93f379497e0eb586cd;hp=3e60f5c37d0156e6f51c0f9d3543e689012424f4;hpb=b063e349bff01a77f21a8109741c4577dd5863a8;p=quix0rs-apt-p2p.git diff --git a/apt_p2p_Khashmir/ktable.py b/apt_p2p_Khashmir/ktable.py index 3e60f5c..e91c0db 100644 --- a/apt_p2p_Khashmir/ktable.py +++ b/apt_p2p_Khashmir/ktable.py @@ -1,10 +1,12 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information -"""The routing table and buckets for a kademlia-like DHT.""" +"""The routing table and buckets for a kademlia-like DHT. + +@var K: the Kademlia "K" constant, this should be an even number +""" from datetime import datetime from bisect import bisect_left +from math import log as loge from twisted.python import log from twisted.trial import unittest @@ -12,6 +14,8 @@ from twisted.trial import unittest import khash from node import Node, NULL_ID +K = 8 + class KTable: """Local routing table for a kademlia-like distributed hash table. @@ -35,7 +39,7 @@ class KTable: assert node.id != NULL_ID self.node = node self.config = config - self.buckets = [KBucket([], 0L, 2L**self.config['HASH_LENGTH'])] + self.buckets = [KBucket([], 0L, 2L**(khash.HASH_LENGTH*8))] def _bucketIndexForInt(self, num): """Find the index of the bucket that should hold the node's ID number.""" @@ -74,11 +78,11 @@ class KTable: nodes = list(self.buckets[i].l) # Make sure we have enough - if len(nodes) < self.config['K']: + if len(nodes) < K: # Look in adjoining buckets for nodes min = i - 1 max = i + 1 - while len(nodes) < self.config['K'] and (min >= 0 or max < len(self.buckets)): + while len(nodes) < K and (min >= 0 or max < len(self.buckets)): # Add the adjoining buckets' nodes to the list if min >= 0: nodes = nodes + self.buckets[min].l @@ -89,7 +93,7 @@ class KTable: # Sort the found nodes by proximity to the id and return the closest K nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num)) - return nodes[:self.config['K']] + return nodes[:K] def _splitBucket(self, a): """Split a bucket in two. @@ -111,6 +115,41 @@ class KTable: a.l.remove(anode) b.l.append(anode) + def _mergeBucket(self, i): + """Merge unneeded buckets after removing a node. + + @type i: C{int} + @param i: the index of the bucket that lost a node + """ + bucketRange = self.buckets[i].max - self.buckets[i].min + otherBucket = None + + # Find if either of the neighbor buckets is the same size + # (this will only happen if this or the neighbour has our node ID in its range) + if i-1 >= 0 and self.buckets[i-1].max - self.buckets[i-1].min == bucketRange: + otherBucket = i-1 + elif i+1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange: + otherBucket = i+1 + + # Decide if we should do a merge + if otherBucket is not None and len(self.buckets[i].l) + len(self.buckets[otherBucket].l) <= K: + # Remove one bucket and set the other to cover its range as well + b = self.buckets[i] + a = self.buckets.pop(otherBucket) + b.min = min(b.min, a.min) + b.max = max(b.max, a.max) + + # Transfer the nodes to the bucket we're keeping, merging the sorting + bi = 0 + for anode in a.l: + while bi < len(b.l) and b.l[bi].lastSeen <= anode.lastSeen: + bi += 1 + b.l.insert(bi, anode) + bi += 1 + + # Recurse to check if the neighbour buckets can also be merged + self._mergeBucket(min(i, otherBucket)) + def replaceStaleNode(self, stale, new = None): """Replace a stale node in a bucket with a new one. @@ -124,21 +163,29 @@ class KTable: not adding any node in the old node's place) """ # Find the stale node's bucket + removed = False i = self._bucketIndexForInt(stale.num) try: it = self.buckets[i].l.index(stale.num) except ValueError: - return - - # Remove the stale node and insert the new one - del(self.buckets[i].l[it]) - if new: + pass + else: + # Remove the stale node + del(self.buckets[i].l[it]) + removed = True + log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port)) + + # Insert the new node + if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K: self.buckets[i].l.append(new) + elif removed: + self._mergeBucket(i) def insertNode(self, node, contacted = True): """Try to insert a node in the routing table. - This inserts the node, returning None if successful, otherwise returns + This inserts the node, returning True if successful, False if the + node could have been added if it responds to a ping, otherwise returns the oldest node in the bucket if it's full. The caller is then responsible for pinging the returned node and calling replaceStaleNode if it doesn't respond. contacted means that yes, we contacted THEM and @@ -149,11 +196,13 @@ class KTable: @type contacted: C{boolean} @param contacted: whether the new node is known to be good, i.e. responded to a request (optional, defaults to True) - @rtype: L{node.Node} - @return: None if successful (the bucket wasn't full), otherwise returns the oldest node in the bucket + @rtype: L{node.Node} or C{boolean} + @return: True if successful (the bucket wasn't full), False if the + node could have been added if it was contacted, otherwise + returns the oldest node in the bucket """ assert node.id != NULL_ID - if node.id == self.node.id: return + if node.id == self.node.id: return True # Get the bucket for this node i = self._bucketIndexForInt(node.num) @@ -174,16 +223,18 @@ class KTable: # utilizing this nodes new contact info self.buckets[i].l.append(node) self.buckets[i].touch() - return + return True # We don't have this node, check to see if the bucket is full - if len(self.buckets[i].l) < self.config['K']: + if len(self.buckets[i].l) < K: # Not full, append this node and return if contacted: node.updateLastSeen() - self.buckets[i].l.append(node) - self.buckets[i].touch() - return + self.buckets[i].l.append(node) + self.buckets[i].touch() + log.msg('Added node to routing table: %s/%s' % (node.host, node.port)) + return True + return False # Bucket is full, check to see if the local node is not in the bucket if not (self.buckets[i].min <= self.node < self.buckets[i].max): @@ -191,8 +242,8 @@ class KTable: return self.buckets[i].l[0] # Make sure our table isn't FULL, this is really unlikely - if len(self.buckets) >= self.config['HASH_LENGTH']: - log.err("Hash Table is FULL! Increase K!") + if len(self.buckets) >= (khash.HASH_LENGTH*8): + log.err(RuntimeError("Hash Table is FULL! Increase K!")) return # This bucket is full and contains our node, split the bucket @@ -226,6 +277,12 @@ class KTable: n = self.buckets[i].l[it] tstamp = n.lastSeen n.updateLastSeen() + + # Move the node to the end and touch the bucket + del(self.buckets[i].l[it]) + self.buckets[i].l.append(n) + self.buckets[i].touch() + return tstamp def invalidateNode(self, n): @@ -284,6 +341,17 @@ class KBucket: """Update the L{lastAccessed} time.""" self.lastAccessed = datetime.now() + def sort(self): + """Sort the nodes in the bucket by their lastSeen time.""" + def _sort(a, b): + """Sort nodes by their lastSeen time.""" + if a.lastSeen > b.lastSeen: + return 1 + elif a.lastSeen < b.lastSeen: + return -1 + return 0 + self.l.sort(_sort) + def getNodeWithInt(self, num): """Get the node in the bucket with that number. @@ -297,7 +365,8 @@ class KBucket: else: raise ValueError def __repr__(self): - return "" % (len(self.l), self.min, self.max) + return "" % ( + len(self.l), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2)) #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long def __lt__(self, a): @@ -324,7 +393,7 @@ class TestKTable(unittest.TestCase): def setUp(self): self.a = Node(khash.newID(), '127.0.0.1', 2002) - self.t = KTable(self.a, {'HASH_LENGTH': 160, 'K': 8, 'MAX_FAILURES': 3}) + self.t = KTable(self.a, {'MAX_FAILURES': 3}) def testAddNode(self): self.b = Node(khash.newID(), '127.0.0.1', 2003) @@ -337,6 +406,16 @@ class TestKTable(unittest.TestCase): self.t.invalidateNode(self.b) self.failUnlessEqual(len(self.t.buckets[0].l), 0) + def testMergeBuckets(self): + for i in xrange(1000): + b = Node(khash.newID(), '127.0.0.1', 2003 + i) + self.t.insertNode(b) + num = len(self.t.buckets) + i = self.t._bucketIndexForInt(self.a.num) + for b in self.t.buckets[i].l[:]: + self.t.invalidateNode(b) + self.failUnlessEqual(len(self.t.buckets), num-1) + def testFail(self): self.testAddNode() for i in range(self.t.config['MAX_FAILURES'] - 1):