Document the DHT's main khashmir module.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 import os
13
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.trial import unittest
17
18 from db import DB
19 from ktable import KTable
20 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
21 from khash import newID, newIDInRange
22 from actions import FindNode, FindValue, GetValue, StoreValue
23 import krpc
24
25 class KhashmirBase(protocol.Factory):
26     """The base Khashmir class, with base functionality and find node, no key-value mappings.
27     
28     @type _Node: L{node.Node}
29     @ivar _Node: the knode implementation to use for this class of DHT
30     @type config: C{dictionary}
31     @ivar config: the configuration parameters for the DHT
32     @type port: C{int}
33     @ivar port: the port to listen on
34     @type store: L{db.DB}
35     @ivar store: the database to store nodes and key/value pairs in
36     @type node: L{node.Node}
37     @ivar node: this node
38     @type table: L{ktable.KTable}
39     @ivar table: the routing table
40     @type token_secrets: C{list} of C{string}
41     @ivar token_secrets: the current secrets to use to create tokens
42     @type udp: L{krpc.hostbroker}
43     @ivar udp: the factory for the KRPC protocol
44     @type listenport: L{twisted.internet.interfaces.IListeningPort}
45     @ivar listenport: the UDP listening port
46     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
47     @ivar next_checkpoint: the delayed call for the next checkpoint
48     """
49     
50     _Node = KNodeBase
51     
52     def __init__(self, config, cache_dir='/tmp'):
53         """Initialize the Khashmir class and call the L{setup} method.
54         
55         @type config: C{dictionary}
56         @param config: the configuration parameters for the DHT
57         @type cache_dir: C{string}
58         @param cache_dir: the directory to store all files in
59             (optional, defaults to the /tmp directory)
60         """
61         self.config = None
62         self.setup(config, cache_dir)
63         
64     def setup(self, config, cache_dir):
65         """Setup all the Khashmir sub-modules.
66         
67         @type config: C{dictionary}
68         @param config: the configuration parameters for the DHT
69         @type cache_dir: C{string}
70         @param cache_dir: the directory to store all files in
71         """
72         self.config = config
73         self.port = config['PORT']
74         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
75         self.node = self._loadSelfNode('', self.port)
76         self.table = KTable(self.node, config)
77         self.token_secrets = [newID()]
78         
79         # Start listening
80         self.udp = krpc.hostbroker(self, config)
81         self.udp.protocol = krpc.KRPC
82         self.listenport = reactor.listenUDP(self.port, self.udp)
83         
84         # Load the routing table and begin checkpointing
85         self._loadRoutingTable()
86         self.refreshTable(force = True)
87         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
88
89     def Node(self, id, host = None, port = None):
90         """Create a new node.
91         
92         @see: L{node.Node.__init__}
93         """
94         n = self._Node(id, host, port)
95         n.table = self.table
96         n.conn = self.udp.connectionForAddr((n.host, n.port))
97         return n
98     
99     def __del__(self):
100         """Stop listening for packets."""
101         self.listenport.stopListening()
102         
103     def _loadSelfNode(self, host, port):
104         """Create this node, loading any previously saved one."""
105         id = self.store.getSelfNode()
106         if not id:
107             id = newID()
108         return self._Node(id, host, port)
109         
110     def checkpoint(self):
111         """Perform some periodic maintenance operations."""
112         # Create a new token secret
113         self.token_secrets.insert(0, newID())
114         if len(self.token_secrets) > 3:
115             self.token_secrets.pop()
116             
117         # Save some parameters for reloading
118         self.store.saveSelfNode(self.node.id)
119         self.store.dumpRoutingTable(self.table.buckets)
120         
121         # DHT maintenance
122         self.store.expireValues(self.config['KEY_EXPIRE'])
123         self.refreshTable()
124         
125         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
126                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
127                                                  self.checkpoint)
128         
129     def _loadRoutingTable(self):
130         """Load the previous routing table nodes from the database.
131         
132         It's usually a good idea to call refreshTable(force = True) after
133         loading the table.
134         """
135         nodes = self.store.getRoutingTable()
136         for rec in nodes:
137             n = self.Node(rec[0], rec[1], int(rec[2]))
138             self.table.insertNode(n, contacted = False)
139             
140     #{ Local interface
141     def addContact(self, host, port, callback=None, errback=None):
142         """Ping this node and add the contact info to the table on pong.
143         
144         @type host: C{string}
145         @param host: the IP address of the node to contact
146         @type port: C{int}
147         @param port:the port of the node to contact
148         @type callback: C{method}
149         @param callback: the method to call with the results, it must take 1
150             parameter, the contact info returned by the node
151             (optional, defaults to doing nothing with the results)
152         @type errback: C{method}
153         @param errback: the method to call if an error occurs
154             (optional, defaults to calling the callback with None)
155         """
156         n = self.Node(NULL_ID, host, port)
157         self.sendJoin(n, callback=callback, errback=errback)
158
159     def findNode(self, id, callback, errback=None):
160         """Find the contact info for the K closest nodes in the global table.
161         
162         @type id: C{string}
163         @param id: the target ID to find the K closest nodes of
164         @type callback: C{method}
165         @param callback: the method to call with the results, it must take 1
166             parameter, the list of K closest nodes
167         @type errback: C{method}
168         @param errback: the method to call if an error occurs
169             (optional, defaults to doing nothing when an error occurs)
170         """
171         # Get K nodes out of local table/cache
172         nodes = self.table.findNodes(id)
173         d = Deferred()
174         if errback:
175             d.addCallbacks(callback, errback)
176         else:
177             d.addCallback(callback)
178
179         # If the target ID was found
180         if len(nodes) == 1 and nodes[0].id == id:
181             d.callback(nodes)
182         else:
183             # Start the finding nodes action
184             state = FindNode(self, id, d.callback, self.config)
185             reactor.callLater(0, state.goWithNodes, nodes)
186     
187     def insertNode(self, node, contacted = True):
188         """Try to insert a node in our local table, pinging oldest contact if necessary.
189         
190         If all you have is a host/port, then use L{addContact}, which calls this
191         method after receiving the PONG from the remote node. The reason for
192         the seperation is we can't insert a node into the table without its
193         node ID. That means of course the node passed into this method needs
194         to be a properly formed Node object with a valid ID.
195
196         @type node: L{node.Node}
197         @param node: the new node to try and insert
198         @type contacted: C{boolean}
199         @param contacted: whether the new node is known to be good, i.e.
200             responded to a request (optional, defaults to True)
201         """
202         old = self.table.insertNode(node, contacted=contacted)
203         if (old and old.id != self.node.id and
204             (datetime.now() - old.lastSeen) > 
205              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
206             
207             def _staleNodeHandler(oldnode = old, newnode = node):
208                 """The pinged node never responded, so replace it."""
209                 self.table.replaceStaleNode(oldnode, newnode)
210             
211             def _notStaleNodeHandler(dict, old=old):
212                 """Got a pong from the old node, so update it."""
213                 dict = dict['rsp']
214                 if dict['id'] == old.id:
215                     self.table.justSeenNode(old.id)
216             
217             # Bucket is full, check to see if old node is still available
218             df = old.ping(self.node.id)
219             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
220
221     def sendJoin(self, node, callback=None, errback=None):
222         """Join the DHT by pinging a bootstrap node.
223         
224         @type node: L{node.Node}
225         @param node: the node to send the join to
226         @type callback: C{method}
227         @param callback: the method to call with the results, it must take 1
228             parameter, the contact info returned by the node
229             (optional, defaults to doing nothing with the results)
230         @type errback: C{method}
231         @param errback: the method to call if an error occurs
232             (optional, defaults to calling the callback with None)
233         """
234
235         def _pongHandler(dict, node=node, self=self, callback=callback):
236             """Node responded properly, callback with response."""
237             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
238             self.insertNode(n)
239             if callback:
240                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
241
242         def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
243             """Error occurred, fail node and errback or callback with error."""
244             table.nodeFailed(node)
245             if errback:
246                 errback()
247             elif callback:
248                 callback(None)
249         
250         df = node.join(self.node.id)
251         df.addCallbacks(_pongHandler, _defaultPong)
252
253     def findCloseNodes(self, callback=lambda a: None, errback = None):
254         """Perform a findNode on the ID one away from our own.
255
256         This will allow us to populate our table with nodes on our network
257         closest to our own. This is called as soon as we start up with an
258         empty table.
259
260         @type callback: C{method}
261         @param callback: the method to call with the results, it must take 1
262             parameter, the list of K closest nodes
263             (optional, defaults to doing nothing with the results)
264         @type errback: C{method}
265         @param errback: the method to call if an error occurs
266             (optional, defaults to doing nothing when an error occurs)
267         """
268         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
269         self.findNode(id, callback, errback)
270
271     def refreshTable(self, force = False):
272         """Check all the buckets for those that need refreshing.
273         
274         @param force: refresh all buckets regardless of last bucket access time
275             (optional, defaults to False)
276         """
277         def callback(nodes):
278             pass
279     
280         for bucket in self.table.buckets:
281             if force or (datetime.now() - bucket.lastAccessed > 
282                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
283                 # Choose a random ID in the bucket and try and find it
284                 id = newIDInRange(bucket.min, bucket.max)
285                 self.findNode(id, callback)
286
287     def stats(self):
288         """Collect some statistics about the DHT.
289         
290         @rtype: (C{int}, C{int})
291         @return: the number contacts in our routing table, and the estimated
292             number of nodes in the entire DHT
293         """
294         num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
295         num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
296         return (num_contacts, num_nodes)
297     
298     def shutdown(self):
299         """Closes the port and cancels pending later calls."""
300         self.listenport.stopListening()
301         try:
302             self.next_checkpoint.cancel()
303         except:
304             pass
305         self.store.close()
306
307     #{ Remote interface
308     def krpc_ping(self, id, _krpc_sender):
309         """Pong with our ID.
310         
311         @type id: C{string}
312         @param id: the node ID of the sender node
313         @type _krpc_sender: (C{string}, C{int})
314         @param _krpc_sender: the sender node's IP address and port
315         """
316         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
317         self.insertNode(n, contacted = False)
318
319         return {"id" : self.node.id}
320         
321     def krpc_join(self, id, _krpc_sender):
322         """Add the node by responding with its address and port.
323         
324         @type id: C{string}
325         @param id: the node ID of the sender node
326         @type _krpc_sender: (C{string}, C{int})
327         @param _krpc_sender: the sender node's IP address and port
328         """
329         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
330         self.insertNode(n, contacted = False)
331
332         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
333         
334     def krpc_find_node(self, target, id, _krpc_sender):
335         """Find the K closest nodes to the target in the local routing table.
336         
337         @type target: C{string}
338         @param target: the target ID to find nodes for
339         @type id: C{string}
340         @param id: the node ID of the sender node
341         @type _krpc_sender: (C{string}, C{int})
342         @param _krpc_sender: the sender node's IP address and port
343         """
344         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
345         self.insertNode(n, contacted = False)
346
347         nodes = self.table.findNodes(target)
348         nodes = map(lambda node: node.contactInfo(), nodes)
349         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
350         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
351
352
353 class KhashmirRead(KhashmirBase):
354     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
355
356     _Node = KNodeRead
357
358     #{ Local interface
359     def findValue(self, key, callback, errback=None):
360         """Get the nodes that have values for the key from the global table.
361         
362         @type key: C{string}
363         @param key: the target key to find the values for
364         @type callback: C{method}
365         @param callback: the method to call with the results, it must take 1
366             parameter, the list of nodes with values
367         @type errback: C{method}
368         @param errback: the method to call if an error occurs
369             (optional, defaults to doing nothing when an error occurs)
370         """
371         # Get K nodes out of local table/cache
372         nodes = self.table.findNodes(key)
373         d = Deferred()
374         if errback:
375             d.addCallbacks(callback, errback)
376         else:
377             d.addCallback(callback)
378
379         # Search for others starting with the locally found ones
380         state = FindValue(self, key, d.callback, self.config)
381         reactor.callLater(0, state.goWithNodes, nodes)
382
383     def valueForKey(self, key, callback, searchlocal = True):
384         """Get the values found for key in global table.
385         
386         Callback will be called with a list of values for each peer that
387         returns unique values. The final callback will be an empty list.
388
389         @type key: C{string}
390         @param key: the target key to get the values for
391         @type callback: C{method}
392         @param callback: the method to call with the results, it must take 2
393             parameters: the key, and the values found
394         @type searchlocal: C{boolean}
395         @param searchlocal: whether to also look for any local values
396         """
397         # Get any local values
398         if searchlocal:
399             l = self.store.retrieveValues(key)
400             if len(l) > 0:
401                 reactor.callLater(0, callback, key, l)
402         else:
403             l = []
404
405         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
406             """Use the found nodes to send requests for values to."""
407             state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
408             reactor.callLater(0, state.goWithNodes, nodes)
409             
410         # First lookup nodes that have values for the key
411         self.findValue(key, _getValueForKey)
412
413     #{ Remote interface
414     def krpc_find_value(self, key, id, _krpc_sender):
415         """Find the number of values stored locally for the key, and the K closest nodes.
416         
417         @type key: C{string}
418         @param key: the target key to find the values and nodes for
419         @type id: C{string}
420         @param id: the node ID of the sender node
421         @type _krpc_sender: (C{string}, C{int})
422         @param _krpc_sender: the sender node's IP address and port
423         """
424         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
425         self.insertNode(n, contacted = False)
426     
427         nodes = self.table.findNodes(key)
428         nodes = map(lambda node: node.contactInfo(), nodes)
429         num_values = self.store.countValues(key)
430         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
431
432     def krpc_get_value(self, key, num, id, _krpc_sender):
433         """Retrieve the values stored locally for the key.
434         
435         @type key: C{string}
436         @param key: the target key to retrieve the values for
437         @type num: C{int}
438         @param num: the maximum number of values to retrieve, or 0 to
439             retrieve all of them
440         @type id: C{string}
441         @param id: the node ID of the sender node
442         @type _krpc_sender: (C{string}, C{int})
443         @param _krpc_sender: the sender node's IP address and port
444         """
445         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
446         self.insertNode(n, contacted = False)
447     
448         l = self.store.retrieveValues(key)
449         if num == 0 or num >= len(l):
450             return {'values' : l, "id": self.node.id}
451         else:
452             shuffle(l)
453             return {'values' : l[:num], "id": self.node.id}
454
455
456 class KhashmirWrite(KhashmirRead):
457     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
458
459     _Node = KNodeWrite
460
461     #{ Local interface
462     def storeValueForKey(self, key, value, callback=None):
463         """Stores the value for the key in the global table.
464         
465         No status in this implementation, peers respond but don't indicate
466         status of storing values.
467
468         @type key: C{string}
469         @param key: the target key to store the value for
470         @type value: C{string}
471         @param value: the value to store with the key
472         @type callback: C{method}
473         @param callback: the method to call with the results, it must take 3
474             parameters: the key, the value stored, and the result of the store
475             (optional, defaults to doing nothing with the results)
476         """
477         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
478             """Use the returned K closest nodes to store the key at."""
479             if not response:
480                 def _storedValueHandler(key, value, sender):
481                     """Default callback that does nothing."""
482                     pass
483                 response = _storedValueHandler
484             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
485             reactor.callLater(0, action.goWithNodes, nodes)
486             
487         # First find the K closest nodes to operate on.
488         self.findNode(key, _storeValueForKey)
489                     
490     #{ Remote interface
491     def krpc_store_value(self, key, value, token, id, _krpc_sender):
492         """Store the value locally with the key.
493         
494         @type key: C{string}
495         @param key: the target key to store the value for
496         @type value: C{string}
497         @param value: the value to store with the key
498         @param token: the token to confirm that this peer contacted us previously
499         @type id: C{string}
500         @param id: the node ID of the sender node
501         @type _krpc_sender: (C{string}, C{int})
502         @param _krpc_sender: the sender node's IP address and port
503         """
504         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
505         self.insertNode(n, contacted = False)
506         for secret in self.token_secrets:
507             this_token = sha(secret + _krpc_sender[0]).digest()
508             if token == this_token:
509                 self.store.storeValue(key, value)
510                 return {"id" : self.node.id}
511         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
512
513
514 class Khashmir(KhashmirWrite):
515     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
516     _Node = KNodeWrite
517
518
519 class SimpleTests(unittest.TestCase):
520     
521     timeout = 10
522     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
523                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
524                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
525                     'MAX_FAILURES': 3,
526                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
527                     'KEY_EXPIRE': 3600, 'SPEW': False, }
528
529     def setUp(self):
530         d = self.DHT_DEFAULTS.copy()
531         d['PORT'] = 4044
532         self.a = Khashmir(d)
533         d = self.DHT_DEFAULTS.copy()
534         d['PORT'] = 4045
535         self.b = Khashmir(d)
536         
537     def tearDown(self):
538         self.a.shutdown()
539         self.b.shutdown()
540         os.unlink(self.a.store.db)
541         os.unlink(self.b.store.db)
542
543     def testAddContact(self):
544         self.failUnlessEqual(len(self.a.table.buckets), 1)
545         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
546
547         self.failUnlessEqual(len(self.b.table.buckets), 1)
548         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
549
550         self.a.addContact('127.0.0.1', 4045)
551         reactor.iterate()
552         reactor.iterate()
553         reactor.iterate()
554         reactor.iterate()
555
556         self.failUnlessEqual(len(self.a.table.buckets), 1)
557         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
558         self.failUnlessEqual(len(self.b.table.buckets), 1)
559         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
560
561     def testStoreRetrieve(self):
562         self.a.addContact('127.0.0.1', 4045)
563         reactor.iterate()
564         reactor.iterate()
565         reactor.iterate()
566         reactor.iterate()
567         self.got = 0
568         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
569         reactor.iterate()
570         reactor.iterate()
571         reactor.iterate()
572         reactor.iterate()
573         reactor.iterate()
574         reactor.iterate()
575         self.a.valueForKey(sha('foo').digest(), self._cb)
576         reactor.iterate()
577         reactor.iterate()
578         reactor.iterate()
579         reactor.iterate()
580         reactor.iterate()
581         reactor.iterate()
582         reactor.iterate()
583
584     def _cb(self, key, val):
585         if not val:
586             self.failUnlessEqual(self.got, 1)
587         elif 'foobar' in val:
588             self.got = 1
589
590
591 class MultiTest(unittest.TestCase):
592     
593     timeout = 30
594     num = 20
595     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
596                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
597                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
598                     'MAX_FAILURES': 3,
599                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
600                     'KEY_EXPIRE': 3600, 'SPEW': False, }
601
602     def _done(self, val):
603         self.done = 1
604         
605     def setUp(self):
606         self.l = []
607         self.startport = 4088
608         for i in range(self.num):
609             d = self.DHT_DEFAULTS.copy()
610             d['PORT'] = self.startport + i
611             self.l.append(Khashmir(d))
612         reactor.iterate()
613         reactor.iterate()
614         
615         for i in self.l:
616             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
617             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
618             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
619             reactor.iterate()
620             reactor.iterate()
621             reactor.iterate() 
622             
623         for i in self.l:
624             self.done = 0
625             i.findCloseNodes(self._done)
626             while not self.done:
627                 reactor.iterate()
628         for i in self.l:
629             self.done = 0
630             i.findCloseNodes(self._done)
631             while not self.done:
632                 reactor.iterate()
633
634     def tearDown(self):
635         for i in self.l:
636             i.shutdown()
637             os.unlink(i.store.db)
638             
639         reactor.iterate()
640         
641     def testStoreRetrieve(self):
642         for i in range(10):
643             K = newID()
644             V = newID()
645             
646             for a in range(3):
647                 self.done = 0
648                 def _scb(key, value, result):
649                     self.done = 1
650                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
651                 while not self.done:
652                     reactor.iterate()
653
654
655                 def _rcb(key, val):
656                     if not val:
657                         self.done = 1
658                         self.failUnlessEqual(self.got, 1)
659                     elif V in val:
660                         self.got = 1
661                 for x in range(3):
662                     self.got = 0
663                     self.done = 0
664                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
665                     while not self.done:
666                         reactor.iterate()