def _nodeNum(self, id):
"""Takes different types of input and converts to the node ID number.
- @type id: C{string} of C{int} or L{node.Node}
+ @type id: C{string} or C{int} or L{node.Node}
@param id: the ID to find nodes that are close to
@raise TypeError: if id does not properly identify an ID
"""
def findNodes(self, id):
"""Find the K nodes in our own local table closest to the ID.
- @type id: C{string} of C{int} or L{node.Node}
+ @type id: C{string} or C{int} or L{node.Node}
@param id: the ID to find nodes that are close to
"""
# Get the K closest nodes from the appropriate bucket
i = self._bucketIndexForInt(num)
- nodes = list(self.buckets[i].l)
+ nodes = self.buckets[i].list()
# Make sure we have enough
if len(nodes) < K:
while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
# Add the adjoining buckets' nodes to the list
if min >= 0:
- nodes = nodes + self.buckets[min].l
+ nodes = nodes + self.buckets[min].list()
if max < len(self.buckets):
- nodes = nodes + self.buckets[max].l
+ nodes = nodes + self.buckets[max].list()
min = min - 1
max = max + 1
nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
return nodes[:K]
- def _splitBucket(self, a):
- """Split a bucket in two.
-
- @type a: L{KBucket}
- @param a: the bucket to split
- """
- # Create a new bucket with half the (upper) range of the current bucket
- diff = (a.max - a.min) / 2
- b = KBucket([], a.max - diff, a.max)
- self.buckets.insert(self.buckets.index(a.min) + 1, b)
-
- # Reduce the input bucket's (upper) range
- a.max = a.max - diff
-
- # Transfer nodes to the new bucket
- for anode in a.l[:]:
- if anode.num >= a.max:
- a.l.remove(anode)
- b.l.append(anode)
-
def _mergeBucket(self, i):
"""Merge unneeded buckets after removing a node.
elif i+1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange:
otherBucket = i+1
- # Decide if we should do a merge
- if otherBucket is not None and len(self.buckets[i].l) + len(self.buckets[otherBucket].l) <= K:
- # Remove one bucket and set the other to cover its range as well
- b = self.buckets[i]
- a = self.buckets.pop(otherBucket)
- b.min = min(b.min, a.min)
- b.max = max(b.max, a.max)
-
- # Transfer the nodes to the bucket we're keeping, merging the sorting
- bi = 0
- for anode in a.l:
- while bi < len(b.l) and b.l[bi].lastSeen <= anode.lastSeen:
- bi += 1
- b.l.insert(bi, anode)
- bi += 1
+ # Try and do a merge
+ if otherBucket is not None and self.buckets[i].merge(self.buckets[otherBucket]):
+ # Merge was successful, remove the old bucket
+ self.buckets.pop(otherBucket)
# Recurse to check if the neighbour buckets can also be merged
self._mergeBucket(min(i, otherBucket))
removed = False
i = self._bucketIndexForInt(stale.num)
try:
- it = self.buckets[i].l.index(stale.num)
+ self.buckets[i].remove(stale.num)
except ValueError:
pass
else:
- # Remove the stale node
- del(self.buckets[i].l[it])
+ # Removed the stale node
removed = True
log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port))
# Insert the new node
- if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K:
- self.buckets[i].l.append(new)
+ if new and self._bucketIndexForInt(new.num) == i and self.buckets[i].len() < K:
+ self.buckets[i].add(new)
elif removed:
self._mergeBucket(i)
# Check to see if node is in the bucket already
try:
- it = self.buckets[i].l.index(node.num)
+ self.buckets[i].node(node.num)
except ValueError:
pass
else:
# It responded, so update it
node.updateLastSeen()
# move node to end of bucket
- del(self.buckets[i].l[it])
+ self.buckets[i].remove(node.num)
# note that we removed the original and replaced it with the new one
# utilizing this nodes new contact info
- self.buckets[i].l.append(node)
+ self.buckets[i].add(node)
self.buckets[i].touch()
return True
# We don't have this node, check to see if the bucket is full
- if len(self.buckets[i].l) < K:
+ if self.buckets[i].len() < K:
# Not full, append this node and return
if contacted:
node.updateLastSeen()
- self.buckets[i].l.append(node)
+ self.buckets[i].add(node)
self.buckets[i].touch()
log.msg('Added node to routing table: %s/%s' % (node.host, node.port))
return True
# Bucket is full, check to see if the local node is not in the bucket
if not (self.buckets[i].min <= self.node < self.buckets[i].max):
# Local node not in the bucket, can't split it, return the oldest node
- return self.buckets[i].l[0]
+ return self.buckets[i].oldest()
# Make sure our table isn't FULL, this is really unlikely
if len(self.buckets) >= (khash.HASH_LENGTH*8):
return
# This bucket is full and contains our node, split the bucket
- self._splitBucket(self.buckets[i])
+ newBucket = self.buckets[i].split()
+ self.buckets.insert(i + 1, newBucket)
# Now that the bucket is split and balanced, try to insert the node again
return self.insertNode(node)
Call this any time you get a message from a node, it will update it
in the table if it's there.
- @type id: C{string} of C{int} or L{node.Node}
+ @type id: C{string} or C{int} or L{node.Node}
@param id: the node ID to mark as just having been seen
@rtype: C{datetime.datetime}
@return: the old lastSeen time of the node, or None if it's not in the table
# Check to see if node is in the bucket
try:
- it = self.buckets[i].l.index(num)
+ tstamp = self.buckets[i].justSeen(num)
except ValueError:
return None
else:
- # The node is in the bucket
- n = self.buckets[i].l[it]
- tstamp = n.lastSeen
- n.updateLastSeen()
-
- # Move the node to the end and touch the bucket
- del(self.buckets[i].l[it])
- self.buckets[i].l.append(n)
self.buckets[i].touch()
-
return tstamp
def invalidateNode(self, n):
# Check to see if node is in the bucket
try:
- it = self.buckets[i].l.index(num)
+ n = self.buckets[i].node(num)
except ValueError:
return None
else:
# The node is in the bucket
- n = self.buckets[i].l[it]
if n.msgFailed() >= self.config['MAX_FAILURES']:
self.invalidateNode(n)
class KBucket:
"""Single bucket of nodes in a kademlia-like routing table.
- @type l: C{list} of L{node.Node}
- @ivar l: the nodes that are in this bucket
+ @type nodes: C{list} of L{node.Node}
+ @ivar nodes: the nodes that are in this bucket
@type min: C{long}
@ivar min: the minimum node ID that can be in this bucket
@type max: C{long}
@type max: C{long}
@param max: the maximum node ID that can be in this bucket
"""
- self.l = contents
+ self.nodes = contents
self.min = min
self.max = max
self.lastAccessed = datetime.now()
- def touch(self):
- """Update the L{lastAccessed} time."""
- self.lastAccessed = datetime.now()
+ def __repr__(self):
+ return "<KBucket %d items (%f to %f, range %d)>" % (
+ len(self.nodes), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))
+ #{ List-like functions
+ def len(self): return len(self.nodes)
+ def list(self): return list(self.nodes)
+ def node(self, num): return self.nodes[self.nodes.index(num)]
+ def remove(self, num): return self.nodes.pop(self.nodes.index(num))
+ def oldest(self): return self.nodes[0]
+
+ def add(self, node):
+ """Add the node in the correct sorted order."""
+ i = len(self.nodes)
+ while i > 0 and node.lastSeen < self.nodes[i-1].lastSeen:
+ i -= 1
+ self.nodes.insert(i, node)
+
def sort(self):
"""Sort the nodes in the bucket by their lastSeen time."""
def _sort(a, b):
elif a.lastSeen < b.lastSeen:
return -1
return 0
- self.l.sort(_sort)
+ self.nodes.sort(_sort)
+
+ #{ Bucket functions
+ def touch(self):
+ """Update the L{lastAccessed} time."""
+ self.lastAccessed = datetime.now()
+
+ def justSeen(self, num):
+ """Mark a node as having been seen.
+
+ @param num: the number of the node just seen
+ """
+ i = self.nodes.index(num)
+
+ # The node is in the bucket
+ n = self.nodes[i]
+ tstamp = n.lastSeen
+ n.updateLastSeen()
+
+ # Move the node to the end and touch the bucket
+ self.nodes.pop(i)
+ self.nodes.append(n)
+
+ return tstamp
- def getNodeWithInt(self, num):
- """Get the node in the bucket with that number.
+ def split(self):
+ """Split a bucket in two.
- @type num: C{long}
- @param num: the node ID to look for
- @raise ValueError: if the node ID is not in the bucket
- @rtype: L{node.Node}
- @return: the node
+ @rtype: L{KBucket}
+ @return: the new bucket split from this one
"""
- if num in self.l: return num
- else: raise ValueError
+ # Create a new bucket with half the (upper) range of the current bucket
+ diff = (self.max - self.min) / 2
+ new = KBucket([], self.max - diff, self.max)
- def __repr__(self):
- return "<KBucket %d items (%f to %f, range %d)>" % (
- len(self.l), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))
+ # Reduce the input bucket's (upper) range
+ self.max = self.max - diff
+
+ # Transfer nodes to the new bucket
+ for node in self.nodes[:]:
+ if node.num >= self.max:
+ self.nodes.remove(node)
+ new.add(node)
+ return new
+ def merge(self, old):
+ """Try to merge two buckets into one.
+
+ @type old: L{KBucket}
+ @param old: the bucket to merge into this one
+ @return: whether a merge was done or not
+ """
+ # Decide if we should do a merge
+ if len(self.nodes) + old.len() > K:
+ return False
+
+ # Set the range to cover the other's as well
+ self.min = min(self.min, old.min)
+ self.max = max(self.max, old.max)
+
+ # Transfer the other's nodes to this bucket, merging the sorting
+ i = 0
+ for node in old.list():
+ while i < len(self.nodes) and self.nodes[i].lastSeen <= node.lastSeen:
+ i += 1
+ self.nodes.insert(i, node)
+ i += 1
+
+ return True
+
#{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
def __lt__(self, a):
if isinstance(a, Node): a = a.num
def testAddNode(self):
self.b = Node(khash.newID(), '127.0.0.1', 2003)
self.t.insertNode(self.b)
- self.failUnlessEqual(len(self.t.buckets[0].l), 1)
- self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
+ self.failUnlessEqual(len(self.t.buckets[0].nodes), 1)
+ self.failUnlessEqual(self.t.buckets[0].nodes[0], self.b)
def testRemove(self):
self.testAddNode()
self.t.invalidateNode(self.b)
- self.failUnlessEqual(len(self.t.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.t.buckets[0].nodes), 0)
def testMergeBuckets(self):
for i in xrange(1000):
self.t.insertNode(b)
num = len(self.t.buckets)
i = self.t._bucketIndexForInt(self.a.num)
- for b in self.t.buckets[i].l[:]:
+ for b in self.t.buckets[i].nodes[:]:
self.t.invalidateNode(b)
self.failUnlessEqual(len(self.t.buckets), num-1)
self.testAddNode()
for i in range(self.t.config['MAX_FAILURES'] - 1):
self.t.nodeFailed(self.b)
- self.failUnlessEqual(len(self.t.buckets[0].l), 1)
- self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
+ self.failUnlessEqual(len(self.t.buckets[0].nodes), 1)
+ self.failUnlessEqual(self.t.buckets[0].nodes[0], self.b)
self.t.nodeFailed(self.b)
- self.failUnlessEqual(len(self.t.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.t.buckets[0].nodes), 0)