From 58467fc71f79725ccf83764f8859f0f82174d4cc Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Thu, 8 May 2008 14:04:57 -0700 Subject: [PATCH] Move the bucket manipulation into the KBucket class. --- apt_p2p_Khashmir/db.py | 4 +- apt_p2p_Khashmir/khashmir.py | 8 +- apt_p2p_Khashmir/ktable.py | 207 ++++++++++++++++++++--------------- apt_p2p_Khashmir/stats.py | 2 +- apt_p2p_Khashmir/util.py | 2 +- 5 files changed, 124 insertions(+), 99 deletions(-) diff --git a/apt_p2p_Khashmir/db.py b/apt_p2p_Khashmir/db.py index a796256..d0433e0 100644 --- a/apt_p2p_Khashmir/db.py +++ b/apt_p2p_Khashmir/db.py @@ -103,7 +103,7 @@ class DB: c = self.conn.cursor() c.execute("DELETE FROM nodes WHERE id NOT NULL") for bucket in buckets: - for node in bucket.l: + for node in bucket.nodes: c.execute("INSERT INTO nodes VALUES (?, ?, ?)", (khash(node.id), node.host, node.port)) self.conn.commit() @@ -209,7 +209,7 @@ class TestDB(unittest.TestCase): dummy2.port = 12345 class bl: def __init__(self): - self.l = [] + self.nodes = [] bl1 = bl() bl1.l.append(dummy()) bl2 = bl() diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 74263a6..c89870e 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -582,10 +582,10 @@ class SimpleTests(unittest.TestCase): def testAddContact(self): self.failUnlessEqual(len(self.a.table.buckets), 1) - self.failUnlessEqual(len(self.a.table.buckets[0].l), 0) + self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0) self.failUnlessEqual(len(self.b.table.buckets), 1) - self.failUnlessEqual(len(self.b.table.buckets[0].l), 0) + self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0) self.a.addContact('127.0.0.1', 4045) reactor.iterate() @@ -594,9 +594,9 @@ class SimpleTests(unittest.TestCase): reactor.iterate() self.failUnlessEqual(len(self.a.table.buckets), 1) - self.failUnlessEqual(len(self.a.table.buckets[0].l), 1) + self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1) self.failUnlessEqual(len(self.b.table.buckets), 1) - self.failUnlessEqual(len(self.b.table.buckets[0].l), 1) + self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1) def testStoreRetrieve(self): self.a.addContact('127.0.0.1', 4045) diff --git a/apt_p2p_Khashmir/ktable.py b/apt_p2p_Khashmir/ktable.py index e91c0db..24ea93f 100644 --- a/apt_p2p_Khashmir/ktable.py +++ b/apt_p2p_Khashmir/ktable.py @@ -48,7 +48,7 @@ class KTable: def _nodeNum(self, id): """Takes different types of input and converts to the node ID number. - @type id: C{string} of C{int} or L{node.Node} + @type id: C{string} or C{int} or L{node.Node} @param id: the ID to find nodes that are close to @raise TypeError: if id does not properly identify an ID """ @@ -66,7 +66,7 @@ class KTable: def findNodes(self, id): """Find the K nodes in our own local table closest to the ID. - @type id: C{string} of C{int} or L{node.Node} + @type id: C{string} or C{int} or L{node.Node} @param id: the ID to find nodes that are close to """ @@ -75,7 +75,7 @@ class KTable: # Get the K closest nodes from the appropriate bucket i = self._bucketIndexForInt(num) - nodes = list(self.buckets[i].l) + nodes = self.buckets[i].list() # Make sure we have enough if len(nodes) < K: @@ -85,9 +85,9 @@ class KTable: while len(nodes) < K and (min >= 0 or max < len(self.buckets)): # Add the adjoining buckets' nodes to the list if min >= 0: - nodes = nodes + self.buckets[min].l + nodes = nodes + self.buckets[min].list() if max < len(self.buckets): - nodes = nodes + self.buckets[max].l + nodes = nodes + self.buckets[max].list() min = min - 1 max = max + 1 @@ -95,26 +95,6 @@ class KTable: nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num)) return nodes[:K] - def _splitBucket(self, a): - """Split a bucket in two. - - @type a: L{KBucket} - @param a: the bucket to split - """ - # Create a new bucket with half the (upper) range of the current bucket - diff = (a.max - a.min) / 2 - b = KBucket([], a.max - diff, a.max) - self.buckets.insert(self.buckets.index(a.min) + 1, b) - - # Reduce the input bucket's (upper) range - a.max = a.max - diff - - # Transfer nodes to the new bucket - for anode in a.l[:]: - if anode.num >= a.max: - a.l.remove(anode) - b.l.append(anode) - def _mergeBucket(self, i): """Merge unneeded buckets after removing a node. @@ -131,21 +111,10 @@ class KTable: elif i+1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange: otherBucket = i+1 - # Decide if we should do a merge - if otherBucket is not None and len(self.buckets[i].l) + len(self.buckets[otherBucket].l) <= K: - # Remove one bucket and set the other to cover its range as well - b = self.buckets[i] - a = self.buckets.pop(otherBucket) - b.min = min(b.min, a.min) - b.max = max(b.max, a.max) - - # Transfer the nodes to the bucket we're keeping, merging the sorting - bi = 0 - for anode in a.l: - while bi < len(b.l) and b.l[bi].lastSeen <= anode.lastSeen: - bi += 1 - b.l.insert(bi, anode) - bi += 1 + # Try and do a merge + if otherBucket is not None and self.buckets[i].merge(self.buckets[otherBucket]): + # Merge was successful, remove the old bucket + self.buckets.pop(otherBucket) # Recurse to check if the neighbour buckets can also be merged self._mergeBucket(min(i, otherBucket)) @@ -166,18 +135,17 @@ class KTable: removed = False i = self._bucketIndexForInt(stale.num) try: - it = self.buckets[i].l.index(stale.num) + self.buckets[i].remove(stale.num) except ValueError: pass else: - # Remove the stale node - del(self.buckets[i].l[it]) + # Removed the stale node removed = True log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port)) # Insert the new node - if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K: - self.buckets[i].l.append(new) + if new and self._bucketIndexForInt(new.num) == i and self.buckets[i].len() < K: + self.buckets[i].add(new) elif removed: self._mergeBucket(i) @@ -209,7 +177,7 @@ class KTable: # Check to see if node is in the bucket already try: - it = self.buckets[i].l.index(node.num) + self.buckets[i].node(node.num) except ValueError: pass else: @@ -218,19 +186,19 @@ class KTable: # It responded, so update it node.updateLastSeen() # move node to end of bucket - del(self.buckets[i].l[it]) + self.buckets[i].remove(node.num) # note that we removed the original and replaced it with the new one # utilizing this nodes new contact info - self.buckets[i].l.append(node) + self.buckets[i].add(node) self.buckets[i].touch() return True # We don't have this node, check to see if the bucket is full - if len(self.buckets[i].l) < K: + if self.buckets[i].len() < K: # Not full, append this node and return if contacted: node.updateLastSeen() - self.buckets[i].l.append(node) + self.buckets[i].add(node) self.buckets[i].touch() log.msg('Added node to routing table: %s/%s' % (node.host, node.port)) return True @@ -239,7 +207,7 @@ class KTable: # Bucket is full, check to see if the local node is not in the bucket if not (self.buckets[i].min <= self.node < self.buckets[i].max): # Local node not in the bucket, can't split it, return the oldest node - return self.buckets[i].l[0] + return self.buckets[i].oldest() # Make sure our table isn't FULL, this is really unlikely if len(self.buckets) >= (khash.HASH_LENGTH*8): @@ -247,7 +215,8 @@ class KTable: return # This bucket is full and contains our node, split the bucket - self._splitBucket(self.buckets[i]) + newBucket = self.buckets[i].split() + self.buckets.insert(i + 1, newBucket) # Now that the bucket is split and balanced, try to insert the node again return self.insertNode(node) @@ -258,7 +227,7 @@ class KTable: Call this any time you get a message from a node, it will update it in the table if it's there. - @type id: C{string} of C{int} or L{node.Node} + @type id: C{string} or C{int} or L{node.Node} @param id: the node ID to mark as just having been seen @rtype: C{datetime.datetime} @return: the old lastSeen time of the node, or None if it's not in the table @@ -269,20 +238,11 @@ class KTable: # Check to see if node is in the bucket try: - it = self.buckets[i].l.index(num) + tstamp = self.buckets[i].justSeen(num) except ValueError: return None else: - # The node is in the bucket - n = self.buckets[i].l[it] - tstamp = n.lastSeen - n.updateLastSeen() - - # Move the node to the end and touch the bucket - del(self.buckets[i].l[it]) - self.buckets[i].l.append(n) self.buckets[i].touch() - return tstamp def invalidateNode(self, n): @@ -300,20 +260,19 @@ class KTable: # Check to see if node is in the bucket try: - it = self.buckets[i].l.index(num) + n = self.buckets[i].node(num) except ValueError: return None else: # The node is in the bucket - n = self.buckets[i].l[it] if n.msgFailed() >= self.config['MAX_FAILURES']: self.invalidateNode(n) class KBucket: """Single bucket of nodes in a kademlia-like routing table. - @type l: C{list} of L{node.Node} - @ivar l: the nodes that are in this bucket + @type nodes: C{list} of L{node.Node} + @ivar nodes: the nodes that are in this bucket @type min: C{long} @ivar min: the minimum node ID that can be in this bucket @type max: C{long} @@ -332,15 +291,29 @@ class KBucket: @type max: C{long} @param max: the maximum node ID that can be in this bucket """ - self.l = contents + self.nodes = contents self.min = min self.max = max self.lastAccessed = datetime.now() - def touch(self): - """Update the L{lastAccessed} time.""" - self.lastAccessed = datetime.now() + def __repr__(self): + return "" % ( + len(self.nodes), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2)) + #{ List-like functions + def len(self): return len(self.nodes) + def list(self): return list(self.nodes) + def node(self, num): return self.nodes[self.nodes.index(num)] + def remove(self, num): return self.nodes.pop(self.nodes.index(num)) + def oldest(self): return self.nodes[0] + + def add(self, node): + """Add the node in the correct sorted order.""" + i = len(self.nodes) + while i > 0 and node.lastSeen < self.nodes[i-1].lastSeen: + i -= 1 + self.nodes.insert(i, node) + def sort(self): """Sort the nodes in the bucket by their lastSeen time.""" def _sort(a, b): @@ -350,24 +323,76 @@ class KBucket: elif a.lastSeen < b.lastSeen: return -1 return 0 - self.l.sort(_sort) + self.nodes.sort(_sort) + + #{ Bucket functions + def touch(self): + """Update the L{lastAccessed} time.""" + self.lastAccessed = datetime.now() + + def justSeen(self, num): + """Mark a node as having been seen. + + @param num: the number of the node just seen + """ + i = self.nodes.index(num) + + # The node is in the bucket + n = self.nodes[i] + tstamp = n.lastSeen + n.updateLastSeen() + + # Move the node to the end and touch the bucket + self.nodes.pop(i) + self.nodes.append(n) + + return tstamp - def getNodeWithInt(self, num): - """Get the node in the bucket with that number. + def split(self): + """Split a bucket in two. - @type num: C{long} - @param num: the node ID to look for - @raise ValueError: if the node ID is not in the bucket - @rtype: L{node.Node} - @return: the node + @rtype: L{KBucket} + @return: the new bucket split from this one """ - if num in self.l: return num - else: raise ValueError + # Create a new bucket with half the (upper) range of the current bucket + diff = (self.max - self.min) / 2 + new = KBucket([], self.max - diff, self.max) - def __repr__(self): - return "" % ( - len(self.l), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2)) + # Reduce the input bucket's (upper) range + self.max = self.max - diff + + # Transfer nodes to the new bucket + for node in self.nodes[:]: + if node.num >= self.max: + self.nodes.remove(node) + new.add(node) + return new + def merge(self, old): + """Try to merge two buckets into one. + + @type old: L{KBucket} + @param old: the bucket to merge into this one + @return: whether a merge was done or not + """ + # Decide if we should do a merge + if len(self.nodes) + old.len() > K: + return False + + # Set the range to cover the other's as well + self.min = min(self.min, old.min) + self.max = max(self.max, old.max) + + # Transfer the other's nodes to this bucket, merging the sorting + i = 0 + for node in old.list(): + while i < len(self.nodes) and self.nodes[i].lastSeen <= node.lastSeen: + i += 1 + self.nodes.insert(i, node) + i += 1 + + return True + #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long def __lt__(self, a): if isinstance(a, Node): a = a.num @@ -398,13 +423,13 @@ class TestKTable(unittest.TestCase): def testAddNode(self): self.b = Node(khash.newID(), '127.0.0.1', 2003) self.t.insertNode(self.b) - self.failUnlessEqual(len(self.t.buckets[0].l), 1) - self.failUnlessEqual(self.t.buckets[0].l[0], self.b) + self.failUnlessEqual(len(self.t.buckets[0].nodes), 1) + self.failUnlessEqual(self.t.buckets[0].nodes[0], self.b) def testRemove(self): self.testAddNode() self.t.invalidateNode(self.b) - self.failUnlessEqual(len(self.t.buckets[0].l), 0) + self.failUnlessEqual(len(self.t.buckets[0].nodes), 0) def testMergeBuckets(self): for i in xrange(1000): @@ -412,7 +437,7 @@ class TestKTable(unittest.TestCase): self.t.insertNode(b) num = len(self.t.buckets) i = self.t._bucketIndexForInt(self.a.num) - for b in self.t.buckets[i].l[:]: + for b in self.t.buckets[i].nodes[:]: self.t.invalidateNode(b) self.failUnlessEqual(len(self.t.buckets), num-1) @@ -420,8 +445,8 @@ class TestKTable(unittest.TestCase): self.testAddNode() for i in range(self.t.config['MAX_FAILURES'] - 1): self.t.nodeFailed(self.b) - self.failUnlessEqual(len(self.t.buckets[0].l), 1) - self.failUnlessEqual(self.t.buckets[0].l[0], self.b) + self.failUnlessEqual(len(self.t.buckets[0].nodes), 1) + self.failUnlessEqual(self.t.buckets[0].nodes[0], self.b) self.t.nodeFailed(self.b) - self.failUnlessEqual(len(self.t.buckets[0].l), 0) + self.failUnlessEqual(len(self.t.buckets[0].nodes), 0) diff --git a/apt_p2p_Khashmir/stats.py b/apt_p2p_Khashmir/stats.py index 5ed7e6d..6fec0d7 100644 --- a/apt_p2p_Khashmir/stats.py +++ b/apt_p2p_Khashmir/stats.py @@ -72,7 +72,7 @@ class StatsLogger: """ if datetime.now() - self.lastTableUpdate > timedelta(seconds = 15): self.lastTableUpdate = datetime.now() - self.nodes = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0) + self.nodes = reduce(lambda a, b: a + b.len(), self.table.buckets, 0) self.users = K * (2**(len(self.table.buckets) - 1)) return (self.nodes, self.users) diff --git a/apt_p2p_Khashmir/util.py b/apt_p2p_Khashmir/util.py index 966dc72..dba9d8b 100644 --- a/apt_p2p_Khashmir/util.py +++ b/apt_p2p_Khashmir/util.py @@ -12,7 +12,7 @@ def bucket_stats(l): def count(buckets): c = 0 for bucket in buckets: - c = c + len(bucket.l) + c = c + bucket.len() return c for node in l: c = count(node.table.buckets) -- 2.39.5