Clean up the copyrights mentioned in the code.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1
2 """The main Khashmir program."""
3
4 import warnings
5 warnings.simplefilter("ignore", DeprecationWarning)
6
7 from datetime import datetime, timedelta
8 from random import randrange, shuffle
9 from sha import sha
10 from copy import copy
11 import os
12
13 from twisted.internet.defer import Deferred
14 from twisted.internet import protocol, reactor
15 from twisted.python import log
16 from twisted.trial import unittest
17
18 from db import DB
19 from ktable import KTable
20 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
21 from khash import newID, newIDInRange
22 from actions import FindNode, FindValue, GetValue, StoreValue
23 from stats import StatsLogger
24 import krpc
25
26 class KhashmirBase(protocol.Factory):
27     """The base Khashmir class, with base functionality and find node, no key-value mappings.
28     
29     @type _Node: L{node.Node}
30     @ivar _Node: the knode implementation to use for this class of DHT
31     @type config: C{dictionary}
32     @ivar config: the configuration parameters for the DHT
33     @type port: C{int}
34     @ivar port: the port to listen on
35     @type store: L{db.DB}
36     @ivar store: the database to store nodes and key/value pairs in
37     @type node: L{node.Node}
38     @ivar node: this node
39     @type table: L{ktable.KTable}
40     @ivar table: the routing table
41     @type token_secrets: C{list} of C{string}
42     @ivar token_secrets: the current secrets to use to create tokens
43     @type stats: L{stats.StatsLogger}
44     @ivar stats: the statistics gatherer
45     @type udp: L{krpc.hostbroker}
46     @ivar udp: the factory for the KRPC protocol
47     @type listenport: L{twisted.internet.interfaces.IListeningPort}
48     @ivar listenport: the UDP listening port
49     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
50     @ivar next_checkpoint: the delayed call for the next checkpoint
51     """
52     
53     _Node = KNodeBase
54     
55     def __init__(self, config, cache_dir='/tmp'):
56         """Initialize the Khashmir class and call the L{setup} method.
57         
58         @type config: C{dictionary}
59         @param config: the configuration parameters for the DHT
60         @type cache_dir: C{string}
61         @param cache_dir: the directory to store all files in
62             (optional, defaults to the /tmp directory)
63         """
64         self.config = None
65         self.setup(config, cache_dir)
66         
67     def setup(self, config, cache_dir):
68         """Setup all the Khashmir sub-modules.
69         
70         @type config: C{dictionary}
71         @param config: the configuration parameters for the DHT
72         @type cache_dir: C{string}
73         @param cache_dir: the directory to store all files in
74         """
75         self.config = config
76         self.port = config['PORT']
77         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
78         self.node = self._loadSelfNode('', self.port)
79         self.table = KTable(self.node, config)
80         self.token_secrets = [newID()]
81         self.stats = StatsLogger(self.table, self.store)
82         
83         # Start listening
84         self.udp = krpc.hostbroker(self, self.stats, config)
85         self.udp.protocol = krpc.KRPC
86         self.listenport = reactor.listenUDP(self.port, self.udp)
87         
88         # Load the routing table and begin checkpointing
89         self._loadRoutingTable()
90         self.refreshTable(force = True)
91         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
92
93     def Node(self, id, host = None, port = None):
94         """Create a new node.
95         
96         @see: L{node.Node.__init__}
97         """
98         n = self._Node(id, host, port)
99         n.table = self.table
100         n.conn = self.udp.connectionForAddr((n.host, n.port))
101         return n
102     
103     def __del__(self):
104         """Stop listening for packets."""
105         self.listenport.stopListening()
106         
107     def _loadSelfNode(self, host, port):
108         """Create this node, loading any previously saved one."""
109         id = self.store.getSelfNode()
110         if not id:
111             id = newID()
112         return self._Node(id, host, port)
113         
114     def checkpoint(self):
115         """Perform some periodic maintenance operations."""
116         # Create a new token secret
117         self.token_secrets.insert(0, newID())
118         if len(self.token_secrets) > 3:
119             self.token_secrets.pop()
120             
121         # Save some parameters for reloading
122         self.store.saveSelfNode(self.node.id)
123         self.store.dumpRoutingTable(self.table.buckets)
124         
125         # DHT maintenance
126         self.store.expireValues(self.config['KEY_EXPIRE'])
127         self.refreshTable()
128         
129         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
130                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
131                                                  self.checkpoint)
132         
133     def _loadRoutingTable(self):
134         """Load the previous routing table nodes from the database.
135         
136         It's usually a good idea to call refreshTable(force = True) after
137         loading the table.
138         """
139         nodes = self.store.getRoutingTable()
140         for rec in nodes:
141             n = self.Node(rec[0], rec[1], int(rec[2]))
142             self.table.insertNode(n, contacted = False)
143             
144     #{ Local interface
145     def addContact(self, host, port, callback=None, errback=None):
146         """Ping this node and add the contact info to the table on pong.
147         
148         @type host: C{string}
149         @param host: the IP address of the node to contact
150         @type port: C{int}
151         @param port:the port of the node to contact
152         @type callback: C{method}
153         @param callback: the method to call with the results, it must take 1
154             parameter, the contact info returned by the node
155             (optional, defaults to doing nothing with the results)
156         @type errback: C{method}
157         @param errback: the method to call if an error occurs
158             (optional, defaults to calling the callback with None)
159         """
160         n = self.Node(NULL_ID, host, port)
161         self.sendJoin(n, callback=callback, errback=errback)
162
163     def findNode(self, id, callback):
164         """Find the contact info for the K closest nodes in the global table.
165         
166         @type id: C{string}
167         @param id: the target ID to find the K closest nodes of
168         @type callback: C{method}
169         @param callback: the method to call with the results, it must take 1
170             parameter, the list of K closest nodes
171         """
172         # Start with our node
173         nodes = [copy(self.node)]
174
175         # Start the finding nodes action
176         state = FindNode(self, id, callback, self.config, self.stats)
177         reactor.callLater(0, state.goWithNodes, nodes)
178     
179     def insertNode(self, node, contacted = True):
180         """Try to insert a node in our local table, pinging oldest contact if necessary.
181         
182         If all you have is a host/port, then use L{addContact}, which calls this
183         method after receiving the PONG from the remote node. The reason for
184         the seperation is we can't insert a node into the table without its
185         node ID. That means of course the node passed into this method needs
186         to be a properly formed Node object with a valid ID.
187
188         @type node: L{node.Node}
189         @param node: the new node to try and insert
190         @type contacted: C{boolean}
191         @param contacted: whether the new node is known to be good, i.e.
192             responded to a request (optional, defaults to True)
193         """
194         old = self.table.insertNode(node, contacted=contacted)
195         if (old and old.id != self.node.id and
196             (datetime.now() - old.lastSeen) > 
197              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
198             
199             def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()):
200                 """The pinged node never responded, so replace it."""
201                 log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage()))
202                 self.stats.completedAction('ping', start)
203                 self.table.replaceStaleNode(oldnode, newnode)
204             
205             def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()):
206                 """Got a pong from the old node, so update it."""
207                 self.stats.completedAction('ping', start)
208                 if dict['id'] == old.id:
209                     self.table.justSeenNode(old.id)
210             
211             # Bucket is full, check to see if old node is still available
212             self.stats.startedAction('ping')
213             df = old.ping(self.node.id)
214             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
215
216     def sendJoin(self, node, callback=None, errback=None):
217         """Join the DHT by pinging a bootstrap node.
218         
219         @type node: L{node.Node}
220         @param node: the node to send the join to
221         @type callback: C{method}
222         @param callback: the method to call with the results, it must take 1
223             parameter, the contact info returned by the node
224             (optional, defaults to doing nothing with the results)
225         @type errback: C{method}
226         @param errback: the method to call if an error occurs
227             (optional, defaults to calling the callback with None)
228         """
229
230         def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()):
231             """Node responded properly, callback with response."""
232             n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
233             self.stats.completedAction('join', start)
234             reactor.callLater(0, self.insertNode, n)
235             if callback:
236                 callback((dict['ip_addr'], dict['port']))
237
238         def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()):
239             """Error occurred, fail node and errback or callback with error."""
240             log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
241             self.stats.completedAction('join', start)
242             self.table.nodeFailed(node)
243             if errback:
244                 errback()
245             elif callback:
246                 callback(None)
247         
248         self.stats.startedAction('join')
249         df = node.join(self.node.id)
250         df.addCallbacks(_pongHandler, _defaultPong)
251
252     def findCloseNodes(self, callback=lambda a: None):
253         """Perform a findNode on the ID one away from our own.
254
255         This will allow us to populate our table with nodes on our network
256         closest to our own. This is called as soon as we start up with an
257         empty table.
258
259         @type callback: C{method}
260         @param callback: the method to call with the results, it must take 1
261             parameter, the list of K closest nodes
262             (optional, defaults to doing nothing with the results)
263         """
264         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
265         self.findNode(id, callback)
266
267     def refreshTable(self, force = False):
268         """Check all the buckets for those that need refreshing.
269         
270         @param force: refresh all buckets regardless of last bucket access time
271             (optional, defaults to False)
272         """
273         def callback(nodes):
274             pass
275     
276         for bucket in self.table.buckets:
277             if force or (datetime.now() - bucket.lastAccessed > 
278                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
279                 # Choose a random ID in the bucket and try and find it
280                 id = newIDInRange(bucket.min, bucket.max)
281                 self.findNode(id, callback)
282
283     def shutdown(self):
284         """Closes the port and cancels pending later calls."""
285         self.listenport.stopListening()
286         try:
287             self.next_checkpoint.cancel()
288         except:
289             pass
290         self.store.close()
291     
292     def getStats(self):
293         """Gather the statistics for the DHT."""
294         return self.stats.formatHTML()
295
296     #{ Remote interface
297     def krpc_ping(self, id, _krpc_sender = None):
298         """Pong with our ID.
299         
300         @type id: C{string}
301         @param id: the node ID of the sender node
302         @type _krpc_sender: (C{string}, C{int})
303         @param _krpc_sender: the sender node's IP address and port
304         """
305         if _krpc_sender is not None:
306             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
307             reactor.callLater(0, self.insertNode, n, False)
308
309         return {"id" : self.node.id}
310         
311     def krpc_join(self, id, _krpc_sender = None):
312         """Add the node by responding with its address and port.
313         
314         @type id: C{string}
315         @param id: the node ID of the sender node
316         @type _krpc_sender: (C{string}, C{int})
317         @param _krpc_sender: the sender node's IP address and port
318         """
319         if _krpc_sender is not None:
320             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
321             reactor.callLater(0, self.insertNode, n, False)
322         else:
323             _krpc_sender = ('127.0.0.1', self.port)
324
325         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
326         
327     def krpc_find_node(self, id, target, _krpc_sender = None):
328         """Find the K closest nodes to the target in the local routing table.
329         
330         @type target: C{string}
331         @param target: the target ID to find nodes for
332         @type id: C{string}
333         @param id: the node ID of the sender node
334         @type _krpc_sender: (C{string}, C{int})
335         @param _krpc_sender: the sender node's IP address and port
336         """
337         if _krpc_sender is not None:
338             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
339             reactor.callLater(0, self.insertNode, n, False)
340         else:
341             _krpc_sender = ('127.0.0.1', self.port)
342
343         nodes = self.table.findNodes(target)
344         nodes = map(lambda node: node.contactInfo(), nodes)
345         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
346         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
347
348
349 class KhashmirRead(KhashmirBase):
350     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
351
352     _Node = KNodeRead
353
354     #{ Local interface
355     def findValue(self, key, callback):
356         """Get the nodes that have values for the key from the global table.
357         
358         @type key: C{string}
359         @param key: the target key to find the values for
360         @type callback: C{method}
361         @param callback: the method to call with the results, it must take 1
362             parameter, the list of nodes with values
363         """
364         # Start with ourself
365         nodes = [copy(self.node)]
366         
367         # Search for others starting with the locally found ones
368         state = FindValue(self, key, callback, self.config, self.stats)
369         reactor.callLater(0, state.goWithNodes, nodes)
370
371     def valueForKey(self, key, callback, searchlocal = True):
372         """Get the values found for key in global table.
373         
374         Callback will be called with a list of values for each peer that
375         returns unique values. The final callback will be an empty list.
376
377         @type key: C{string}
378         @param key: the target key to get the values for
379         @type callback: C{method}
380         @param callback: the method to call with the results, it must take 2
381             parameters: the key, and the values found
382         @type searchlocal: C{boolean}
383         @param searchlocal: whether to also look for any local values
384         """
385
386         def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
387             """Use the found nodes to send requests for values to."""
388             # Get any local values
389             if searchlocal:
390                 l = self.store.retrieveValues(key)
391                 if len(l) > 0:
392                     node = copy(self.node)
393                     node.updateNumValues(len(l))
394                     nodes = nodes + [node]
395
396             state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
397             reactor.callLater(0, state.goWithNodes, nodes)
398             
399         # First lookup nodes that have values for the key
400         self.findValue(key, _getValueForKey)
401
402     #{ Remote interface
403     def krpc_find_value(self, id, key, _krpc_sender = None):
404         """Find the number of values stored locally for the key, and the K closest nodes.
405         
406         @type key: C{string}
407         @param key: the target key to find the values and nodes for
408         @type id: C{string}
409         @param id: the node ID of the sender node
410         @type _krpc_sender: (C{string}, C{int})
411         @param _krpc_sender: the sender node's IP address and port
412         """
413         if _krpc_sender is not None:
414             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
415             reactor.callLater(0, self.insertNode, n, False)
416     
417         nodes = self.table.findNodes(key)
418         nodes = map(lambda node: node.contactInfo(), nodes)
419         num_values = self.store.countValues(key)
420         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
421
422     def krpc_get_value(self, id, key, num, _krpc_sender = None):
423         """Retrieve the values stored locally for the key.
424         
425         @type key: C{string}
426         @param key: the target key to retrieve the values for
427         @type num: C{int}
428         @param num: the maximum number of values to retrieve, or 0 to
429             retrieve all of them
430         @type id: C{string}
431         @param id: the node ID of the sender node
432         @type _krpc_sender: (C{string}, C{int})
433         @param _krpc_sender: the sender node's IP address and port
434         """
435         if _krpc_sender is not None:
436             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
437             reactor.callLater(0, self.insertNode, n, False)
438     
439         l = self.store.retrieveValues(key)
440         if num == 0 or num >= len(l):
441             return {'values' : l, "id": self.node.id}
442         else:
443             shuffle(l)
444             return {'values' : l[:num], "id": self.node.id}
445
446
447 class KhashmirWrite(KhashmirRead):
448     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
449
450     _Node = KNodeWrite
451
452     #{ Local interface
453     def storeValueForKey(self, key, value, callback=None):
454         """Stores the value for the key in the global table.
455         
456         No status in this implementation, peers respond but don't indicate
457         status of storing values.
458
459         @type key: C{string}
460         @param key: the target key to store the value for
461         @type value: C{string}
462         @param value: the value to store with the key
463         @type callback: C{method}
464         @param callback: the method to call with the results, it must take 3
465             parameters: the key, the value stored, and the result of the store
466             (optional, defaults to doing nothing with the results)
467         """
468         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
469             """Use the returned K closest nodes to store the key at."""
470             if not response:
471                 def _storedValueHandler(key, value, sender):
472                     """Default callback that does nothing."""
473                     pass
474                 response = _storedValueHandler
475             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
476             reactor.callLater(0, action.goWithNodes, nodes)
477             
478         # First find the K closest nodes to operate on.
479         self.findNode(key, _storeValueForKey)
480                     
481     #{ Remote interface
482     def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
483         """Store the value locally with the key.
484         
485         @type key: C{string}
486         @param key: the target key to store the value for
487         @type value: C{string}
488         @param value: the value to store with the key
489         @param token: the token to confirm that this peer contacted us previously
490         @type id: C{string}
491         @param id: the node ID of the sender node
492         @type _krpc_sender: (C{string}, C{int})
493         @param _krpc_sender: the sender node's IP address and port
494         """
495         if _krpc_sender is not None:
496             n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
497             reactor.callLater(0, self.insertNode, n, False)
498         else:
499             _krpc_sender = ('127.0.0.1', self.port)
500
501         for secret in self.token_secrets:
502             this_token = sha(secret + _krpc_sender[0]).digest()
503             if token == this_token:
504                 self.store.storeValue(key, value)
505                 return {"id" : self.node.id}
506         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
507
508
509 class Khashmir(KhashmirWrite):
510     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
511     _Node = KNodeWrite
512
513
514 class SimpleTests(unittest.TestCase):
515     
516     timeout = 10
517     DHT_DEFAULTS = {'PORT': 9977,
518                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
519                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
520                     'MAX_FAILURES': 3,
521                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
522                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
523                     'KEY_EXPIRE': 3600, 'SPEW': False, }
524
525     def setUp(self):
526         d = self.DHT_DEFAULTS.copy()
527         d['PORT'] = 4044
528         self.a = Khashmir(d)
529         d = self.DHT_DEFAULTS.copy()
530         d['PORT'] = 4045
531         self.b = Khashmir(d)
532         
533     def tearDown(self):
534         self.a.shutdown()
535         self.b.shutdown()
536         os.unlink(self.a.store.db)
537         os.unlink(self.b.store.db)
538
539     def testAddContact(self):
540         self.failUnlessEqual(len(self.a.table.buckets), 1)
541         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
542
543         self.failUnlessEqual(len(self.b.table.buckets), 1)
544         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
545
546         self.a.addContact('127.0.0.1', 4045)
547         reactor.iterate()
548         reactor.iterate()
549         reactor.iterate()
550         reactor.iterate()
551
552         self.failUnlessEqual(len(self.a.table.buckets), 1)
553         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
554         self.failUnlessEqual(len(self.b.table.buckets), 1)
555         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
556
557     def testStoreRetrieve(self):
558         self.a.addContact('127.0.0.1', 4045)
559         reactor.iterate()
560         reactor.iterate()
561         reactor.iterate()
562         reactor.iterate()
563         self.got = 0
564         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
565         reactor.iterate()
566         reactor.iterate()
567         reactor.iterate()
568         reactor.iterate()
569         reactor.iterate()
570         reactor.iterate()
571         self.a.valueForKey(sha('foo').digest(), self._cb)
572         reactor.iterate()
573         reactor.iterate()
574         reactor.iterate()
575         reactor.iterate()
576         reactor.iterate()
577         reactor.iterate()
578         reactor.iterate()
579
580     def _cb(self, key, val):
581         if not val:
582             self.failUnlessEqual(self.got, 1)
583         elif 'foobar' in val:
584             self.got = 1
585
586
587 class MultiTest(unittest.TestCase):
588     
589     timeout = 30
590     num = 20
591     DHT_DEFAULTS = {'PORT': 9977,
592                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
593                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
594                     'MAX_FAILURES': 3,
595                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
596                     'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
597                     'KEY_EXPIRE': 3600, 'SPEW': False, }
598
599     def _done(self, val):
600         self.done = 1
601         
602     def setUp(self):
603         self.l = []
604         self.startport = 4088
605         for i in range(self.num):
606             d = self.DHT_DEFAULTS.copy()
607             d['PORT'] = self.startport + i
608             self.l.append(Khashmir(d))
609         reactor.iterate()
610         reactor.iterate()
611         
612         for i in self.l:
613             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
614             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
615             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
616             reactor.iterate()
617             reactor.iterate()
618             reactor.iterate() 
619             
620         for i in self.l:
621             self.done = 0
622             i.findCloseNodes(self._done)
623             while not self.done:
624                 reactor.iterate()
625         for i in self.l:
626             self.done = 0
627             i.findCloseNodes(self._done)
628             while not self.done:
629                 reactor.iterate()
630
631     def tearDown(self):
632         for i in self.l:
633             i.shutdown()
634             os.unlink(i.store.db)
635             
636         reactor.iterate()
637         
638     def testStoreRetrieve(self):
639         for i in range(10):
640             K = newID()
641             V = newID()
642             
643             for a in range(3):
644                 self.done = 0
645                 def _scb(key, value, result):
646                     self.done = 1
647                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
648                 while not self.done:
649                     reactor.iterate()
650
651
652                 def _rcb(key, val):
653                     if not val:
654                         self.done = 1
655                         self.failUnlessEqual(self.got, 1)
656                     elif V in val:
657                         self.got = 1
658                 for x in range(3):
659                     self.got = 0
660                     self.done = 0
661                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
662                     while not self.done:
663                         reactor.iterate()