"""The routing table and buckets for a kademlia-like DHT.

@var K: the Kademlia "K" constant, this should be an even number
"""

from datetime import datetime
from bisect import bisect_left
from math import log as loge

from twisted.python import log
from twisted.trial import unittest

import khash
from node import Node, NULL_ID

# NOTE(review): the assignment of K is missing from this copy of the file;
# 8 is the value used by khashmir-derived routing tables -- confirm before
# relying on it.
K = 8
class KTable:
    """Local routing table for a kademlia-like distributed hash table.

    @type node: L{node.Node}
    @ivar node: the local node
    @type config: C{dictionary}
    @ivar config: the configuration parameters for the DHT
    @type buckets: C{list} of L{KBucket}
    @ivar buckets: the buckets of nodes in the routing table, ordered by
        the ID range they cover
    """

    def __init__(self, node, config):
        """Initialize the first empty bucket of everything.

        @type node: L{node.Node}
        @param node: the local node
        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        """
        # this is the root node, a.k.a. US!
        assert node.id != NULL_ID
        self.node = node
        self.config = config
        # A single bucket initially covers the whole ID space [0, 2**bits)
        self.buckets = [KBucket([], 0, 2 ** (khash.HASH_LENGTH * 8))]

    def _bucketIndexForInt(self, num):
        """Find the index of the bucket that should hold the node's ID number."""
        # KBucket's comparators compare a bucket's range against a number,
        # so bisecting the ordered bucket list lands on the bucket holding num.
        return bisect_left(self.buckets, num)

    def _nodeNum(self, id):
        """Takes different types of input and converts to the node ID number.

        @type id: C{string} or C{int} or L{node.Node}
        @param id: the ID to find nodes that are close to
        @rtype: C{long}
        @return: the node ID number
        @raise TypeError: if id does not properly identify an ID
        """
        # Get the ID number from the input
        if isinstance(id, str):
            return khash.intify(id)
        elif isinstance(id, Node):
            return id.num
        elif isinstance(id, (int, long)):
            return id
        else:
            raise TypeError("requires an int, string, or Node input")

    def findNodes(self, id):
        """Find the K nodes in our own local table closest to the ID.

        @type id: C{string} or C{int} or L{node.Node}
        @param id: the ID to find nodes that are close to
        @rtype: C{list} of L{node.Node}
        @return: at most K nodes, ordered by increasing XOR distance to the ID
        @raise TypeError: if id does not properly identify an ID
        """
        # Get the ID number from the input
        num = self._nodeNum(id)

        # Get the K closest nodes from the appropriate bucket
        i = self._bucketIndexForInt(num)
        nodes = self.buckets[i].list()  # a copy, safe to extend below

        # Make sure we have enough: look in adjoining buckets, widening one
        # bucket at a time on each side until K nodes are found or the table
        # is exhausted.
        lo, hi = i - 1, i + 1
        while len(nodes) < K and (lo >= 0 or hi < len(self.buckets)):
            if lo >= 0:
                nodes.extend(self.buckets[lo].list())
            if hi < len(self.buckets):
                nodes.extend(self.buckets[hi].list())
            lo -= 1
            hi += 1

        # Sort the found nodes by proximity (XOR distance) to the id and
        # return the closest K
        nodes.sort(key=lambda n: num ^ n.num)
        return nodes[:K]

    def _mergeBucket(self, i):
        """Merge unneeded buckets after removing a node.

        @type i: C{int}
        @param i: the index of the bucket that lost a node
        """
        bucketRange = self.buckets[i].max - self.buckets[i].min
        otherBucket = None

        # Find if either of the neighbor buckets is the same size
        # (this will only happen if this or the neighbour has our node ID in its range)
        if i - 1 >= 0 and self.buckets[i-1].max - self.buckets[i-1].min == bucketRange:
            otherBucket = i - 1
        elif i + 1 < len(self.buckets) and self.buckets[i+1].max - self.buckets[i+1].min == bucketRange:
            otherBucket = i + 1

        # Try to merge the two buckets (KBucket.merge refuses if the combined
        # bucket would hold more than K nodes)
        if otherBucket is not None and self.buckets[i].merge(self.buckets[otherBucket]):
            # Merge was successful, remove the old bucket
            self.buckets.pop(otherBucket)

            # Recurse to check if the neighbour buckets can also be merged;
            # after the pop the merged bucket sits at the lower index.
            self._mergeBucket(min(i, otherBucket))

    def replaceStaleNode(self, stale, new=None):
        """Replace a stale node in a bucket with a new one.

        This is used by clients to replace a node returned by insertNode after
        it fails to respond to a ping.

        @type stale: L{node.Node}
        @param stale: the stale node to remove from the bucket
        @type new: L{node.Node}
        @param new: the new node to add in its place (optional, defaults to
            not adding any node in the old node's place)
        """
        # Find the stale node's bucket
        removed = False
        i = self._bucketIndexForInt(stale.num)
        try:
            self.buckets[i].remove(stale.num)
        except ValueError:
            # The stale node was not in the table
            pass
        else:
            # Removed the stale node
            removed = True
            log.msg('Removed node from routing table: %s/%s' % (stale.host, stale.port))

        # Insert the new node if it belongs in the same bucket and fits;
        # otherwise the bucket may now be small enough to merge.
        if new is not None and self._bucketIndexForInt(new.num) == i and self.buckets[i].len() < K:
            self.buckets[i].add(new)
        elif removed:
            self._mergeBucket(i)

    def insertNode(self, node, contacted=True):
        """Try to insert a node in the routing table.

        This inserts the node, returning True if successful, False if the
        node could have been added if it responds to a ping, otherwise returns
        the oldest node in the bucket if it's full. The caller is then
        responsible for pinging the returned node and calling replaceStaleNode
        if it doesn't respond. contacted means that yes, we contacted THEM and
        we know the node is reachable.

        @type node: L{node.Node}
        @param node: the new node to try and insert
        @type contacted: C{boolean}
        @param contacted: whether the new node is known to be good, i.e.
            responded to a request (optional, defaults to True)
        @rtype: L{node.Node} or C{boolean}
        @return: True if successful (the bucket wasn't full), False if the
            node could have been added if it was contacted, otherwise
            returns the oldest node in the bucket (None if the table is
            too full to split any further)
        """
        assert node.id != NULL_ID
        if node.id == self.node.id:
            return True

        # Get the bucket for this node
        i = self._bucketIndexForInt(node.num)
        bucket = self.buckets[i]

        # Check to see if node is in the bucket already
        try:
            bucket.node(node.num)
        except ValueError:
            pass
        else:
            # The node is already in the bucket
            if contacted:
                # It responded, so update it
                node.updateLastSeen()
                # Replace the stored entry with this object so the node's
                # (possibly new) contact info is used; add() re-inserts it in
                # lastSeen order.
                bucket.remove(node.num)
                bucket.add(node)
                bucket.touch()
            return True

        # We don't have this node, check to see if the bucket is full
        if bucket.len() < K:
            # Not full, append this node and return
            if contacted:
                node.updateLastSeen()
                bucket.add(node)
                bucket.touch()
                log.msg('Added node to routing table: %s/%s' % (node.host, node.port))
                return True
            return False

        # Bucket is full, check to see if the local node is not in the bucket
        if not (bucket.min <= self.node.num < bucket.max):
            # Local node not in the bucket, can't split it, return the oldest node
            return bucket.oldest()

        # Make sure our table isn't FULL, this is really unlikely
        if len(self.buckets) >= (khash.HASH_LENGTH * 8):
            log.err(RuntimeError("Hash Table is FULL! Increase K!"))
            return None

        # This bucket is full and contains our node, split the bucket
        self.buckets.insert(i + 1, bucket.split())

        # Now that the bucket is split and balanced, try to insert the node
        # again. Pass contacted through so that an uncontacted node is not
        # added as if it had responded.
        return self.insertNode(node, contacted)

    def justSeenNode(self, id):
        """Mark a node as just having been seen.

        Call this any time you get a message from a node, it will update it
        in the table if it's there.

        @type id: C{string} or C{int} or L{node.Node}
        @param id: the node ID to mark as just having been seen
        @rtype: C{datetime.datetime}
        @return: the old lastSeen time of the node, or None if it's not in the table
        """
        # Get the bucket number
        num = self._nodeNum(id)
        i = self._bucketIndexForInt(num)

        # Check to see if node is in the bucket
        try:
            tstamp = self.buckets[i].justSeen(num)
        except ValueError:
            return None
        else:
            self.buckets[i].touch()
            return tstamp

    def invalidateNode(self, n):
        """Remove the node from the routing table.

        Forget about node n. Use this when you know that a node is invalid.
        """
        self.replaceStaleNode(n)

    def nodeFailed(self, node):
        """Mark a node as having failed once, and remove it if it has failed too much."""
        # Get the bucket number
        num = self._nodeNum(node)
        i = self._bucketIndexForInt(num)

        # Check to see if node is in the bucket
        try:
            n = self.buckets[i].node(num)
        except ValueError:
            return None
        else:
            # The node is in the bucket; drop it once it reaches the limit
            if n.msgFailed() >= self.config['MAX_FAILURES']:
                self.invalidateNode(n)
class KBucket:
    """Single bucket of nodes in a kademlia-like routing table.

    @type nodes: C{list} of L{node.Node}
    @ivar nodes: the nodes that are in this bucket, oldest lastSeen first
    @type min: C{long}
    @ivar min: the minimum node ID that can be in this bucket
    @type max: C{long}
    @ivar max: the maximum node ID that can be in this bucket (exclusive)
    @type lastAccessed: C{datetime.datetime}
    @ivar lastAccessed: the last time a node in this bucket was successfully contacted
    """

    def __init__(self, contents, min, max):
        """Initialize the bucket with nodes.

        @type contents: C{list} of L{node.Node}
        @param contents: the nodes to store in the bucket (stored, not copied)
        @type min: C{long}
        @param min: the minimum node ID that can be in this bucket
        @type max: C{long}
        @param max: the maximum node ID that can be in this bucket
        """
        self.nodes = contents
        self.min = min
        self.max = max
        self.lastAccessed = datetime.now()

    def __repr__(self):
        return "<KBucket %d items (%f to %f, range %d)>" % (
            len(self.nodes), loge(self.min+1)/loge(2), loge(self.max)/loge(2), loge(self.max-self.min)/loge(2))

    #{ List-like functions
    def len(self): return len(self.nodes)
    def list(self): return list(self.nodes)
    def node(self, num): return self.nodes[self.nodes.index(num)]
    def remove(self, num): return self.nodes.pop(self.nodes.index(num))
    def oldest(self): return self.nodes[0]

    def add(self, node):
        """Add the node in the correct sorted order."""
        # Walk back from the end past any nodes seen more recently; nodes
        # with an equal lastSeen stay ahead of the new one.
        i = len(self.nodes)
        while i > 0 and node.lastSeen < self.nodes[i-1].lastSeen:
            i -= 1
        self.nodes.insert(i, node)

    def sort(self):
        """Sort the nodes in the bucket by their lastSeen time, oldest first."""
        # list.sort is stable, so nodes with equal lastSeen keep their order
        self.nodes.sort(key=lambda n: n.lastSeen)

    #{ Bucket functions
    def touch(self):
        """Update the L{lastAccessed} time."""
        self.lastAccessed = datetime.now()

    def justSeen(self, num):
        """Mark a node as having been seen.

        @param num: the number of the node just seen
        @rtype: C{datetime.datetime}
        @return: the node's previous lastSeen time
        @raise ValueError: if no node with that number is in the bucket
        """
        i = self.nodes.index(num)

        # The node is in the bucket
        n = self.nodes[i]
        tstamp = n.lastSeen
        n.updateLastSeen()

        # Move the node to the end and touch the bucket
        del self.nodes[i]
        self.nodes.append(n)
        self.touch()

        return tstamp

    def split(self):
        """Split a bucket in two.

        @rtype: L{KBucket}
        @return: the new bucket split from this one
        """
        # Create a new bucket with half the (upper) range of the current bucket.
        # Floor division keeps the bounds exact integers; true division would
        # turn 160-bit IDs into lossy floats.
        diff = (self.max - self.min) // 2
        new = KBucket([], self.max - diff, self.max)

        # Reduce the input bucket's (upper) range
        self.max = self.max - diff

        # Transfer nodes to the new bucket (iterate over a copy while removing)
        for node in self.nodes[:]:
            if node.num >= self.max:
                self.nodes.remove(node)
                new.add(node)
        return new

    def merge(self, old):
        """Try to merge two buckets into one.

        @type old: L{KBucket}
        @param old: the bucket to merge into this one
        @rtype: C{boolean}
        @return: whether a merge was done or not
        """
        # Decide if we should do a merge
        if len(self.nodes) + old.len() > K:
            return False

        # Set the range to cover the other's as well
        self.min = min(self.min, old.min)
        self.max = max(self.max, old.max)

        # Transfer the other's nodes to this bucket, merging the sorting
        i = 0
        for node in old.list():
            while i < len(self.nodes) and self.nodes[i].lastSeen <= node.lastSeen:
                i += 1
            self.nodes.insert(i, node)
            i += 1  # skip past the node just inserted

        return True

    #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
    # A bucket covers the half-open range [min, max): it "equals" any number
    # inside that range, is "less than" numbers at or above max, and is
    # "greater than" numbers below min. The other operators are derived from
    # those so that each pair of operators is mutually consistent.
    def __lt__(self, a):
        if isinstance(a, Node): a = a.num
        return self.max <= a
    def __le__(self, a):
        if isinstance(a, Node): a = a.num
        return self.min <= a
    def __gt__(self, a):
        if isinstance(a, Node): a = a.num
        return self.min > a
    def __ge__(self, a):
        if isinstance(a, Node): a = a.num
        return self.max > a
    def __eq__(self, a):
        if isinstance(a, Node): a = a.num
        return self.min <= a and self.max > a
    def __ne__(self, a):
        if isinstance(a, Node): a = a.num
        # exact negation of __eq__ (previously wrong at a == min)
        return self.min > a or self.max <= a
class TestKTable(unittest.TestCase):
    """Unit tests for the routing table."""

    def setUp(self):
        self.a = Node(khash.newID(), '127.0.0.1', 2002)
        self.t = KTable(self.a, {'MAX_FAILURES': 3})

    def testAddNode(self):
        """Inserting a contacted node places it in the single initial bucket."""
        self.b = Node(khash.newID(), '127.0.0.1', 2003)
        self.t.insertNode(self.b)
        self.assertEqual(len(self.t.buckets[0].nodes), 1)
        self.assertEqual(self.t.buckets[0].nodes[0], self.b)

    def testRemove(self):
        """Invalidating a node removes it from its bucket."""
        self.testAddNode()
        self.t.invalidateNode(self.b)
        self.assertEqual(len(self.t.buckets[0].nodes), 0)

    def testMergeBuckets(self):
        """Emptying the local node's bucket reduces the bucket count by one."""
        for i in range(1000):
            b = Node(khash.newID(), '127.0.0.1', 2003 + i)
            self.t.insertNode(b)
        num = len(self.t.buckets)
        i = self.t._bucketIndexForInt(self.a.num)
        for b in self.t.buckets[i].nodes[:]:
            self.t.invalidateNode(b)
        self.assertEqual(len(self.t.buckets), num - 1)

    def testFail(self):
        """A node survives MAX_FAILURES - 1 failures and is removed on the next."""
        self.testAddNode()
        for i in range(self.t.config['MAX_FAILURES'] - 1):
            self.t.nodeFailed(self.b)
            self.assertEqual(len(self.t.buckets[0].nodes), 1)
            self.assertEqual(self.t.buckets[0].nodes[0], self.b)

        self.t.nodeFailed(self.b)
        self.assertEqual(len(self.t.buckets[0].nodes), 0)