9bd31402afb2f95c2f68d2e3d69788a3260575ab
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 import os
13
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.trial import unittest
17
18 from db import DB
19 from ktable import KTable
20 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
21 from khash import newID, newIDInRange
22 from actions import FindNode, FindValue, GetValue, StoreValue
23 from stats import StatsLogger
24 import krpc
25
26 class KhashmirBase(protocol.Factory):
27     """The base Khashmir class, with base functionality and find node, no key-value mappings.
28     
29     @type _Node: L{node.Node}
30     @ivar _Node: the knode implementation to use for this class of DHT
31     @type config: C{dictionary}
32     @ivar config: the configuration parameters for the DHT
33     @type port: C{int}
34     @ivar port: the port to listen on
35     @type store: L{db.DB}
36     @ivar store: the database to store nodes and key/value pairs in
37     @type node: L{node.Node}
38     @ivar node: this node
39     @type table: L{ktable.KTable}
40     @ivar table: the routing table
41     @type token_secrets: C{list} of C{string}
42     @ivar token_secrets: the current secrets to use to create tokens
43     @type udp: L{krpc.hostbroker}
44     @ivar udp: the factory for the KRPC protocol
45     @type listenport: L{twisted.internet.interfaces.IListeningPort}
46     @ivar listenport: the UDP listening port
47     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
48     @ivar next_checkpoint: the delayed call for the next checkpoint
49     """
50     
51     _Node = KNodeBase
52     
53     def __init__(self, config, cache_dir='/tmp'):
54         """Initialize the Khashmir class and call the L{setup} method.
55         
56         @type config: C{dictionary}
57         @param config: the configuration parameters for the DHT
58         @type cache_dir: C{string}
59         @param cache_dir: the directory to store all files in
60             (optional, defaults to the /tmp directory)
61         """
62         self.config = None
63         self.setup(config, cache_dir)
64         
65     def setup(self, config, cache_dir):
66         """Setup all the Khashmir sub-modules.
67         
68         @type config: C{dictionary}
69         @param config: the configuration parameters for the DHT
70         @type cache_dir: C{string}
71         @param cache_dir: the directory to store all files in
72         """
73         self.config = config
74         self.port = config['PORT']
75         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
76         self.node = self._loadSelfNode('', self.port)
77         self.table = KTable(self.node, config)
78         self.token_secrets = [newID()]
79         self.stats = StatsLogger(self.table, self.store, self.config)
80         
81         # Start listening
82         self.udp = krpc.hostbroker(self, self.stats, config)
83         self.udp.protocol = krpc.KRPC
84         self.listenport = reactor.listenUDP(self.port, self.udp)
85         
86         # Load the routing table and begin checkpointing
87         self._loadRoutingTable()
88         self.refreshTable(force = True)
89         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
90
91     def Node(self, id, host = None, port = None):
92         """Create a new node.
93         
94         @see: L{node.Node.__init__}
95         """
96         n = self._Node(id, host, port)
97         n.table = self.table
98         n.conn = self.udp.connectionForAddr((n.host, n.port))
99         return n
100     
101     def __del__(self):
102         """Stop listening for packets."""
103         self.listenport.stopListening()
104         
105     def _loadSelfNode(self, host, port):
106         """Create this node, loading any previously saved one."""
107         id = self.store.getSelfNode()
108         if not id:
109             id = newID()
110         return self._Node(id, host, port)
111         
112     def checkpoint(self):
113         """Perform some periodic maintenance operations."""
114         # Create a new token secret
115         self.token_secrets.insert(0, newID())
116         if len(self.token_secrets) > 3:
117             self.token_secrets.pop()
118             
119         # Save some parameters for reloading
120         self.store.saveSelfNode(self.node.id)
121         self.store.dumpRoutingTable(self.table.buckets)
122         
123         # DHT maintenance
124         self.store.expireValues(self.config['KEY_EXPIRE'])
125         self.refreshTable()
126         
127         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
128                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
129                                                  self.checkpoint)
130         
131     def _loadRoutingTable(self):
132         """Load the previous routing table nodes from the database.
133         
134         It's usually a good idea to call refreshTable(force = True) after
135         loading the table.
136         """
137         nodes = self.store.getRoutingTable()
138         for rec in nodes:
139             n = self.Node(rec[0], rec[1], int(rec[2]))
140             self.table.insertNode(n, contacted = False)
141             
142     #{ Local interface
143     def addContact(self, host, port, callback=None, errback=None):
144         """Ping this node and add the contact info to the table on pong.
145         
146         @type host: C{string}
147         @param host: the IP address of the node to contact
148         @type port: C{int}
149         @param port:the port of the node to contact
150         @type callback: C{method}
151         @param callback: the method to call with the results, it must take 1
152             parameter, the contact info returned by the node
153             (optional, defaults to doing nothing with the results)
154         @type errback: C{method}
155         @param errback: the method to call if an error occurs
156             (optional, defaults to calling the callback with None)
157         """
158         n = self.Node(NULL_ID, host, port)
159         self.sendJoin(n, callback=callback, errback=errback)
160
161     def findNode(self, id, callback, errback=None):
162         """Find the contact info for the K closest nodes in the global table.
163         
164         @type id: C{string}
165         @param id: the target ID to find the K closest nodes of
166         @type callback: C{method}
167         @param callback: the method to call with the results, it must take 1
168             parameter, the list of K closest nodes
169         @type errback: C{method}
170         @param errback: the method to call if an error occurs
171             (optional, defaults to doing nothing when an error occurs)
172         """
173         # Get K nodes out of local table/cache
174         nodes = self.table.findNodes(id)
175         d = Deferred()
176         if errback:
177             d.addCallbacks(callback, errback)
178         else:
179             d.addCallback(callback)
180
181         # If the target ID was found
182         if len(nodes) == 1 and nodes[0].id == id:
183             d.callback(nodes)
184         else:
185             # Start the finding nodes action
186             state = FindNode(self, id, d.callback, self.config)
187             reactor.callLater(0, state.goWithNodes, nodes)
188     
189     def insertNode(self, node, contacted = True):
190         """Try to insert a node in our local table, pinging oldest contact if necessary.
191         
192         If all you have is a host/port, then use L{addContact}, which calls this
193         method after receiving the PONG from the remote node. The reason for
194         the seperation is we can't insert a node into the table without its
195         node ID. That means of course the node passed into this method needs
196         to be a properly formed Node object with a valid ID.
197
198         @type node: L{node.Node}
199         @param node: the new node to try and insert
200         @type contacted: C{boolean}
201         @param contacted: whether the new node is known to be good, i.e.
202             responded to a request (optional, defaults to True)
203         """
204         old = self.table.insertNode(node, contacted=contacted)
205         if (old and old.id != self.node.id and
206             (datetime.now() - old.lastSeen) > 
207              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
208             
209             def _staleNodeHandler(oldnode = old, newnode = node):
210                 """The pinged node never responded, so replace it."""
211                 self.table.replaceStaleNode(oldnode, newnode)
212             
213             def _notStaleNodeHandler(dict, old=old):
214                 """Got a pong from the old node, so update it."""
215                 dict = dict['rsp']
216                 if dict['id'] == old.id:
217                     self.table.justSeenNode(old.id)
218             
219             # Bucket is full, check to see if old node is still available
220             df = old.ping(self.node.id)
221             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
222
223     def sendJoin(self, node, callback=None, errback=None):
224         """Join the DHT by pinging a bootstrap node.
225         
226         @type node: L{node.Node}
227         @param node: the node to send the join to
228         @type callback: C{method}
229         @param callback: the method to call with the results, it must take 1
230             parameter, the contact info returned by the node
231             (optional, defaults to doing nothing with the results)
232         @type errback: C{method}
233         @param errback: the method to call if an error occurs
234             (optional, defaults to calling the callback with None)
235         """
236
237         def _pongHandler(dict, node=node, self=self, callback=callback):
238             """Node responded properly, callback with response."""
239             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
240             self.insertNode(n)
241             if callback:
242                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
243
244         def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
245             """Error occurred, fail node and errback or callback with error."""
246             table.nodeFailed(node)
247             if errback:
248                 errback()
249             elif callback:
250                 callback(None)
251         
252         df = node.join(self.node.id)
253         df.addCallbacks(_pongHandler, _defaultPong)
254
255     def findCloseNodes(self, callback=lambda a: None, errback = None):
256         """Perform a findNode on the ID one away from our own.
257
258         This will allow us to populate our table with nodes on our network
259         closest to our own. This is called as soon as we start up with an
260         empty table.
261
262         @type callback: C{method}
263         @param callback: the method to call with the results, it must take 1
264             parameter, the list of K closest nodes
265             (optional, defaults to doing nothing with the results)
266         @type errback: C{method}
267         @param errback: the method to call if an error occurs
268             (optional, defaults to doing nothing when an error occurs)
269         """
270         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
271         self.findNode(id, callback, errback)
272
273     def refreshTable(self, force = False):
274         """Check all the buckets for those that need refreshing.
275         
276         @param force: refresh all buckets regardless of last bucket access time
277             (optional, defaults to False)
278         """
279         def callback(nodes):
280             pass
281     
282         for bucket in self.table.buckets:
283             if force or (datetime.now() - bucket.lastAccessed > 
284                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
285                 # Choose a random ID in the bucket and try and find it
286                 id = newIDInRange(bucket.min, bucket.max)
287                 self.findNode(id, callback)
288
289     def shutdown(self):
290         """Closes the port and cancels pending later calls."""
291         self.listenport.stopListening()
292         try:
293             self.next_checkpoint.cancel()
294         except:
295             pass
296         self.store.close()
297
298     #{ Remote interface
299     def krpc_ping(self, id, _krpc_sender):
300         """Pong with our ID.
301         
302         @type id: C{string}
303         @param id: the node ID of the sender node
304         @type _krpc_sender: (C{string}, C{int})
305         @param _krpc_sender: the sender node's IP address and port
306         """
307         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
308         self.insertNode(n, contacted = False)
309
310         return {"id" : self.node.id}
311         
312     def krpc_join(self, id, _krpc_sender):
313         """Add the node by responding with its address and port.
314         
315         @type id: C{string}
316         @param id: the node ID of the sender node
317         @type _krpc_sender: (C{string}, C{int})
318         @param _krpc_sender: the sender node's IP address and port
319         """
320         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
321         self.insertNode(n, contacted = False)
322
323         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
324         
325     def krpc_find_node(self, target, id, _krpc_sender):
326         """Find the K closest nodes to the target in the local routing table.
327         
328         @type target: C{string}
329         @param target: the target ID to find nodes for
330         @type id: C{string}
331         @param id: the node ID of the sender node
332         @type _krpc_sender: (C{string}, C{int})
333         @param _krpc_sender: the sender node's IP address and port
334         """
335         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
336         self.insertNode(n, contacted = False)
337
338         nodes = self.table.findNodes(target)
339         nodes = map(lambda node: node.contactInfo(), nodes)
340         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
341         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
342
343
344 class KhashmirRead(KhashmirBase):
345     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
346
347     _Node = KNodeRead
348
349     #{ Local interface
350     def findValue(self, key, callback, errback=None):
351         """Get the nodes that have values for the key from the global table.
352         
353         @type key: C{string}
354         @param key: the target key to find the values for
355         @type callback: C{method}
356         @param callback: the method to call with the results, it must take 1
357             parameter, the list of nodes with values
358         @type errback: C{method}
359         @param errback: the method to call if an error occurs
360             (optional, defaults to doing nothing when an error occurs)
361         """
362         # Get K nodes out of local table/cache
363         nodes = self.table.findNodes(key)
364         d = Deferred()
365         if errback:
366             d.addCallbacks(callback, errback)
367         else:
368             d.addCallback(callback)
369
370         # Search for others starting with the locally found ones
371         state = FindValue(self, key, d.callback, self.config)
372         reactor.callLater(0, state.goWithNodes, nodes)
373
374     def valueForKey(self, key, callback, searchlocal = True):
375         """Get the values found for key in global table.
376         
377         Callback will be called with a list of values for each peer that
378         returns unique values. The final callback will be an empty list.
379
380         @type key: C{string}
381         @param key: the target key to get the values for
382         @type callback: C{method}
383         @param callback: the method to call with the results, it must take 2
384             parameters: the key, and the values found
385         @type searchlocal: C{boolean}
386         @param searchlocal: whether to also look for any local values
387         """
388         # Get any local values
389         if searchlocal:
390             l = self.store.retrieveValues(key)
391             if len(l) > 0:
392                 reactor.callLater(0, callback, key, l)
393         else:
394             l = []
395
396         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
397             """Use the found nodes to send requests for values to."""
398             state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
399             reactor.callLater(0, state.goWithNodes, nodes)
400             
401         # First lookup nodes that have values for the key
402         self.findValue(key, _getValueForKey)
403
404     #{ Remote interface
405     def krpc_find_value(self, key, id, _krpc_sender):
406         """Find the number of values stored locally for the key, and the K closest nodes.
407         
408         @type key: C{string}
409         @param key: the target key to find the values and nodes for
410         @type id: C{string}
411         @param id: the node ID of the sender node
412         @type _krpc_sender: (C{string}, C{int})
413         @param _krpc_sender: the sender node's IP address and port
414         """
415         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
416         self.insertNode(n, contacted = False)
417     
418         nodes = self.table.findNodes(key)
419         nodes = map(lambda node: node.contactInfo(), nodes)
420         num_values = self.store.countValues(key)
421         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
422
423     def krpc_get_value(self, key, num, id, _krpc_sender):
424         """Retrieve the values stored locally for the key.
425         
426         @type key: C{string}
427         @param key: the target key to retrieve the values for
428         @type num: C{int}
429         @param num: the maximum number of values to retrieve, or 0 to
430             retrieve all of them
431         @type id: C{string}
432         @param id: the node ID of the sender node
433         @type _krpc_sender: (C{string}, C{int})
434         @param _krpc_sender: the sender node's IP address and port
435         """
436         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
437         self.insertNode(n, contacted = False)
438     
439         l = self.store.retrieveValues(key)
440         if num == 0 or num >= len(l):
441             return {'values' : l, "id": self.node.id}
442         else:
443             shuffle(l)
444             return {'values' : l[:num], "id": self.node.id}
445
446
447 class KhashmirWrite(KhashmirRead):
448     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
449
450     _Node = KNodeWrite
451
452     #{ Local interface
453     def storeValueForKey(self, key, value, callback=None):
454         """Stores the value for the key in the global table.
455         
456         No status in this implementation, peers respond but don't indicate
457         status of storing values.
458
459         @type key: C{string}
460         @param key: the target key to store the value for
461         @type value: C{string}
462         @param value: the value to store with the key
463         @type callback: C{method}
464         @param callback: the method to call with the results, it must take 3
465             parameters: the key, the value stored, and the result of the store
466             (optional, defaults to doing nothing with the results)
467         """
468         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
469             """Use the returned K closest nodes to store the key at."""
470             if not response:
471                 def _storedValueHandler(key, value, sender):
472                     """Default callback that does nothing."""
473                     pass
474                 response = _storedValueHandler
475             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
476             reactor.callLater(0, action.goWithNodes, nodes)
477             
478         # First find the K closest nodes to operate on.
479         self.findNode(key, _storeValueForKey)
480                     
481     #{ Remote interface
482     def krpc_store_value(self, key, value, token, id, _krpc_sender):
483         """Store the value locally with the key.
484         
485         @type key: C{string}
486         @param key: the target key to store the value for
487         @type value: C{string}
488         @param value: the value to store with the key
489         @param token: the token to confirm that this peer contacted us previously
490         @type id: C{string}
491         @param id: the node ID of the sender node
492         @type _krpc_sender: (C{string}, C{int})
493         @param _krpc_sender: the sender node's IP address and port
494         """
495         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
496         self.insertNode(n, contacted = False)
497         for secret in self.token_secrets:
498             this_token = sha(secret + _krpc_sender[0]).digest()
499             if token == this_token:
500                 self.store.storeValue(key, value)
501                 return {"id" : self.node.id}
502         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
503
504
505 class Khashmir(KhashmirWrite):
506     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
507     _Node = KNodeWrite
508
509
510 class SimpleTests(unittest.TestCase):
511     
512     timeout = 10
513     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
514                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
515                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
516                     'MAX_FAILURES': 3,
517                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
518                     'KEY_EXPIRE': 3600, 'SPEW': False, }
519
520     def setUp(self):
521         d = self.DHT_DEFAULTS.copy()
522         d['PORT'] = 4044
523         self.a = Khashmir(d)
524         d = self.DHT_DEFAULTS.copy()
525         d['PORT'] = 4045
526         self.b = Khashmir(d)
527         
528     def tearDown(self):
529         self.a.shutdown()
530         self.b.shutdown()
531         os.unlink(self.a.store.db)
532         os.unlink(self.b.store.db)
533
534     def testAddContact(self):
535         self.failUnlessEqual(len(self.a.table.buckets), 1)
536         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
537
538         self.failUnlessEqual(len(self.b.table.buckets), 1)
539         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
540
541         self.a.addContact('127.0.0.1', 4045)
542         reactor.iterate()
543         reactor.iterate()
544         reactor.iterate()
545         reactor.iterate()
546
547         self.failUnlessEqual(len(self.a.table.buckets), 1)
548         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
549         self.failUnlessEqual(len(self.b.table.buckets), 1)
550         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
551
552     def testStoreRetrieve(self):
553         self.a.addContact('127.0.0.1', 4045)
554         reactor.iterate()
555         reactor.iterate()
556         reactor.iterate()
557         reactor.iterate()
558         self.got = 0
559         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
560         reactor.iterate()
561         reactor.iterate()
562         reactor.iterate()
563         reactor.iterate()
564         reactor.iterate()
565         reactor.iterate()
566         self.a.valueForKey(sha('foo').digest(), self._cb)
567         reactor.iterate()
568         reactor.iterate()
569         reactor.iterate()
570         reactor.iterate()
571         reactor.iterate()
572         reactor.iterate()
573         reactor.iterate()
574
575     def _cb(self, key, val):
576         if not val:
577             self.failUnlessEqual(self.got, 1)
578         elif 'foobar' in val:
579             self.got = 1
580
581
582 class MultiTest(unittest.TestCase):
583     
584     timeout = 30
585     num = 20
586     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
587                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
588                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
589                     'MAX_FAILURES': 3,
590                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
591                     'KEY_EXPIRE': 3600, 'SPEW': False, }
592
593     def _done(self, val):
594         self.done = 1
595         
596     def setUp(self):
597         self.l = []
598         self.startport = 4088
599         for i in range(self.num):
600             d = self.DHT_DEFAULTS.copy()
601             d['PORT'] = self.startport + i
602             self.l.append(Khashmir(d))
603         reactor.iterate()
604         reactor.iterate()
605         
606         for i in self.l:
607             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
608             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
609             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
610             reactor.iterate()
611             reactor.iterate()
612             reactor.iterate() 
613             
614         for i in self.l:
615             self.done = 0
616             i.findCloseNodes(self._done)
617             while not self.done:
618                 reactor.iterate()
619         for i in self.l:
620             self.done = 0
621             i.findCloseNodes(self._done)
622             while not self.done:
623                 reactor.iterate()
624
625     def tearDown(self):
626         for i in self.l:
627             i.shutdown()
628             os.unlink(i.store.db)
629             
630         reactor.iterate()
631         
632     def testStoreRetrieve(self):
633         for i in range(10):
634             K = newID()
635             V = newID()
636             
637             for a in range(3):
638                 self.done = 0
639                 def _scb(key, value, result):
640                     self.done = 1
641                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
642                 while not self.done:
643                     reactor.iterate()
644
645
646                 def _rcb(key, val):
647                     if not val:
648                         self.done = 1
649                         self.failUnlessEqual(self.got, 1)
650                     elif V in val:
651                         self.got = 1
652                 for x in range(3):
653                     self.got = 0
654                     self.done = 0
655                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
656                     while not self.done:
657                         reactor.iterate()