1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
5 from bisect import bisect_left
7 from const import K, HASH_LENGTH, NULL_ID, MAX_FAILURES
12 """local routing table for a kademlia like distributed hash table"""
13 def __init__(self, node):
14 # this is the root node, a.k.a. US!
16 self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)]
19 def _bucketIndexForInt(self, num):
20 """the index of the bucket that should hold int"""
21 return bisect_left(self.buckets, num)
23 def findNodes(self, id):
25 return K nodes in our own local table closest to the ID.
28 if isinstance(id, str):
29 num = khash.intify(id)
30 elif isinstance(id, Node):
32 elif isinstance(id, int) or isinstance(id, long):
35 raise TypeError, "findNodes requires an int, string, or Node"
38 i = self._bucketIndexForInt(num)
40 # if this node is already in our table then return it
42 index = self.buckets[i].l.index(num)
46 return [self.buckets[i].l[index]]
48 # don't have the node, get the K closest nodes
49 nodes = nodes + self.buckets[i].l
54 while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
55 #ASw: note that this requires K be even
57 nodes = nodes + self.buckets[min].l
58 if max < len(self.buckets):
59 nodes = nodes + self.buckets[max].l
63 nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
66 def _splitBucket(self, a):
67 diff = (a.max - a.min) / 2
68 b = KBucket([], a.max - diff, a.max)
69 self.buckets.insert(self.buckets.index(a.min) + 1, b)
71 # transfer nodes to new bucket
73 if anode.num >= a.max:
77 def replaceStaleNode(self, stale, new):
78 """this is used by clients to replace a node returned by insertNode after
79 it fails to respond to a Pong message"""
80 i = self._bucketIndexForInt(stale.num)
82 it = self.buckets[i].l.index(stale.num)
86 del(self.buckets[i].l[it])
88 self.buckets[i].l.append(new)
90 def insertNode(self, node, contacted=1):
92 this insert the node, returning None if successful, returns the oldest node in the bucket if it's full
93 the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!!
94 contacted means that yes, we contacted THEM and we know the node is reachable
96 assert node.id != NULL_ID
97 if node.id == self.node.id: return
98 # get the bucket for this node
99 i = self. _bucketIndexForInt(node.num)
100 # check to see if node is in the bucket already
102 it = self.buckets[i].l.index(node.num)
108 node.updateLastSeen()
109 # move node to end of bucket
110 xnode = self.buckets[i].l[it]
111 del(self.buckets[i].l[it])
112 # note that we removed the original and replaced it with the new one
113 # utilizing this nodes new contact info
114 self.buckets[i].l.append(xnode)
115 self.buckets[i].touch()
118 # we don't have this node, check to see if the bucket is full
119 if len(self.buckets[i].l) < K:
120 # no, append this node and return
122 node.updateLastSeen()
123 self.buckets[i].l.append(node)
124 self.buckets[i].touch()
127 # bucket is full, check to see if self.node is in the bucket
128 if not (self.buckets[i].min <= self.node < self.buckets[i].max):
129 return self.buckets[i].l[0]
131 # this bucket is full and contains our node, split the bucket
132 if len(self.buckets) >= HASH_LENGTH:
133 # our table is FULL, this is really unlikely
134 print "Hash Table is FULL! Increase K!"
137 self._splitBucket(self.buckets[i])
139 # now that the bucket is split and balanced, try to insert the node again
140 return self.insertNode(node)
142 def justSeenNode(self, id):
143 """call this any time you get a message from a node
144 it will update it in the table if it's there """
146 n = self.findNodes(id)[0]
154 def invalidateNode(self, n):
156 forget about node n - use when you know that node is invalid
158 self.replaceStaleNode(n, None)
160 def nodeFailed(self, node):
161 """ call this when a node fails to respond to a message, to invalidate that node """
163 n = self.findNodes(node.num)[0]
167 if n.msgFailed() >= MAX_FAILURES:
168 self.invalidateNode(n)
171 def __init__(self, contents, min, max):
175 self.lastAccessed = time()
178 self.lastAccessed = time()
180 def getNodeWithInt(self, num):
181 if num in self.l: return num
182 else: raise ValueError
185 return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
188 # necessary for bisecting list of buckets with a hash expressed as an integer or a distance
189 # compares integer or node object with the bucket's range
191 if isinstance(a, Node): a = a.num
194 if isinstance(a, Node): a = a.num
197 if isinstance(a, Node): a = a.num
200 if isinstance(a, Node): a = a.num
203 if isinstance(a, Node): a = a.num
204 return self.min <= a and self.max > a
206 if isinstance(a, Node): a = a.num
207 return self.min >= a or self.max < a
213 class TestKTable(unittest.TestCase):
215 self.a = Node().init(khash.newID(), 'localhost', 2002)
216 self.t = KTable(self.a)
218 def testAddNode(self):
219 self.b = Node().init(khash.newID(), 'localhost', 2003)
220 self.t.insertNode(self.b)
221 self.assertEqual(len(self.t.buckets[0].l), 1)
222 self.assertEqual(self.t.buckets[0].l[0], self.b)
224 def testRemove(self):
226 self.t.invalidateNode(self.b)
227 self.assertEqual(len(self.t.buckets[0].l), 0)
231 for i in range(MAX_FAILURES - 1):
232 self.t.nodeFailed(self.b)
233 self.assertEqual(len(self.t.buckets[0].l), 1)
234 self.assertEqual(self.t.buckets[0].l[0], self.b)
236 self.t.nodeFailed(self.b)
237 self.assertEqual(len(self.t.buckets[0].l), 0)
240 if __name__ == "__main__":