1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from datetime import datetime
5 from bisect import bisect_left
7 from twisted.python import log
8 from twisted.trial import unittest
11 from node import Node, NULL_ID
14 """local routing table for a kademlia like distributed hash table"""
15 def __init__(self, node, config):
16 # this is the root node, a.k.a. US!
17 assert node.id != NULL_ID
20 self.buckets = [KBucket([], 0L, 2L**self.config['HASH_LENGTH'])]
22 def _bucketIndexForInt(self, num):
23 """the index of the bucket that should hold int"""
24 return bisect_left(self.buckets, num)
26 def findNodes(self, id):
28 return K nodes in our own local table closest to the ID.
31 if isinstance(id, str):
32 num = khash.intify(id)
33 elif isinstance(id, Node):
35 elif isinstance(id, int) or isinstance(id, long):
38 raise TypeError, "findNodes requires an int, string, or Node"
41 i = self._bucketIndexForInt(num)
43 # if this node is already in our table then return it
45 index = self.buckets[i].l.index(num)
49 return [self.buckets[i].l[index]]
51 # don't have the node, get the K closest nodes
52 nodes = nodes + self.buckets[i].l
53 if len(nodes) < self.config['K']:
57 while len(nodes) < self.config['K'] and (min >= 0 or max < len(self.buckets)):
58 #ASw: note that this requires K be even
60 nodes = nodes + self.buckets[min].l
61 if max < len(self.buckets):
62 nodes = nodes + self.buckets[max].l
66 nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
67 return nodes[:self.config['K']]
69 def _splitBucket(self, a):
70 diff = (a.max - a.min) / 2
71 b = KBucket([], a.max - diff, a.max)
72 self.buckets.insert(self.buckets.index(a.min) + 1, b)
74 # transfer nodes to new bucket
76 if anode.num >= a.max:
80 def replaceStaleNode(self, stale, new):
81 """this is used by clients to replace a node returned by insertNode after
82 it fails to respond to a Pong message"""
83 i = self._bucketIndexForInt(stale.num)
85 it = self.buckets[i].l.index(stale.num)
89 del(self.buckets[i].l[it])
91 self.buckets[i].l.append(new)
93 def insertNode(self, node, contacted=1):
95 this insert the node, returning None if successful, returns the oldest node in the bucket if it's full
96 the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!!
97 contacted means that yes, we contacted THEM and we know the node is reachable
99 assert node.id != NULL_ID
100 if node.id == self.node.id: return
101 # get the bucket for this node
102 i = self. _bucketIndexForInt(node.num)
103 # check to see if node is in the bucket already
105 it = self.buckets[i].l.index(node.num)
111 node.updateLastSeen()
112 # move node to end of bucket
113 xnode = self.buckets[i].l[it]
114 del(self.buckets[i].l[it])
115 # note that we removed the original and replaced it with the new one
116 # utilizing this nodes new contact info
117 self.buckets[i].l.append(xnode)
118 self.buckets[i].touch()
121 # we don't have this node, check to see if the bucket is full
122 if len(self.buckets[i].l) < self.config['K']:
123 # no, append this node and return
125 node.updateLastSeen()
126 self.buckets[i].l.append(node)
127 self.buckets[i].touch()
130 # bucket is full, check to see if self.node is in the bucket
131 if not (self.buckets[i].min <= self.node < self.buckets[i].max):
132 return self.buckets[i].l[0]
134 # this bucket is full and contains our node, split the bucket
135 if len(self.buckets) >= self.config['HASH_LENGTH']:
136 # our table is FULL, this is really unlikely
137 log.err("Hash Table is FULL! Increase K!")
140 self._splitBucket(self.buckets[i])
142 # now that the bucket is split and balanced, try to insert the node again
143 return self.insertNode(node)
145 def justSeenNode(self, id):
146 """call this any time you get a message from a node
147 it will update it in the table if it's there """
149 n = self.findNodes(id)[0]
157 def invalidateNode(self, n):
159 forget about node n - use when you know that node is invalid
161 self.replaceStaleNode(n, None)
163 def nodeFailed(self, node):
164 """ call this when a node fails to respond to a message, to invalidate that node """
166 n = self.findNodes(node.num)[0]
170 if n.msgFailed() >= self.config['MAX_FAILURES']:
171 self.invalidateNode(n)
174 def __init__(self, contents, min, max):
178 self.lastAccessed = datetime.now()
181 self.lastAccessed = datetime.now()
183 def getNodeWithInt(self, num):
184 if num in self.l: return num
185 else: raise ValueError
188 return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
191 # necessary for bisecting list of buckets with a hash expressed as an integer or a distance
192 # compares integer or node object with the bucket's range
194 if isinstance(a, Node): a = a.num
197 if isinstance(a, Node): a = a.num
200 if isinstance(a, Node): a = a.num
203 if isinstance(a, Node): a = a.num
206 if isinstance(a, Node): a = a.num
207 return self.min <= a and self.max > a
209 if isinstance(a, Node): a = a.num
210 return self.min >= a or self.max < a
212 class TestKTable(unittest.TestCase):
214 self.a = Node(khash.newID(), '127.0.0.1', 2002)
215 self.t = KTable(self.a, {'HASH_LENGTH': 160, 'K': 8, 'MAX_FAILURES': 3})
217 def testAddNode(self):
218 self.b = Node(khash.newID(), '127.0.0.1', 2003)
219 self.t.insertNode(self.b)
220 self.failUnlessEqual(len(self.t.buckets[0].l), 1)
221 self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
223 def testRemove(self):
225 self.t.invalidateNode(self.b)
226 self.failUnlessEqual(len(self.t.buckets[0].l), 0)
230 for i in range(self.t.config['MAX_FAILURES'] - 1):
231 self.t.nodeFailed(self.b)
232 self.failUnlessEqual(len(self.t.buckets[0].l), 1)
233 self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
235 self.t.nodeFailed(self.b)
236 self.failUnlessEqual(len(self.t.buckets[0].l), 0)