1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """The routing table and buckets for a kademlia-like DHT."""
6 from datetime import datetime
7 from bisect import bisect_left
9 from twisted.python import log
10 from twisted.trial import unittest
13 from node import Node, NULL_ID
16 """Local routing table for a kademlia-like distributed hash table.
18 @type node: L{node.Node}
19 @ivar node: the local node
20 @type config: C{dictionary}
21 @ivar config: the configuration parameters for the DHT
22 @type buckets: C{list} of L{KBucket}
23 @ivar buckets: the buckets of nodes in the routing table
def __init__(self, node, config):
    """Initialize the first empty bucket of everything.

    @type node: L{node.Node}
    @param node: the local node
    @type config: C{dictionary}
    @param config: the configuration parameters for the DHT
        (C{'HASH_LENGTH'} is read here to size the ID space)
    """
    # this is the root node, a.k.a. US!
    assert node.id != NULL_ID

    # Keep both arguments: the other methods read self.node and self.config.
    self.node = node
    self.config = config

    # Start with one bucket covering the whole ID space
    # [0, 2**HASH_LENGTH).  Plain int literals are used because Python 2
    # promotes to long automatically and the "L" suffix is a syntax error
    # on Python 3.
    self.buckets = [KBucket([], 0, 2 ** self.config['HASH_LENGTH'])]
def _bucketIndexForInt(self, num):
    """Return the index in C{self.buckets} where the ID number C{num} belongs.

    The position is found by a left bisection of the bucket list, so the
    buckets must be ordered and comparable against a plain number.
    """
    index = bisect_left(self.buckets, num)
    return index
def _nodeNum(self, id):
    """Takes different types of input and converts to the node ID number.

    @type id: C{string} or C{int} or L{node.Node}
    @param id: the ID to convert
    @return: the ID as a number
    @raise TypeError: if id does not properly identify an ID
    """
    # Strings are hashed IDs that need converting to a number
    if isinstance(id, str):
        return khash.intify(id)

    # Nodes already carry their ID number
    if isinstance(id, Node):
        return id.num

    # Numbers are returned unchanged.  "long" only exists on Python 2.
    try:
        integer_types = (int, long)  # noqa: F821
    except NameError:
        integer_types = (int,)
    if isinstance(id, integer_types):
        return id

    raise TypeError("requires an int, string, or Node input")
def findNodes(self, id):
    """Find the K nodes in our own local table closest to the ID.

    @type id: C{string} or C{int} or L{node.Node}
    @param id: the ID to find nodes that are close to
    @rtype: C{list} of L{node.Node}
    @return: at most K nodes, ordered by increasing XOR distance
        between their ID number and the requested ID
    """
    k = self.config['K']

    # Get the ID number from the input
    num = self._nodeNum(id)

    # Start with a copy of the nodes in the bucket that covers the ID
    i = self._bucketIndexForInt(num)
    nodes = list(self.buckets[i].l)

    # Make sure we have enough
    if len(nodes) < k:
        # Walk outwards one bucket at a time in both directions until K
        # nodes are collected or both ends of the table are reached.
        lower = i - 1
        upper = i + 1
        count = len(self.buckets)
        while len(nodes) < k and (lower >= 0 or upper < count):
            if lower >= 0:
                nodes.extend(self.buckets[lower].l)
            if upper < count:
                nodes.extend(self.buckets[upper].l)
            lower -= 1
            upper += 1

    # Sort the found nodes by proximity to the id and return the closest K.
    # A key function gives the same (stable) order as the old cmp-style
    # comparison and also works on Python 3.
    nodes.sort(key=lambda candidate: num ^ candidate.num)
    return nodes[:k]
def _splitBucket(self, a):
    """Split a bucket in two.

    The upper half of the bucket's range, and every node whose ID number
    falls in it, is moved to a new bucket inserted right after C{a}.

    @type a: L{KBucket}
    @param a: the bucket to split
    """
    # Create a new bucket with half the (upper) range of the current bucket.
    # Floor division keeps the bounds exact integers; true division would
    # produce an imprecise float on Python 3 for large ID spaces.
    diff = (a.max - a.min) // 2
    b = KBucket([], a.max - diff, a.max)

    # NOTE(review): index(a.min) relies on a bucket comparing equal to any
    # number inside its range, so the lookup finds C{a} itself -- confirm
    # against KBucket's comparators.
    self.buckets.insert(self.buckets.index(a.min) + 1, b)

    # Reduce the input bucket's (upper) range
    a.max = a.max - diff

    # Transfer nodes to the new bucket
    staying = []
    for anode in a.l:
        if anode.num >= a.max:
            b.l.append(anode)
        else:
            staying.append(anode)
    # Slice assignment keeps the same list object on the bucket
    a.l[:] = staying
def replaceStaleNode(self, stale, new = None):
    """Replace a stale node in a bucket with a new one.

    This is used by clients to replace a node returned by insertNode after
    it fails to respond to a ping.

    @type stale: L{node.Node}
    @param stale: the stale node to remove from the bucket
    @type new: L{node.Node}
    @param new: the new node to add in its place (optional, defaults to
        not adding any node in the old node's place)
    """
    # Find the stale node's bucket
    bucket = self.buckets[self._bucketIndexForInt(stale.num)]

    # NOTE(review): looking a node up by number relies on nodes comparing
    # equal to their ID number -- confirm against node.Node.
    try:
        it = bucket.l.index(stale.num)
    except ValueError:
        # The stale node is not in the table, so there is nothing to replace
        return

    # Remove the stale node and insert the new one
    del bucket.l[it]
    if new is not None:
        bucket.l.append(new)
def insertNode(self, node, contacted = True):
    """Try to insert a node in the routing table.

    This inserts the node, returning None if successful, otherwise returns
    the oldest node in the bucket if it's full. The caller is then
    responsible for pinging the returned node and calling replaceStaleNode
    if it doesn't respond. contacted means that yes, we contacted THEM and
    we know the node is reachable.

    @type node: L{node.Node}
    @param node: the new node to try and insert
    @type contacted: C{boolean}
    @param contacted: whether the new node is known to be good, i.e.
        responded to a request (optional, defaults to True)
    @rtype: L{node.Node}
    @return: None if successful (the bucket wasn't full), otherwise returns
        the oldest node in the bucket
    """
    assert node.id != NULL_ID
    if node.id == self.node.id:
        # The local node is never stored in its own table
        return None

    # Get the bucket for this node
    bucket = self.buckets[self._bucketIndexForInt(node.num)]

    # Check to see if node is in the bucket already.
    # NOTE(review): lookup by number relies on nodes comparing equal to
    # their ID number -- confirm against node.Node.
    try:
        it = bucket.l.index(node.num)
    except ValueError:
        pass
    else:
        # The node is already in the bucket
        if contacted:
            # It responded, so update it
            node.updateLastSeen()
            # Move it to the end of the bucket, replacing the stored
            # object with the new one to pick up its new contact info
            del bucket.l[it]
            bucket.l.append(node)
            bucket.touch()
        return None

    # We don't have this node, check to see if the bucket is full
    if len(bucket.l) < self.config['K']:
        # Not full, append this node and return
        if contacted:
            node.updateLastSeen()
        bucket.l.append(node)
        bucket.touch()
        return None

    # Bucket is full, check to see if the local node is not in the bucket.
    # The ID number is compared explicitly so the range check does not
    # depend on how Node objects order themselves against integers.
    if not (bucket.min <= self.node.num < bucket.max):
        # Local node not in the bucket, can't split it, return the oldest node
        return bucket.l[0]

    # Make sure our table isn't FULL, this is really unlikely
    if len(self.buckets) >= self.config['HASH_LENGTH']:
        log.err("Hash Table is FULL! Increase K!")
        return None

    # This bucket is full and contains our node, split the bucket
    self._splitBucket(bucket)

    # Now that the bucket is split and balanced, try to insert the node
    # again.  Pass contacted along so an uncontacted node is not marked as
    # seen by the retry.
    return self.insertNode(node, contacted)
def justSeenNode(self, id):
    """Mark a node as just having been seen.

    Call this any time you get a message from a node, it will update it
    in the table if it's there.

    @type id: C{string} or C{int} or L{node.Node}
    @param id: the node ID to mark as just having been seen
    @rtype: C{datetime.datetime}
    @return: the old lastSeen time of the node, or None if it's not in the table
    """
    # Get the bucket number
    num = self._nodeNum(id)
    i = self._bucketIndexForInt(num)

    # Check to see if node is in the bucket
    try:
        it = self.buckets[i].l.index(num)
    except ValueError:
        # Not in the table, nothing to update
        return None

    # The node is in the bucket
    n = self.buckets[i].l[it]
    # NOTE(review): assumes the node keeps its timestamp in an attribute
    # named C{lastSeen}, as the @return text suggests -- confirm against
    # node.Node.
    previous = n.lastSeen
    n.updateLastSeen()
    return previous
def invalidateNode(self, n):
    """Remove the node from the routing table.

    Use this when node C{n} is known to be invalid: it is dropped through
    L{replaceStaleNode} and nothing is put in its place.
    """
    self.replaceStaleNode(n, new=None)
def nodeFailed(self, node):
    """Mark a node as having failed once, and remove it if it has failed too much.

    @type node: C{string} or C{int} or L{node.Node}
    @param node: the node (or its ID) that failed to respond
    """
    # Get the bucket number
    num = self._nodeNum(node)
    i = self._bucketIndexForInt(num)

    # Check to see if node is in the bucket
    try:
        it = self.buckets[i].l.index(num)
    except ValueError:
        # Unknown node, there is nothing to mark
        return None

    # The node is in the bucket: record the failure on the stored object
    # and drop it once it reaches the configured limit
    n = self.buckets[i].l[it]
    if n.msgFailed() >= self.config['MAX_FAILURES']:
        self.invalidateNode(n)
256 """Single bucket of nodes in a kademlia-like routing table.
258 @type l: C{list} of L{node.Node}
259 @ivar l: the nodes that are in this bucket
261 @ivar min: the minimum node ID that can be in this bucket
263 @ivar max: the maximum node ID that can be in this bucket
264 @type lastAccessed: C{datetime.datetime}
265 @ivar lastAccessed: the last time a node in this bucket was successfully contacted
def __init__(self, contents, min, max):
    """Initialize the bucket with nodes.

    @type contents: C{list} of L{node.Node}
    @param contents: the nodes to store in the bucket
    @type min: C{int} or C{long}
    @param min: the minimum node ID that can be in this bucket
    @type max: C{int} or C{long}
    @param max: the maximum node ID that can be in this bucket
    """
    # The list is stored as given (not copied); the rest of the file reads
    # and mutates it through the C{l} attribute.
    self.l = contents
    self.min = min
    self.max = max
    self.lastAccessed = datetime.now()
def touch(self):
    """Update the L{lastAccessed} time."""
    self.lastAccessed = datetime.now()
def getNodeWithInt(self, num):
    """Get the node in the bucket with that number.

    @type num: C{int} or C{long}
    @param num: the node ID to look for
    @rtype: L{node.Node}
    @return: the node stored in this bucket that has that ID number
    @raise ValueError: if the node ID is not in the bucket
    """
    # Return the stored node rather than echoing the number back.
    # list.index raises ValueError itself when nothing matches.
    # NOTE(review): matching by number relies on nodes comparing equal to
    # their ID number -- confirm against node.Node.
    return self.l[self.l.index(num)]
300 return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
302 #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
304 if isinstance(a, Node): a = a.num
307 if isinstance(a, Node): a = a.num
310 if isinstance(a, Node): a = a.num
313 if isinstance(a, Node): a = a.num
316 if isinstance(a, Node): a = a.num
317 return self.min <= a and self.max > a
319 if isinstance(a, Node): a = a.num
320 return self.min >= a or self.max < a
class TestKTable(unittest.TestCase):
    """Unit tests for the routing table."""

    def setUp(self):
        """Create a local node and an empty routing table for it."""
        self.a = Node(khash.newID(), '127.0.0.1', 2002)
        self.t = KTable(self.a, {'HASH_LENGTH': 160, 'K': 8, 'MAX_FAILURES': 3})

    def testAddNode(self):
        """A newly inserted node ends up in the only bucket."""
        self.b = Node(khash.newID(), '127.0.0.1', 2003)
        self.t.insertNode(self.b)
        self.assertEqual(len(self.t.buckets[0].l), 1)
        self.assertEqual(self.t.buckets[0].l[0], self.b)

    def testRemove(self):
        """Invalidating an inserted node empties the bucket."""
        # self.b only exists once a node has been added
        self.testAddNode()
        self.t.invalidateNode(self.b)
        self.assertEqual(len(self.t.buckets[0].l), 0)

    def testFail(self):
        """A node is only dropped once it reaches MAX_FAILURES failures."""
        self.testAddNode()
        # Every failure below the limit must leave the node in place
        for i in range(self.t.config['MAX_FAILURES'] - 1):
            self.t.nodeFailed(self.b)
            self.assertEqual(len(self.t.buckets[0].l), 1)
            self.assertEqual(self.t.buckets[0].l[0], self.b)

        # The failure that reaches the limit removes it
        self.t.nodeFailed(self.b)
        self.assertEqual(len(self.t.buckets[0].l), 0)