def _nodeNum(self, id):
"""Takes different types of input and converts to the node ID number.
- @type id: C{string} of C{int} or L{node.Node}
+ @type id: C{string} or C{int} or L{node.Node}
@param id: the ID to find nodes that are close to
@raise TypeError: if id does not properly identify an ID
"""
def findNodes(self, id):
"""Find the K nodes in our own local table closest to the ID.
- @type id: C{string} of C{int} or L{node.Node}
+ @type id: C{string} or C{int} or L{node.Node}
@param id: the ID to find nodes that are close to
"""
# Get the K closest nodes from the appropriate bucket
i = self._bucketIndexForInt(num)
- nodes = list(self.buckets[i].l)
+ nodes = self.buckets[i].list()
# Make sure we have enough
if len(nodes) < K:
while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
# Add the adjoining buckets' nodes to the list
if min >= 0:
- nodes = nodes + self.buckets[min].l
+ nodes = nodes + self.buckets[min].list()
if max < len(self.buckets):
- nodes = nodes + self.buckets[max].l
+ nodes = nodes + self.buckets[max].list()
min = min - 1
max = max + 1
nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
return nodes[:K]
- def _splitBucket(self, a):
- """Split a bucket in two.
-
- @type a: L{KBucket}
- @param a: the bucket to split
+ def touch(self, id):
+ """Mark a bucket as having been looked up.
+
+ @type id: C{string} or C{int} or L{node.Node}
+ @param id: the ID in the bucket that was accessed
"""
- # Create a new bucket with half the (upper) range of the current bucket
- diff = (a.max - a.min) / 2
- b = KBucket([], a.max - diff, a.max)
- self.buckets.insert(self.buckets.index(a.min) + 1, b)
+ # Get the bucket number from the input
+ num = self._nodeNum(id)
+ i = self._bucketIndexForInt(num)
- # Reduce the input bucket's (upper) range
- a.max = a.max - diff
+ self.buckets[i].touch()
- # Transfer nodes to the new bucket
- for anode in a.l[:]:
- if anode.num >= a.max:
- a.l.remove(anode)
- b.l.append(anode)
-
def _mergeBucket(self, i):
"""Merge unneeded buckets after removing a node.
elif i+1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange:
otherBucket = i+1
- # Decide if we should do a merge
- if otherBucket is not None and len(self.buckets[i].l) + len(self.buckets[otherBucket].l) <= K:
- # Remove one bucket and set the other to cover its range as well
- b = self.buckets[i]
- a = self.buckets.pop(otherBucket)
- b.min = min(b.min, a.min)
- b.max = max(b.max, a.max)
-
- # Transfer the nodes to the bucket we're keeping, merging the sorting
- bi = 0
- for anode in a.l:
- while bi < len(b.l) and b.l[bi].lastSeen <= anode.lastSeen:
- bi += 1
- b.l.insert(bi, anode)
- bi += 1
+ # Try and do a merge
+ if otherBucket is not None and self.buckets[i].merge(self.buckets[otherBucket]):
+ # Merge was successful, remove the old bucket
+ self.buckets.pop(otherBucket)
# Recurse to check if the neighbour buckets can also be merged
self._mergeBucket(min(i, otherBucket))
removed = False
i = self._bucketIndexForInt(stale.num)
try:
- it = self.buckets[i].l.index(stale.num)
+ self.buckets[i].remove(stale.num)
except ValueError:
pass
else:
- # Remove the stale node
- del(self.buckets[i].l[it])
+ # Removed the stale node
removed = True
+ log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port))
# Insert the new node
- if new and self._bucketIndexForInt(new.num) == i and len(self.buckets[i].l) < K:
- self.buckets[i].l.append(new)
+ if new and self._bucketIndexForInt(new.num) == i and self.buckets[i].len() < K:
+ self.buckets[i].add(new)
elif removed:
self._mergeBucket(i)
def insertNode(self, node, contacted = True):
"""Try to insert a node in the routing table.
- This inserts the node, returning None if successful, otherwise returns
+ This inserts the node, returning True if successful, False if the
+ node could have been added if it responds to a ping, otherwise returns
the oldest node in the bucket if it's full. The caller is then
responsible for pinging the returned node and calling replaceStaleNode
if it doesn't respond. contacted means that yes, we contacted THEM and
@type contacted: C{boolean}
@param contacted: whether the new node is known to be good, i.e.
responded to a request (optional, defaults to True)
- @rtype: L{node.Node}
- @return: None if successful (the bucket wasn't full), otherwise returns the oldest node in the bucket
+ @rtype: L{node.Node} or C{boolean}
+ @return: True if successful (the bucket wasn't full), False if the
+ node could have been added if it was contacted, otherwise
+ returns the oldest node in the bucket
"""
assert node.id != NULL_ID
- if node.id == self.node.id: return
+ if node.id == self.node.id: return True
# Get the bucket for this node
i = self._bucketIndexForInt(node.num)
# Check to see if node is in the bucket already
try:
- it = self.buckets[i].l.index(node.num)
+ self.buckets[i].node(node.num)
except ValueError:
pass
else:
# It responded, so update it
node.updateLastSeen()
# move node to end of bucket
- del(self.buckets[i].l[it])
+ self.buckets[i].remove(node.num)
# note that we removed the original and replaced it with the new one
# utilizing this nodes new contact info
- self.buckets[i].l.append(node)
- self.buckets[i].touch()
- return
+ self.buckets[i].add(node)
+ return True
# We don't have this node, check to see if the bucket is full
- if len(self.buckets[i].l) < K:
+ if self.buckets[i].len() < K:
# Not full, append this node and return
if contacted:
node.updateLastSeen()
- self.buckets[i].l.append(node)
- self.buckets[i].touch()
- return
+ self.buckets[i].add(node)
+ log.msg('Added node to routing table: %s/%s' % (node.host, node.port))
+ return True
+ return False
# Bucket is full, check to see if the local node is not in the bucket
if not (self.buckets[i].min <= self.node < self.buckets[i].max):
# Local node not in the bucket, can't split it, return the oldest node
- return self.buckets[i].l[0]
+ return self.buckets[i].oldest()
# Make sure our table isn't FULL, this is really unlikely
if len(self.buckets) >= (khash.HASH_LENGTH*8):
return
# This bucket is full and contains our node, split the bucket
- self._splitBucket(self.buckets[i])
+ newBucket = self.buckets[i].split()
+ self.buckets.insert(i + 1, newBucket)
# Now that the bucket is split and balanced, try to insert the node again
return self.insertNode(node)
Call this any time you get a message from a node, it will update it
in the table if it's there.
- @type id: C{string} of C{int} or L{node.Node}
+ @type id: C{string} or C{int} or L{node.Node}
@param id: the node ID to mark as just having been seen
@rtype: C{datetime.datetime}
@return: the old lastSeen time of the node, or None if it's not in the table
# Check to see if node is in the bucket
try:
- it = self.buckets[i].l.index(num)
+ tstamp = self.buckets[i].justSeen(num)
except ValueError:
return None
else:
- # The node is in the bucket
- n = self.buckets[i].l[it]
- tstamp = n.lastSeen
- n.updateLastSeen()
-
- # Move the node to the end and touch the bucket
- del(self.buckets[i].l[it])
- self.buckets[i].l.append(n)
- self.buckets[i].touch()
-
return tstamp
def invalidateNode(self, n):
self.replaceStaleNode(n)
def nodeFailed(self, node):
- """Mark a node as having failed once, and remove it if it has failed too much."""
+ """Mark a node as having failed once, and remove it if it has failed too much.
+
+ @return: whether the node is in the routing table
+ """
# Get the bucket number
num = self._nodeNum(node)
i = self._bucketIndexForInt(num)
# Check to see if node is in the bucket
try:
- it = self.buckets[i].l.index(num)
+ n = self.buckets[i].node(num)
except ValueError:
- return None
+ return False
else:
# The node is in the bucket
- n = self.buckets[i].l[it]
if n.msgFailed() >= self.config['MAX_FAILURES']:
self.invalidateNode(n)
+ return False
+ return True
class KBucket:
"""Single bucket of nodes in a kademlia-like routing table.
- @type l: C{list} of L{node.Node}
- @ivar l: the nodes that are in this bucket
+ @type nodes: C{list} of L{node.Node}
+ @ivar nodes: the nodes that are in this bucket
@type min: C{long}
@ivar min: the minimum node ID that can be in this bucket
@type max: C{long}
@type max: C{long}
@param max: the maximum node ID that can be in this bucket
"""
- self.l = contents
+ self.nodes = contents
self.min = min
self.max = max
self.lastAccessed = datetime.now()
- def touch(self):
- """Update the L{lastAccessed} time."""
- self.lastAccessed = datetime.now()
+ def __repr__(self):
+ return "<KBucket %d items (%f to %f, range %d)>" % (
+ len(self.nodes), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))
+ #{ List-like functions
+ def len(self): return len(self.nodes)
+ def list(self): return list(self.nodes)
+ def node(self, num): return self.nodes[self.nodes.index(num)]
+ def remove(self, num): return self.nodes.pop(self.nodes.index(num))
+ def oldest(self): return self.nodes[0]
+
+ def add(self, node):
+ """Add the node in the correct sorted order."""
+ i = len(self.nodes)
+ while i > 0 and node.lastSeen < self.nodes[i-1].lastSeen:
+ i -= 1
+ self.nodes.insert(i, node)
+
def sort(self):
"""Sort the nodes in the bucket by their lastSeen time."""
def _sort(a, b):
elif a.lastSeen < b.lastSeen:
return -1
return 0
- self.l.sort(_sort)
+ self.nodes.sort(_sort)
+
+ #{ Bucket functions
+ def touch(self):
+ """Update the L{lastAccessed} time."""
+ self.lastAccessed = datetime.now()
+
+ def justSeen(self, num):
+ """Mark a node as having been seen.
+
+ @param num: the number of the node just seen
+ """
+ i = self.nodes.index(num)
+
+ # The node is in the bucket
+ n = self.nodes[i]
+ tstamp = n.lastSeen
+ n.updateLastSeen()
+
+ # Move the node to the end and touch the bucket
+ self.nodes.pop(i)
+ self.nodes.append(n)
+
+ return tstamp
- def getNodeWithInt(self, num):
- """Get the node in the bucket with that number.
+ def split(self):
+ """Split a bucket in two.
- @type num: C{long}
- @param num: the node ID to look for
- @raise ValueError: if the node ID is not in the bucket
- @rtype: L{node.Node}
- @return: the node
+ @rtype: L{KBucket}
+ @return: the new bucket split from this one
"""
- if num in self.l: return num
- else: raise ValueError
+ # Create a new bucket with half the (upper) range of the current bucket
+ diff = (self.max - self.min) / 2
+ new = KBucket([], self.max - diff, self.max)
- def __repr__(self):
- return "<KBucket %d items (%f to %f, range %d)>" % (
- len(self.l), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))
+ # Reduce the input bucket's (upper) range
+ self.max = self.max - diff
+
+ # Transfer nodes to the new bucket
+ for node in self.nodes[:]:
+ if node.num >= self.max:
+ self.nodes.remove(node)
+ new.add(node)
+ return new
+ def merge(self, old):
+ """Try to merge two buckets into one.
+
+ @type old: L{KBucket}
+ @param old: the bucket to merge into this one
+ @return: whether a merge was done or not
+ """
+ # Decide if we should do a merge
+ if len(self.nodes) + old.len() > K:
+ return False
+
+ # Set the range to cover the other's as well
+ self.min = min(self.min, old.min)
+ self.max = max(self.max, old.max)
+
+ # Transfer the other's nodes to this bucket, merging the sorting
+ i = 0
+ for node in old.list():
+ while i < len(self.nodes) and self.nodes[i].lastSeen <= node.lastSeen:
+ i += 1
+ self.nodes.insert(i, node)
+ i += 1
+
+ return True
+
#{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
def __lt__(self, a):
if isinstance(a, Node): a = a.num
def testAddNode(self):
self.b = Node(khash.newID(), '127.0.0.1', 2003)
self.t.insertNode(self.b)
- self.failUnlessEqual(len(self.t.buckets[0].l), 1)
- self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
+ self.failUnlessEqual(len(self.t.buckets[0].nodes), 1)
+ self.failUnlessEqual(self.t.buckets[0].nodes[0], self.b)
def testRemove(self):
self.testAddNode()
self.t.invalidateNode(self.b)
- self.failUnlessEqual(len(self.t.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.t.buckets[0].nodes), 0)
def testMergeBuckets(self):
for i in xrange(1000):
self.t.insertNode(b)
num = len(self.t.buckets)
i = self.t._bucketIndexForInt(self.a.num)
- for b in self.t.buckets[i].l[:]:
+ for b in self.t.buckets[i].nodes[:]:
self.t.invalidateNode(b)
self.failUnlessEqual(len(self.t.buckets), num-1)
self.testAddNode()
for i in range(self.t.config['MAX_FAILURES'] - 1):
self.t.nodeFailed(self.b)
- self.failUnlessEqual(len(self.t.buckets[0].l), 1)
- self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
+ self.failUnlessEqual(len(self.t.buckets[0].nodes), 1)
+ self.failUnlessEqual(self.t.buckets[0].nodes[0], self.b)
self.t.nodeFailed(self.b)
- self.failUnlessEqual(len(self.t.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.t.buckets[0].nodes), 0)