X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p_Khashmir%2Fktable.py;h=499c4d852e1e848fb93f30271728e0391a0cf256;hb=e848bee6cd8bd9ba754a21c3aff835f86d9c1aee;hp=3e60f5c37d0156e6f51c0f9d3543e689012424f4;hpb=b063e349bff01a77f21a8109741c4577dd5863a8;p=quix0rs-apt-p2p.git diff --git a/apt_p2p_Khashmir/ktable.py b/apt_p2p_Khashmir/ktable.py index 3e60f5c..499c4d8 100644 --- a/apt_p2p_Khashmir/ktable.py +++ b/apt_p2p_Khashmir/ktable.py @@ -1,10 +1,14 @@ ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information -"""The routing table and buckets for a kademlia-like DHT.""" +"""The routing table and buckets for a kademlia-like DHT. + +@var K: the Kademlia "K" constant, this should be an even number +""" from datetime import datetime from bisect import bisect_left +from math import log as loge from twisted.python import log from twisted.trial import unittest @@ -12,6 +16,8 @@ from twisted.trial import unittest import khash from node import Node, NULL_ID +K = 8 + class KTable: """Local routing table for a kademlia-like distributed hash table. @@ -35,7 +41,7 @@ class KTable: assert node.id != NULL_ID self.node = node self.config = config - self.buckets = [KBucket([], 0L, 2L**self.config['HASH_LENGTH'])] + self.buckets = [KBucket([], 0L, 2L**(khash.HASH_LENGTH*8))] def _bucketIndexForInt(self, num): """Find the index of the bucket that should hold the node's ID number.""" @@ -74,11 +80,11 @@ class KTable: nodes = list(self.buckets[i].l) # Make sure we have enough - if len(nodes) < self.config['K']: + if len(nodes) < K: # Look in adjoining buckets for nodes min = i - 1 max = i + 1 - while len(nodes) < self.config['K'] and (min >= 0 or max < len(self.buckets)): + while len(nodes) < K and (min >= 0 or max < len(self.buckets)): # Add the adjoining buckets' nodes to the list if min >= 0: nodes = nodes + self.buckets[min].l @@ -89,7 +95,7 @@ class KTable: # Sort the found nodes by proximity to the id and return the closest K nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num)) - return nodes[:self.config['K']] + return nodes[:K] def _splitBucket(self, a): """Split a bucket in two. @@ -111,6 +117,41 @@ class KTable: a.l.remove(anode) b.l.append(anode) + def _mergeBucket(self, i): + """Merge unneeded buckets after removing a node. + + @type i: C{int} + @param i: the index of the bucket that lost a node + """ + bucketRange = self.buckets[i].max - self.buckets[i].min + otherBucket = None + + # Find if either of the neighbor buckets is the same size + # (this will only happen if this or the neighbour has our node ID in its range) + if i-1 >= 0 and self.buckets[i-1].max - self.buckets[i-1].min == bucketRange: + otherBucket = i-1 + elif i+1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange: + otherBucket = i+1 + + # Decide if we should do a merge + if otherBucket is not None and len(self.buckets[i].l) + len(self.buckets[otherBucket].l) <= K: + # Remove one bucket and set the other to cover its range as well + b = self.buckets[i] + a = self.buckets.pop(otherBucket) + b.min = min(b.min, a.min) + b.max = max(b.max, a.max) + + # Transfer the nodes to the bucket we're keeping, merging the sorting + bi = 0 + for anode in a.l: + while bi < len(b.l) and b.l[bi].lastSeen <= anode.lastSeen: + bi += 1 + b.l.insert(bi, anode) + bi += 1 + + # Recurse to check if the neighbour buckets can also be merged + self._mergeBucket(min(i, otherBucket)) + def replaceStaleNode(self, stale, new = None): """Replace a stale node in a bucket with a new one. @@ -124,16 +165,22 @@ class KTable: not adding any node in the old node's place) """ # Find the stale node's bucket + removed = False i = self._bucketIndexForInt(stale.num) try: it = self.buckets[i].l.index(stale.num) except ValueError: - return - - # Remove the stale node and insert the new one - del(self.buckets[i].l[it]) - if new: + pass + else: + # Remove the stale node + del(self.buckets[i].l[it]) + removed = True + + # Insert the new node + if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K: self.buckets[i].l.append(new) + elif removed: + self._mergeBucket(i) def insertNode(self, node, contacted = True): """Try to insert a node in the routing table. @@ -177,7 +224,7 @@ class KTable: return # We don't have this node, check to see if the bucket is full - if len(self.buckets[i].l) < self.config['K']: + if len(self.buckets[i].l) < K: # Not full, append this node and return if contacted: node.updateLastSeen() @@ -191,7 +238,7 @@ class KTable: return self.buckets[i].l[0] # Make sure our table isn't FULL, this is really unlikely - if len(self.buckets) >= self.config['HASH_LENGTH']: + if len(self.buckets) >= (khash.HASH_LENGTH*8): log.err("Hash Table is FULL! Increase K!") return @@ -226,6 +273,12 @@ class KTable: n = self.buckets[i].l[it] tstamp = n.lastSeen n.updateLastSeen() + + # Move the node to the end and touch the bucket + del(self.buckets[i].l[it]) + self.buckets[i].l.append(n) + self.buckets[i].touch() + return tstamp def invalidateNode(self, n): @@ -284,6 +337,17 @@ class KBucket: """Update the L{lastAccessed} time.""" self.lastAccessed = datetime.now() + def sort(self): + """Sort the nodes in the bucket by their lastSeen time.""" + def _sort(a, b): + """Sort nodes by their lastSeen time.""" + if a.lastSeen > b.lastSeen: + return 1 + elif a.lastSeen < b.lastSeen: + return -1 + return 0 + self.l.sort(_sort) + def getNodeWithInt(self, num): """Get the node in the bucket with that number. @@ -297,7 +361,8 @@ class KBucket: else: raise ValueError def __repr__(self): - return "" % (len(self.l), self.min, self.max) + return "" % ( + len(self.l), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2)) #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long def __lt__(self, a): @@ -324,7 +389,7 @@ class TestKTable(unittest.TestCase): def setUp(self): self.a = Node(khash.newID(), '127.0.0.1', 2002) - self.t = KTable(self.a, {'HASH_LENGTH': 160, 'K': 8, 'MAX_FAILURES': 3}) + self.t = KTable(self.a, {'MAX_FAILURES': 3}) def testAddNode(self): self.b = Node(khash.newID(), '127.0.0.1', 2003) @@ -337,6 +402,16 @@ class TestKTable(unittest.TestCase): self.t.invalidateNode(self.b) self.failUnlessEqual(len(self.t.buckets[0].l), 0) + def testMergeBuckets(self): + for i in xrange(1000): + b = Node(khash.newID(), '127.0.0.1', 2003 + i) + self.t.insertNode(b) + num = len(self.t.buckets) + i = self.t._bucketIndexForInt(self.a.num) + for b in self.t.buckets[i].l[:]: + self.t.invalidateNode(b) + self.failUnlessEqual(len(self.t.buckets), num-1) + def testFail(self): self.testAddNode() for i in range(self.t.config['MAX_FAILURES'] - 1):