Make the DHT timeouts configuration parameters.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 from copy import copy
13 import os
14
15 from twisted.internet.defer import Deferred
16 from twisted.internet import protocol, reactor
17 from twisted.python import log
18 from twisted.trial import unittest
19
20 from db import DB
21 from ktable import KTable
22 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
23 from khash import newID, newIDInRange
24 from actions import FindNode, FindValue, GetValue, StoreValue
25 from stats import StatsLogger
26 import krpc
27
28 class KhashmirBase(protocol.Factory):
29     """The base Khashmir class, with base functionality and find node, no key-value mappings.
30     
31     @type _Node: L{node.Node}
32     @ivar _Node: the knode implementation to use for this class of DHT
33     @type config: C{dictionary}
34     @ivar config: the configuration parameters for the DHT
35     @type port: C{int}
36     @ivar port: the port to listen on
37     @type store: L{db.DB}
38     @ivar store: the database to store nodes and key/value pairs in
39     @type node: L{node.Node}
40     @ivar node: this node
41     @type table: L{ktable.KTable}
42     @ivar table: the routing table
43     @type token_secrets: C{list} of C{string}
44     @ivar token_secrets: the current secrets to use to create tokens
45     @type stats: L{stats.StatsLogger}
46     @ivar stats: the statistics gatherer
47     @type udp: L{krpc.hostbroker}
48     @ivar udp: the factory for the KRPC protocol
49     @type listenport: L{twisted.internet.interfaces.IListeningPort}
50     @ivar listenport: the UDP listening port
51     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
52     @ivar next_checkpoint: the delayed call for the next checkpoint
53     """
54     
55     _Node = KNodeBase
56     
57     def __init__(self, config, cache_dir='/tmp'):
58         """Initialize the Khashmir class and call the L{setup} method.
59         
60         @type config: C{dictionary}
61         @param config: the configuration parameters for the DHT
62         @type cache_dir: C{string}
63         @param cache_dir: the directory to store all files in
64             (optional, defaults to the /tmp directory)
65         """
66         self.config = None
67         self.setup(config, cache_dir)
68         
69     def setup(self, config, cache_dir):
70         """Setup all the Khashmir sub-modules.
71         
72         @type config: C{dictionary}
73         @param config: the configuration parameters for the DHT
74         @type cache_dir: C{string}
75         @param cache_dir: the directory to store all files in
76         """
77         self.config = config
78         self.port = config['PORT']
79         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
80         self.node = self._loadSelfNode('', self.port)
81         self.table = KTable(self.node, config)
82         self.token_secrets = [newID()]
83         self.stats = StatsLogger(self.table, self.store)
84         
85         # Start listening
86         self.udp = krpc.hostbroker(self, self.stats, config)
87         self.udp.protocol = krpc.KRPC
88         self.listenport = reactor.listenUDP(self.port, self.udp)
89         
90         # Load the routing table and begin checkpointing
91         self._loadRoutingTable()
92         self.refreshTable(force = True)
93         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
94
95     def Node(self, id, host = None, port = None):
96         """Create a new node.
97         
98         @see: L{node.Node.__init__}
99         """
100         n = self._Node(id, host, port)
101         n.table = self.table
102         n.conn = self.udp.connectionForAddr((n.host, n.port))
103         return n
104     
105     def __del__(self):
106         """Stop listening for packets."""
107         self.listenport.stopListening()
108         
109     def _loadSelfNode(self, host, port):
110         """Create this node, loading any previously saved one."""
111         id = self.store.getSelfNode()
112         if not id:
113             id = newID()
114         return self._Node(id, host, port)
115         
116     def checkpoint(self):
117         """Perform some periodic maintenance operations."""
118         # Create a new token secret
119         self.token_secrets.insert(0, newID())
120         if len(self.token_secrets) > 3:
121             self.token_secrets.pop()
122             
123         # Save some parameters for reloading
124         self.store.saveSelfNode(self.node.id)
125         self.store.dumpRoutingTable(self.table.buckets)
126         
127         # DHT maintenance
128         self.store.expireValues(self.config['KEY_EXPIRE'])
129         self.refreshTable()
130         
131         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
132                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
133                                                  self.checkpoint)
134         
135     def _loadRoutingTable(self):
136         """Load the previous routing table nodes from the database.
137         
138         It's usually a good idea to call refreshTable(force = True) after
139         loading the table.
140         """
141         nodes = self.store.getRoutingTable()
142         for rec in nodes:
143             n = self.Node(rec[0], rec[1], int(rec[2]))
144             self.table.insertNode(n, contacted = False)
145             
146     #{ Local interface
147     def addContact(self, host, port, callback=None, errback=None):
148         """Ping this node and add the contact info to the table on pong.
149         
150         @type host: C{string}
151         @param host: the IP address of the node to contact
152         @type port: C{int}
153         @param port:the port of the node to contact
154         @type callback: C{method}
155         @param callback: the method to call with the results, it must take 1
156             parameter, the contact info returned by the node
157             (optional, defaults to doing nothing with the results)
158         @type errback: C{method}
159         @param errback: the method to call if an error occurs
160             (optional, defaults to calling the callback with None)
161         """
162         n = self.Node(NULL_ID, host, port)
163         self.sendJoin(n, callback=callback, errback=errback)
164
165     def findNode(self, id, callback):
166         """Find the contact info for the K closest nodes in the global table.
167         
168         @type id: C{string}
169         @param id: the target ID to find the K closest nodes of
170         @type callback: C{method}
171         @param callback: the method to call with the results, it must take 1
172             parameter, the list of K closest nodes
173         """
174         # Start with our node
175         nodes = [copy(self.node)]
176
177         # Start the finding nodes action
178         state = FindNode(self, id, callback, self.config, self.stats)
179         reactor.callLater(0, state.goWithNodes, nodes)
180     
181     def insertNode(self, node, contacted = True):
182         """Try to insert a node in our local table, pinging oldest contact if necessary.
183         
184         If all you have is a host/port, then use L{addContact}, which calls this
185         method after receiving the PONG from the remote node. The reason for
186         the seperation is we can't insert a node into the table without its
187         node ID. That means of course the node passed into this method needs
188         to be a properly formed Node object with a valid ID.
189
190         @type node: L{node.Node}
191         @param node: the new node to try and insert
192         @type contacted: C{boolean}
193         @param contacted: whether the new node is known to be good, i.e.
194             responded to a request (optional, defaults to True)
195         """
196         old = self.table.insertNode(node, contacted=contacted)
197         if (old and old.id != self.node.id and
198             (datetime.now() - old.lastSeen) > 
199              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
200             
201             def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
202                 """The pinged node never responded, so replace it."""
203                 log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
204                 log.err(err)
205                 self.stats.completedAction('ping', start)
206                 self.table.replaceStaleNode(oldnode, newnode)
207             
208             def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
209                 """Got a pong from the old node, so update it."""
210                 self.stats.completedAction('ping', start)
211                 if dict['id'] == old.id:
212                     self.table.justSeenNode(old.id)
213             
214             # Bucket is full, check to see if old node is still available
215             self.stats.startedAction('ping')
216             df = old.ping(self.node.id)
217             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
218
219     def sendJoin(self, node, callback=None, errback=None):
220         """Join the DHT by pinging a bootstrap node.
221         
222         @type node: L{node.Node}
223         @param node: the node to send the join to
224         @type callback: C{method}
225         @param callback: the method to call with the results, it must take 1
226             parameter, the contact info returned by the node
227             (optional, defaults to doing nothing with the results)
228         @type errback: C{method}
229         @param errback: the method to call if an error occurs
230             (optional, defaults to calling the callback with None)
231         """
232
233         def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
234             """Node responded properly, callback with response."""
235             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
236             self.stats.completedAction('join', start)
237             self.insertNode(n)
238             if callback:
239                 callback((dict['ip_addr'], dict['port']))
240
241         def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
242             """Error occurred, fail node and errback or callback with error."""
243             log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
244             log.err(err)
245             self.stats.completedAction('join', start)
246             self.table.nodeFailed(node)
247             if errback:
248                 errback()
249             elif callback:
250                 callback(None)
251         
252         self.stats.startedAction('join')
253         df = node.join(self.node.id)
254         df.addCallbacks(_pongHandler, _defaultPong)
255
256     def findCloseNodes(self, callback=lambda a: None):
257         """Perform a findNode on the ID one away from our own.
258
259         This will allow us to populate our table with nodes on our network
260         closest to our own. This is called as soon as we start up with an
261         empty table.
262
263         @type callback: C{method}
264         @param callback: the method to call with the results, it must take 1
265             parameter, the list of K closest nodes
266             (optional, defaults to doing nothing with the results)
267         """
268         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
269         self.findNode(id, callback)
270
271     def refreshTable(self, force = False):
272         """Check all the buckets for those that need refreshing.
273         
274         @param force: refresh all buckets regardless of last bucket access time
275             (optional, defaults to False)
276         """
277         def callback(nodes):
278             pass
279     
280         for bucket in self.table.buckets:
281             if force or (datetime.now() - bucket.lastAccessed > 
282                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
283                 # Choose a random ID in the bucket and try and find it
284                 id = newIDInRange(bucket.min, bucket.max)
285                 self.findNode(id, callback)
286
287     def shutdown(self):
288         """Closes the port and cancels pending later calls."""
289         self.listenport.stopListening()
290         try:
291             self.next_checkpoint.cancel()
292         except:
293             pass
294         self.store.close()
295     
296     def getStats(self):
297         """Gather the statistics for the DHT."""
298         return self.stats.formatHTML()
299
300     #{ Remote interface
301     def krpc_ping(self, id, _krpc_sender = None):
302         """Pong with our ID.
303         
304         @type id: C{string}
305         @param id: the node ID of the sender node
306         @type _krpc_sender: (C{string}, C{int})
307         @param _krpc_sender: the sender node's IP address and port
308         """
309         if _krpc_sender is not None:
310             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
311             self.insertNode(n, contacted = False)
312
313         return {"id" : self.node.id}
314         
315     def krpc_join(self, id, _krpc_sender = None):
316         """Add the node by responding with its address and port.
317         
318         @type id: C{string}
319         @param id: the node ID of the sender node
320         @type _krpc_sender: (C{string}, C{int})
321         @param _krpc_sender: the sender node's IP address and port
322         """
323         if _krpc_sender is not None:
324             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
325             self.insertNode(n, contacted = False)
326         else:
327             _krpc_sender = ('127.0.0.1', self.port)
328
329         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
330         
331     def krpc_find_node(self, id, target, _krpc_sender = None):
332         """Find the K closest nodes to the target in the local routing table.
333         
334         @type target: C{string}
335         @param target: the target ID to find nodes for
336         @type id: C{string}
337         @param id: the node ID of the sender node
338         @type _krpc_sender: (C{string}, C{int})
339         @param _krpc_sender: the sender node's IP address and port
340         """
341         if _krpc_sender is not None:
342             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
343             self.insertNode(n, contacted = False)
344         else:
345             _krpc_sender = ('127.0.0.1', self.port)
346
347         nodes = self.table.findNodes(target)
348         nodes = map(lambda node: node.contactInfo(), nodes)
349         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
350         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
351
352
353 class KhashmirRead(KhashmirBase):
354     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
355
356     _Node = KNodeRead
357
358     #{ Local interface
359     def findValue(self, key, callback):
360         """Get the nodes that have values for the key from the global table.
361         
362         @type key: C{string}
363         @param key: the target key to find the values for
364         @type callback: C{method}
365         @param callback: the method to call with the results, it must take 1
366             parameter, the list of nodes with values
367         """
368         # Start with ourself
369         nodes = [copy(self.node)]
370         
371         # Search for others starting with the locally found ones
372         state = FindValue(self, key, callback, self.config, self.stats)
373         reactor.callLater(0, state.goWithNodes, nodes)
374
375     def valueForKey(self, key, callback, searchlocal = True):
376         """Get the values found for key in global table.
377         
378         Callback will be called with a list of values for each peer that
379         returns unique values. The final callback will be an empty list.
380
381         @type key: C{string}
382         @param key: the target key to get the values for
383         @type callback: C{method}
384         @param callback: the method to call with the results, it must take 2
385             parameters: the key, and the values found
386         @type searchlocal: C{boolean}
387         @param searchlocal: whether to also look for any local values
388         """
389
390         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
391             """Use the found nodes to send requests for values to."""
392             # Get any local values
393             if searchlocal:
394                 l = self.store.retrieveValues(key)
395                 if len(l) > 0:
396                     node = copy(self.node)
397                     node.updateNumValues(len(l))
398                     nodes = nodes + [node]
399
400             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
401             reactor.callLater(0, state.goWithNodes, nodes)
402             
403         # First lookup nodes that have values for the key
404         self.findValue(key, _getValueForKey)
405
406     #{ Remote interface
407     def krpc_find_value(self, id, key, _krpc_sender = None):
408         """Find the number of values stored locally for the key, and the K closest nodes.
409         
410         @type key: C{string}
411         @param key: the target key to find the values and nodes for
412         @type id: C{string}
413         @param id: the node ID of the sender node
414         @type _krpc_sender: (C{string}, C{int})
415         @param _krpc_sender: the sender node's IP address and port
416         """
417         if _krpc_sender is not None:
418             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
419             self.insertNode(n, contacted = False)
420     
421         nodes = self.table.findNodes(key)
422         nodes = map(lambda node: node.contactInfo(), nodes)
423         num_values = self.store.countValues(key)
424         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
425
426     def krpc_get_value(self, id, key, num, _krpc_sender = None):
427         """Retrieve the values stored locally for the key.
428         
429         @type key: C{string}
430         @param key: the target key to retrieve the values for
431         @type num: C{int}
432         @param num: the maximum number of values to retrieve, or 0 to
433             retrieve all of them
434         @type id: C{string}
435         @param id: the node ID of the sender node
436         @type _krpc_sender: (C{string}, C{int})
437         @param _krpc_sender: the sender node's IP address and port
438         """
439         if _krpc_sender is not None:
440             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
441             self.insertNode(n, contacted = False)
442     
443         l = self.store.retrieveValues(key)
444         if num == 0 or num >= len(l):
445             return {'values' : l, "id": self.node.id}
446         else:
447             shuffle(l)
448             return {'values' : l[:num], "id": self.node.id}
449
450
451 class KhashmirWrite(KhashmirRead):
452     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
453
454     _Node = KNodeWrite
455
456     #{ Local interface
457     def storeValueForKey(self, key, value, callback=None):
458         """Stores the value for the key in the global table.
459         
460         No status in this implementation, peers respond but don't indicate
461         status of storing values.
462
463         @type key: C{string}
464         @param key: the target key to store the value for
465         @type value: C{string}
466         @param value: the value to store with the key
467         @type callback: C{method}
468         @param callback: the method to call with the results, it must take 3
469             parameters: the key, the value stored, and the result of the store
470             (optional, defaults to doing nothing with the results)
471         """
472         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
473             """Use the returned K closest nodes to store the key at."""
474             if not response:
475                 def _storedValueHandler(key, value, sender):
476                     """Default callback that does nothing."""
477                     pass
478                 response = _storedValueHandler
479             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
480             reactor.callLater(0, action.goWithNodes, nodes)
481             
482         # First find the K closest nodes to operate on.
483         self.findNode(key, _storeValueForKey)
484                     
485     #{ Remote interface
486     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
487         """Store the value locally with the key.
488         
489         @type key: C{string}
490         @param key: the target key to store the value for
491         @type value: C{string}
492         @param value: the value to store with the key
493         @param token: the token to confirm that this peer contacted us previously
494         @type id: C{string}
495         @param id: the node ID of the sender node
496         @type _krpc_sender: (C{string}, C{int})
497         @param _krpc_sender: the sender node's IP address and port
498         """
499         if _krpc_sender is not None:
500             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
501             self.insertNode(n, contacted = False)
502         else:
503             _krpc_sender = ('127.0.0.1', self.port)
504
505         for secret in self.token_secrets:
506             this_token = sha(secret + _krpc_sender[0]).digest()
507             if token == this_token:
508                 self.store.storeValue(key, value)
509                 return {"id" : self.node.id}
510         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
511
512
513 class Khashmir(KhashmirWrite):
514     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
515     _Node = KNodeWrite
516
517
518 class SimpleTests(unittest.TestCase):
519     
520     timeout = 10
521     DHT_DEFAULTS = {'PORT': 9977,
522                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
523                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
524                     'MAX_FAILURES': 3,
525                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
526                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
527                     'KEY_EXPIRE': 3600, 'SPEW': False, }
528
529     def setUp(self):
530         d = self.DHT_DEFAULTS.copy()
531         d['PORT'] = 4044
532         self.a = Khashmir(d)
533         d = self.DHT_DEFAULTS.copy()
534         d['PORT'] = 4045
535         self.b = Khashmir(d)
536         
537     def tearDown(self):
538         self.a.shutdown()
539         self.b.shutdown()
540         os.unlink(self.a.store.db)
541         os.unlink(self.b.store.db)
542
543     def testAddContact(self):
544         self.failUnlessEqual(len(self.a.table.buckets), 1)
545         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
546
547         self.failUnlessEqual(len(self.b.table.buckets), 1)
548         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
549
550         self.a.addContact('127.0.0.1', 4045)
551         reactor.iterate()
552         reactor.iterate()
553         reactor.iterate()
554         reactor.iterate()
555
556         self.failUnlessEqual(len(self.a.table.buckets), 1)
557         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
558         self.failUnlessEqual(len(self.b.table.buckets), 1)
559         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
560
561     def testStoreRetrieve(self):
562         self.a.addContact('127.0.0.1', 4045)
563         reactor.iterate()
564         reactor.iterate()
565         reactor.iterate()
566         reactor.iterate()
567         self.got = 0
568         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
569         reactor.iterate()
570         reactor.iterate()
571         reactor.iterate()
572         reactor.iterate()
573         reactor.iterate()
574         reactor.iterate()
575         self.a.valueForKey(sha('foo').digest(), self._cb)
576         reactor.iterate()
577         reactor.iterate()
578         reactor.iterate()
579         reactor.iterate()
580         reactor.iterate()
581         reactor.iterate()
582         reactor.iterate()
583
584     def _cb(self, key, val):
585         if not val:
586             self.failUnlessEqual(self.got, 1)
587         elif 'foobar' in val:
588             self.got = 1
589
590
591 class MultiTest(unittest.TestCase):
592     
593     timeout = 30
594     num = 20
595     DHT_DEFAULTS = {'PORT': 9977,
596                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
597                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
598                     'MAX_FAILURES': 3,
599                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
600                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
601                     'KEY_EXPIRE': 3600, 'SPEW': False, }
602
603     def _done(self, val):
604         self.done = 1
605         
606     def setUp(self):
607         self.l = []
608         self.startport = 4088
609         for i in range(self.num):
610             d = self.DHT_DEFAULTS.copy()
611             d['PORT'] = self.startport + i
612             self.l.append(Khashmir(d))
613         reactor.iterate()
614         reactor.iterate()
615         
616         for i in self.l:
617             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
618             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
619             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
620             reactor.iterate()
621             reactor.iterate()
622             reactor.iterate() 
623             
624         for i in self.l:
625             self.done = 0
626             i.findCloseNodes(self._done)
627             while not self.done:
628                 reactor.iterate()
629         for i in self.l:
630             self.done = 0
631             i.findCloseNodes(self._done)
632             while not self.done:
633                 reactor.iterate()
634
635     def tearDown(self):
636         for i in self.l:
637             i.shutdown()
638             os.unlink(i.store.db)
639             
640         reactor.iterate()
641         
642     def testStoreRetrieve(self):
643         for i in range(10):
644             K = newID()
645             V = newID()
646             
647             for a in range(3):
648                 self.done = 0
649                 def _scb(key, value, result):
650                     self.done = 1
651                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
652                 while not self.done:
653                     reactor.iterate()
654
655
656                 def _rcb(key, val):
657                     if not val:
658                         self.done = 1
659                         self.failUnlessEqual(self.got, 1)
660                     elif V in val:
661                         self.got = 1
662                 for x in range(3):
663                     self.got = 0
664                     self.done = 0
665                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
666                     while not self.done:
667                         reactor.iterate()