-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
-"""The routing table and buckets for a kademlia-like DHT."""
+"""The routing table and buckets for a kademlia-like DHT.
+
+@var K: the Kademlia "K" constant (the maximum number of nodes kept in a bucket); this should be an even number
+"""
from datetime import datetime
from bisect import bisect_left
+from math import log as loge
from twisted.python import log
from twisted.trial import unittest
import khash
from node import Node, NULL_ID
+# Bucket capacity: a bucket is treated as full once it holds K nodes, and
+# node lookups return at most the K closest nodes found.
+K = 8
+
class KTable:
"""Local routing table for a kademlia-like distributed hash table.
assert node.id != NULL_ID
self.node = node
self.config = config
- self.buckets = [KBucket([], 0L, 2L**self.config['HASH_LENGTH'])]
+ self.buckets = [KBucket([], 0L, 2L**(khash.HASH_LENGTH*8))]
def _bucketIndexForInt(self, num):
"""Find the index of the bucket that should hold the node's ID number."""
nodes = list(self.buckets[i].l)
# Make sure we have enough
- if len(nodes) < self.config['K']:
+ if len(nodes) < K:
# Look in adjoining buckets for nodes
min = i - 1
max = i + 1
- while len(nodes) < self.config['K'] and (min >= 0 or max < len(self.buckets)):
+ while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
# Add the adjoining buckets' nodes to the list
if min >= 0:
nodes = nodes + self.buckets[min].l
# Sort the found nodes by proximity to the id and return the closest K
nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
- return nodes[:self.config['K']]
+ return nodes[:K]
def _splitBucket(self, a):
"""Split a bucket in two.
a.l.remove(anode)
b.l.append(anode)
+def _mergeBucket(self, i):
+    """Merge unneeded buckets after removing a node.
+
+    Starting from the bucket that lost a node, keep folding it together
+    with a neighbour covering a range of the same size, for as long as the
+    nodes of both still fit in a single bucket.
+
+    @type i: C{int}
+    @param i: the index of the bucket that lost a node
+    """
+    while True:
+        keep = self.buckets[i]
+        span = keep.max - keep.min
+
+        # Look for a neighbour whose range is the same size, trying the
+        # lower neighbour first
+        # (this will only happen if this or the neighbour has our node ID in its range)
+        if i >= 1 and self.buckets[i-1].max - self.buckets[i-1].min == span:
+            other = i - 1
+        elif i + 1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == span:
+            other = i + 1
+        else:
+            return
+
+        # Only merge when one bucket can hold the nodes of both
+        if len(keep.l) + len(self.buckets[other].l) > K:
+            return
+
+        # Drop the neighbour and stretch the kept bucket over its range too
+        gone = self.buckets.pop(other)
+        keep.min = min(keep.min, gone.min)
+        keep.max = max(keep.max, gone.max)
+
+        # Interleave the neighbour's nodes into the kept bucket: each one is
+        # placed after the kept nodes whose lastSeen is not later than its own
+        pos = 0
+        for moved in gone.l:
+            while pos < len(keep.l) and keep.l[pos].lastSeen <= moved.lastSeen:
+                pos += 1
+            keep.l.insert(pos, moved)
+            pos += 1
+
+        # The merged bucket now sits at the lower of the two indexes; go
+        # around again to check whether it can be merged further
+        i = min(i, other)
+
def replaceStaleNode(self, stale, new = None):
"""Replace a stale node in a bucket with a new one.
not adding any node in the old node's place)
"""
# Find the stale node's bucket
+ removed = False
i = self._bucketIndexForInt(stale.num)
try:
it = self.buckets[i].l.index(stale.num)
except ValueError:
- return
-
- # Remove the stale node and insert the new one
- del(self.buckets[i].l[it])
- if new:
+ pass
+ else:
+ # Remove the stale node
+ del(self.buckets[i].l[it])
+ removed = True
+ log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port))
+
+ # Insert the new node
+ if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K:
self.buckets[i].l.append(new)
+ elif removed:
+ self._mergeBucket(i)
def insertNode(self, node, contacted = True):
"""Try to insert a node in the routing table.
- This inserts the node, returning None if successful, otherwise returns
+ This inserts the node, returning True if successful, False if the
+ node could have been added if it responds to a ping, otherwise returns
the oldest node in the bucket if it's full. The caller is then
responsible for pinging the returned node and calling replaceStaleNode
if it doesn't respond. contacted means that yes, we contacted THEM and
@type contacted: C{boolean}
@param contacted: whether the new node is known to be good, i.e.
responded to a request (optional, defaults to True)
- @rtype: L{node.Node}
- @return: None if successful (the bucket wasn't full), otherwise returns the oldest node in the bucket
+ @rtype: L{node.Node} or C{boolean}
+ @return: True if successful (the bucket wasn't full), False if the
+ node could have been added if it was contacted, otherwise
+ returns the oldest node in the bucket
"""
assert node.id != NULL_ID
- if node.id == self.node.id: return
+ if node.id == self.node.id: return True
# Get the bucket for this node
i = self._bucketIndexForInt(node.num)
# utilizing this nodes new contact info
self.buckets[i].l.append(node)
self.buckets[i].touch()
- return
+ return True
# We don't have this node, check to see if the bucket is full
- if len(self.buckets[i].l) < self.config['K']:
+ if len(self.buckets[i].l) < K:
# Not full, append this node and return
if contacted:
node.updateLastSeen()
- self.buckets[i].l.append(node)
- self.buckets[i].touch()
- return
+ self.buckets[i].l.append(node)
+ self.buckets[i].touch()
+ log.msg('Added node to routing table: %s/%s' % (node.host, node.port))
+ return True
+ return False
# Bucket is full, check to see if the local node is not in the bucket
if not (self.buckets[i].min <= self.node < self.buckets[i].max):
return self.buckets[i].l[0]
# Make sure our table isn't FULL, this is really unlikely
- if len(self.buckets) >= self.config['HASH_LENGTH']:
- log.err("Hash Table is FULL! Increase K!")
+ if len(self.buckets) >= (khash.HASH_LENGTH*8):
+ log.err(RuntimeError("Hash Table is FULL! Increase K!"))
return
# This bucket is full and contains our node, split the bucket
n = self.buckets[i].l[it]
tstamp = n.lastSeen
n.updateLastSeen()
+
+ # Move the node to the end and touch the bucket
+ del(self.buckets[i].l[it])
+ self.buckets[i].l.append(n)
+ self.buckets[i].touch()
+
return tstamp
def invalidateNode(self, n):
"""Update the L{lastAccessed} time."""
self.lastAccessed = datetime.now()
+def sort(self):
+    """Sort the nodes in the bucket by their lastSeen time.
+
+    The node list is reordered in place, in ascending order of lastSeen.
+    """
+    self.l.sort(lambda a, b: cmp(a.lastSeen, b.lastSeen))
+
def getNodeWithInt(self, num):
"""Get the node in the bucket with that number.
else: raise ValueError
def __repr__(self):
-return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
+    """Describe the bucket: its node count and the base-2 logs of its bounds and range size."""
+    log2 = loge(2)
+    # min+1 keeps the logarithm defined for a bucket whose range starts at 0.
+    # The float quotient for the range can land just below the exact
+    # exponent, so round it before %d would truncate it one too low.
+    # NOTE(review): rounding assumes ranges are exact powers of two (the
+    # initial range is, and merging doubles it) -- confirm against _splitBucket
+    return "<KBucket %d items (%f to %f, range %d)>" % (
+        len(self.l), loge(self.min+1)/log2, loge(self.max)/log2,
+        int(round(loge(self.max-self.min)/log2)))
#{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
def __lt__(self, a):
def setUp(self):
+# Build the routing table under test around a local node with a newly generated ID
self.a = Node(khash.newID(), '127.0.0.1', 2002)
-self.t = KTable(self.a, {'HASH_LENGTH': 160, 'K': 8, 'MAX_FAILURES': 3})
+self.t = KTable(self.a, {'MAX_FAILURES': 3})
def testAddNode(self):
self.b = Node(khash.newID(), '127.0.0.1', 2003)
self.t.invalidateNode(self.b)
self.failUnlessEqual(len(self.t.buckets[0].l), 0)
+def testMergeBuckets(self):
+    """Invalidating every node in the local node's bucket leaves one bucket fewer."""
+    # Insert many nodes with generated IDs, then record how many buckets exist
+    for offset in xrange(1000):
+        self.t.insertNode(Node(khash.newID(), '127.0.0.1', 2003 + offset))
+    before = len(self.t.buckets)
+
+    # Invalidate every node of the bucket covering our own ID, iterating
+    # over a snapshot because the bucket's list changes underneath us
+    own = self.t._bucketIndexForInt(self.a.num)
+    for victim in list(self.t.buckets[own].l):
+        self.t.invalidateNode(victim)
+    self.failUnlessEqual(len(self.t.buckets), before - 1)
+
def testFail(self):
self.testAddNode()
for i in range(self.t.config['MAX_FAILURES'] - 1):