3e60f5c37d0156e6f51c0f9d3543e689012424f4
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / ktable.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The routing table and buckets for a kademlia-like DHT."""
5
6 from datetime import datetime
7 from bisect import bisect_left
8
9 from twisted.python import log
10 from twisted.trial import unittest
11
12 import khash
13 from node import Node, NULL_ID
14
15 class KTable:
16     """Local routing table for a kademlia-like distributed hash table.
17     
18     @type node: L{node.Node}
19     @ivar node: the local node
20     @type config: C{dictionary}
21     @ivar config: the configuration parameters for the DHT
22     @type buckets: C{list} of L{KBucket}
23     @ivar buckets: the buckets of nodes in the routing table
24     """
25     
26     def __init__(self, node, config):
27         """Initialize the first empty bucket of everything.
28         
29         @type node: L{node.Node}
30         @param node: the local node
31         @type config: C{dictionary}
32         @param config: the configuration parameters for the DHT
33         """
34         # this is the root node, a.k.a. US!
35         assert node.id != NULL_ID
36         self.node = node
37         self.config = config
38         self.buckets = [KBucket([], 0L, 2L**self.config['HASH_LENGTH'])]
39         
40     def _bucketIndexForInt(self, num):
41         """Find the index of the bucket that should hold the node's ID number."""
42         return bisect_left(self.buckets, num)
43     
44     def _nodeNum(self, id):
45         """Takes different types of input and converts to the node ID number.
46
47         @type id: C{string} of C{int} or L{node.Node}
48         @param id: the ID to find nodes that are close to
49         @raise TypeError: if id does not properly identify an ID
50         """
51
52         # Get the ID number from the input
53         if isinstance(id, str):
54             return khash.intify(id)
55         elif isinstance(id, Node):
56             return id.num
57         elif isinstance(id, int) or isinstance(id, long):
58             return id
59         else:
60             raise TypeError, "requires an int, string, or Node input"
61             
62     def findNodes(self, id):
63         """Find the K nodes in our own local table closest to the ID.
64
65         @type id: C{string} of C{int} or L{node.Node}
66         @param id: the ID to find nodes that are close to
67         """
68
69         # Get the ID number from the input
70         num = self._nodeNum(id)
71             
72         # Get the K closest nodes from the appropriate bucket
73         i = self._bucketIndexForInt(num)
74         nodes = list(self.buckets[i].l)
75         
76         # Make sure we have enough
77         if len(nodes) < self.config['K']:
78             # Look in adjoining buckets for nodes
79             min = i - 1
80             max = i + 1
81             while len(nodes) < self.config['K'] and (min >= 0 or max < len(self.buckets)):
82                 # Add the adjoining buckets' nodes to the list
83                 if min >= 0:
84                     nodes = nodes + self.buckets[min].l
85                 if max < len(self.buckets):
86                     nodes = nodes + self.buckets[max].l
87                 min = min - 1
88                 max = max + 1
89     
90         # Sort the found nodes by proximity to the id and return the closest K
91         nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
92         return nodes[:self.config['K']]
93         
94     def _splitBucket(self, a):
95         """Split a bucket in two.
96         
97         @type a: L{KBucket}
98         @param a: the bucket to split
99         """
100         # Create a new bucket with half the (upper) range of the current bucket
101         diff = (a.max - a.min) / 2
102         b = KBucket([], a.max - diff, a.max)
103         self.buckets.insert(self.buckets.index(a.min) + 1, b)
104         
105         # Reduce the input bucket's (upper) range 
106         a.max = a.max - diff
107
108         # Transfer nodes to the new bucket
109         for anode in a.l[:]:
110             if anode.num >= a.max:
111                 a.l.remove(anode)
112                 b.l.append(anode)
113     
114     def replaceStaleNode(self, stale, new = None):
115         """Replace a stale node in a bucket with a new one.
116         
117         This is used by clients to replace a node returned by insertNode after
118         it fails to respond to a ping.
119         
120         @type stale: L{node.Node}
121         @param stale: the stale node to remove from the bucket
122         @type new: L{node.Node}
123         @param new: the new node to add in it's place (optional, defaults to
124             not adding any node in the old node's place)
125         """
126         # Find the stale node's bucket
127         i = self._bucketIndexForInt(stale.num)
128         try:
129             it = self.buckets[i].l.index(stale.num)
130         except ValueError:
131             return
132     
133         # Remove the stale node and insert the new one
134         del(self.buckets[i].l[it])
135         if new:
136             self.buckets[i].l.append(new)
137     
138     def insertNode(self, node, contacted = True):
139         """Try to insert a node in the routing table.
140         
141         This inserts the node, returning None if successful, otherwise returns
142         the oldest node in the bucket if it's full. The caller is then
143         responsible for pinging the returned node and calling replaceStaleNode
144         if it doesn't respond. contacted means that yes, we contacted THEM and
145         we know the node is reachable.
146         
147         @type node: L{node.Node}
148         @param node: the new node to try and insert
149         @type contacted: C{boolean}
150         @param contacted: whether the new node is known to be good, i.e.
151             responded to a request (optional, defaults to True)
152         @rtype: L{node.Node}
153         @return: None if successful (the bucket wasn't full), otherwise returns the oldest node in the bucket
154         """
155         assert node.id != NULL_ID
156         if node.id == self.node.id: return
157
158         # Get the bucket for this node
159         i = self._bucketIndexForInt(node.num)
160
161         # Check to see if node is in the bucket already
162         try:
163             it = self.buckets[i].l.index(node.num)
164         except ValueError:
165             pass
166         else:
167             # The node is already in the bucket
168             if contacted:
169                 # It responded, so update it
170                 node.updateLastSeen()
171                 # move node to end of bucket
172                 del(self.buckets[i].l[it])
173                 # note that we removed the original and replaced it with the new one
174                 # utilizing this nodes new contact info
175                 self.buckets[i].l.append(node)
176                 self.buckets[i].touch()
177             return
178         
179         # We don't have this node, check to see if the bucket is full
180         if len(self.buckets[i].l) < self.config['K']:
181             # Not full, append this node and return
182             if contacted:
183                 node.updateLastSeen()
184             self.buckets[i].l.append(node)
185             self.buckets[i].touch()
186             return
187             
188         # Bucket is full, check to see if the local node is not in the bucket
189         if not (self.buckets[i].min <= self.node < self.buckets[i].max):
190             # Local node not in the bucket, can't split it, return the oldest node
191             return self.buckets[i].l[0]
192         
193         # Make sure our table isn't FULL, this is really unlikely
194         if len(self.buckets) >= self.config['HASH_LENGTH']:
195             log.err("Hash Table is FULL!  Increase K!")
196             return
197             
198         # This bucket is full and contains our node, split the bucket
199         self._splitBucket(self.buckets[i])
200         
201         # Now that the bucket is split and balanced, try to insert the node again
202         return self.insertNode(node)
203     
204     def justSeenNode(self, id):
205         """Mark a node as just having been seen.
206         
207         Call this any time you get a message from a node, it will update it
208         in the table if it's there.
209
210         @type id: C{string} of C{int} or L{node.Node}
211         @param id: the node ID to mark as just having been seen
212         @rtype: C{datetime.datetime}
213         @return: the old lastSeen time of the node, or None if it's not in the table
214         """
215         # Get the bucket number
216         num = self._nodeNum(id)
217         i = self._bucketIndexForInt(num)
218
219         # Check to see if node is in the bucket
220         try:
221             it = self.buckets[i].l.index(num)
222         except ValueError:
223             return None
224         else:
225             # The node is in the bucket
226             n = self.buckets[i].l[it]
227             tstamp = n.lastSeen
228             n.updateLastSeen()
229             return tstamp
230     
231     def invalidateNode(self, n):
232         """Remove the node from the routing table.
233         
234         Forget about node n. Use this when you know that a node is invalid.
235         """
236         self.replaceStaleNode(n)
237     
238     def nodeFailed(self, node):
239         """Mark a node as having failed once, and remove it if it has failed too much."""
240         # Get the bucket number
241         num = self._nodeNum(node)
242         i = self._bucketIndexForInt(num)
243
244         # Check to see if node is in the bucket
245         try:
246             it = self.buckets[i].l.index(num)
247         except ValueError:
248             return None
249         else:
250             # The node is in the bucket
251             n = self.buckets[i].l[it]
252             if n.msgFailed() >= self.config['MAX_FAILURES']:
253                 self.invalidateNode(n)
254                         
255 class KBucket:
256     """Single bucket of nodes in a kademlia-like routing table.
257     
258     @type l: C{list} of L{node.Node}
259     @ivar l: the nodes that are in this bucket
260     @type min: C{long}
261     @ivar min: the minimum node ID that can be in this bucket
262     @type max: C{long}
263     @ivar max: the maximum node ID that can be in this bucket
264     @type lastAccessed: C{datetime.datetime}
265     @ivar lastAccessed: the last time a node in this bucket was successfully contacted
266     """
267     
268     def __init__(self, contents, min, max):
269         """Initialize the bucket with nodes.
270         
271         @type contents: C{list} of L{node.Node}
272         @param contents: the nodes to store in the bucket
273         @type min: C{long}
274         @param min: the minimum node ID that can be in this bucket
275         @type max: C{long}
276         @param max: the maximum node ID that can be in this bucket
277         """
278         self.l = contents
279         self.min = min
280         self.max = max
281         self.lastAccessed = datetime.now()
282         
283     def touch(self):
284         """Update the L{lastAccessed} time."""
285         self.lastAccessed = datetime.now()
286     
287     def getNodeWithInt(self, num):
288         """Get the node in the bucket with that number.
289         
290         @type num: C{long}
291         @param num: the node ID to look for
292         @raise ValueError: if the node ID is not in the bucket
293         @rtype: L{node.Node}
294         @return: the node
295         """
296         if num in self.l: return num
297         else: raise ValueError
298         
299     def __repr__(self):
300         return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
301     
302     #{ Comparators to bisect/index a list of buckets (by their range) with either a node or a long
303     def __lt__(self, a):
304         if isinstance(a, Node): a = a.num
305         return self.max <= a
306     def __le__(self, a):
307         if isinstance(a, Node): a = a.num
308         return self.min < a
309     def __gt__(self, a):
310         if isinstance(a, Node): a = a.num
311         return self.min > a
312     def __ge__(self, a):
313         if isinstance(a, Node): a = a.num
314         return self.max >= a
315     def __eq__(self, a):
316         if isinstance(a, Node): a = a.num
317         return self.min <= a and self.max > a
318     def __ne__(self, a):
319         if isinstance(a, Node): a = a.num
320         return self.min >= a or self.max < a
321
322 class TestKTable(unittest.TestCase):
323     """Unit tests for the routing table."""
324     
325     def setUp(self):
326         self.a = Node(khash.newID(), '127.0.0.1', 2002)
327         self.t = KTable(self.a, {'HASH_LENGTH': 160, 'K': 8, 'MAX_FAILURES': 3})
328
329     def testAddNode(self):
330         self.b = Node(khash.newID(), '127.0.0.1', 2003)
331         self.t.insertNode(self.b)
332         self.failUnlessEqual(len(self.t.buckets[0].l), 1)
333         self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
334
335     def testRemove(self):
336         self.testAddNode()
337         self.t.invalidateNode(self.b)
338         self.failUnlessEqual(len(self.t.buckets[0].l), 0)
339
340     def testFail(self):
341         self.testAddNode()
342         for i in range(self.t.config['MAX_FAILURES'] - 1):
343             self.t.nodeFailed(self.b)
344             self.failUnlessEqual(len(self.t.buckets[0].l), 1)
345             self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
346             
347         self.t.nodeFailed(self.b)
348         self.failUnlessEqual(len(self.t.buckets[0].l), 0)