Added the number of times each action was started to the DHT stats.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 import os
13
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.trial import unittest
17
18 from db import DB
19 from ktable import KTable
20 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
21 from khash import newID, newIDInRange
22 from actions import FindNode, FindValue, GetValue, StoreValue
23 from stats import StatsLogger
24 import krpc
25
26 class KhashmirBase(protocol.Factory):
27     """The base Khashmir class, with base functionality and find node, no key-value mappings.
28     
29     @type _Node: L{node.Node}
30     @ivar _Node: the knode implementation to use for this class of DHT
31     @type config: C{dictionary}
32     @ivar config: the configuration parameters for the DHT
33     @type port: C{int}
34     @ivar port: the port to listen on
35     @type store: L{db.DB}
36     @ivar store: the database to store nodes and key/value pairs in
37     @type node: L{node.Node}
38     @ivar node: this node
39     @type table: L{ktable.KTable}
40     @ivar table: the routing table
41     @type token_secrets: C{list} of C{string}
42     @ivar token_secrets: the current secrets to use to create tokens
43     @type stats: L{stats.StatsLogger}
44     @ivar stats: the statistics gatherer
45     @type udp: L{krpc.hostbroker}
46     @ivar udp: the factory for the KRPC protocol
47     @type listenport: L{twisted.internet.interfaces.IListeningPort}
48     @ivar listenport: the UDP listening port
49     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
50     @ivar next_checkpoint: the delayed call for the next checkpoint
51     """
52     
53     _Node = KNodeBase
54     
55     def __init__(self, config, cache_dir='/tmp'):
56         """Initialize the Khashmir class and call the L{setup} method.
57         
58         @type config: C{dictionary}
59         @param config: the configuration parameters for the DHT
60         @type cache_dir: C{string}
61         @param cache_dir: the directory to store all files in
62             (optional, defaults to the /tmp directory)
63         """
64         self.config = None
65         self.setup(config, cache_dir)
66         
67     def setup(self, config, cache_dir):
68         """Setup all the Khashmir sub-modules.
69         
70         @type config: C{dictionary}
71         @param config: the configuration parameters for the DHT
72         @type cache_dir: C{string}
73         @param cache_dir: the directory to store all files in
74         """
75         self.config = config
76         self.port = config['PORT']
77         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
78         self.node = self._loadSelfNode('', self.port)
79         self.table = KTable(self.node, config)
80         self.token_secrets = [newID()]
81         self.stats = StatsLogger(self.table, self.store, self.config)
82         
83         # Start listening
84         self.udp = krpc.hostbroker(self, self.stats, config)
85         self.udp.protocol = krpc.KRPC
86         self.listenport = reactor.listenUDP(self.port, self.udp)
87         
88         # Load the routing table and begin checkpointing
89         self._loadRoutingTable()
90         self.refreshTable(force = True)
91         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
92
93     def Node(self, id, host = None, port = None):
94         """Create a new node.
95         
96         @see: L{node.Node.__init__}
97         """
98         n = self._Node(id, host, port)
99         n.table = self.table
100         n.conn = self.udp.connectionForAddr((n.host, n.port))
101         return n
102     
103     def __del__(self):
104         """Stop listening for packets."""
105         self.listenport.stopListening()
106         
107     def _loadSelfNode(self, host, port):
108         """Create this node, loading any previously saved one."""
109         id = self.store.getSelfNode()
110         if not id:
111             id = newID()
112         return self._Node(id, host, port)
113         
114     def checkpoint(self):
115         """Perform some periodic maintenance operations."""
116         # Create a new token secret
117         self.token_secrets.insert(0, newID())
118         if len(self.token_secrets) > 3:
119             self.token_secrets.pop()
120             
121         # Save some parameters for reloading
122         self.store.saveSelfNode(self.node.id)
123         self.store.dumpRoutingTable(self.table.buckets)
124         
125         # DHT maintenance
126         self.store.expireValues(self.config['KEY_EXPIRE'])
127         self.refreshTable()
128         
129         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
130                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
131                                                  self.checkpoint)
132         
133     def _loadRoutingTable(self):
134         """Load the previous routing table nodes from the database.
135         
136         It's usually a good idea to call refreshTable(force = True) after
137         loading the table.
138         """
139         nodes = self.store.getRoutingTable()
140         for rec in nodes:
141             n = self.Node(rec[0], rec[1], int(rec[2]))
142             self.table.insertNode(n, contacted = False)
143             
144     #{ Local interface
145     def addContact(self, host, port, callback=None, errback=None):
146         """Ping this node and add the contact info to the table on pong.
147         
148         @type host: C{string}
149         @param host: the IP address of the node to contact
150         @type port: C{int}
151         @param port:the port of the node to contact
152         @type callback: C{method}
153         @param callback: the method to call with the results, it must take 1
154             parameter, the contact info returned by the node
155             (optional, defaults to doing nothing with the results)
156         @type errback: C{method}
157         @param errback: the method to call if an error occurs
158             (optional, defaults to calling the callback with None)
159         """
160         n = self.Node(NULL_ID, host, port)
161         self.sendJoin(n, callback=callback, errback=errback)
162
163     def findNode(self, id, callback, errback=None):
164         """Find the contact info for the K closest nodes in the global table.
165         
166         @type id: C{string}
167         @param id: the target ID to find the K closest nodes of
168         @type callback: C{method}
169         @param callback: the method to call with the results, it must take 1
170             parameter, the list of K closest nodes
171         @type errback: C{method}
172         @param errback: the method to call if an error occurs
173             (optional, defaults to doing nothing when an error occurs)
174         """
175         # Get K nodes out of local table/cache
176         nodes = self.table.findNodes(id)
177         d = Deferred()
178         if errback:
179             d.addCallbacks(callback, errback)
180         else:
181             d.addCallback(callback)
182
183         # If the target ID was found
184         if len(nodes) == 1 and nodes[0].id == id:
185             d.callback(nodes)
186         else:
187             # Start the finding nodes action
188             state = FindNode(self, id, d.callback, self.config, self.stats)
189             reactor.callLater(0, state.goWithNodes, nodes)
190     
191     def insertNode(self, node, contacted = True):
192         """Try to insert a node in our local table, pinging oldest contact if necessary.
193         
194         If all you have is a host/port, then use L{addContact}, which calls this
195         method after receiving the PONG from the remote node. The reason for
196         the seperation is we can't insert a node into the table without its
197         node ID. That means of course the node passed into this method needs
198         to be a properly formed Node object with a valid ID.
199
200         @type node: L{node.Node}
201         @param node: the new node to try and insert
202         @type contacted: C{boolean}
203         @param contacted: whether the new node is known to be good, i.e.
204             responded to a request (optional, defaults to True)
205         """
206         old = self.table.insertNode(node, contacted=contacted)
207         if (old and old.id != self.node.id and
208             (datetime.now() - old.lastSeen) > 
209              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
210             
211             def _staleNodeHandler(oldnode = old, newnode = node):
212                 """The pinged node never responded, so replace it."""
213                 self.table.replaceStaleNode(oldnode, newnode)
214             
215             def _notStaleNodeHandler(dict, old=old):
216                 """Got a pong from the old node, so update it."""
217                 dict = dict['rsp']
218                 if dict['id'] == old.id:
219                     self.table.justSeenNode(old.id)
220             
221             # Bucket is full, check to see if old node is still available
222             self.stats.startedAction('ping')
223             df = old.ping(self.node.id)
224             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
225
226     def sendJoin(self, node, callback=None, errback=None):
227         """Join the DHT by pinging a bootstrap node.
228         
229         @type node: L{node.Node}
230         @param node: the node to send the join to
231         @type callback: C{method}
232         @param callback: the method to call with the results, it must take 1
233             parameter, the contact info returned by the node
234             (optional, defaults to doing nothing with the results)
235         @type errback: C{method}
236         @param errback: the method to call if an error occurs
237             (optional, defaults to calling the callback with None)
238         """
239
240         def _pongHandler(dict, node=node, self=self, callback=callback):
241             """Node responded properly, callback with response."""
242             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
243             self.insertNode(n)
244             if callback:
245                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
246
247         def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
248             """Error occurred, fail node and errback or callback with error."""
249             table.nodeFailed(node)
250             if errback:
251                 errback()
252             elif callback:
253                 callback(None)
254         
255         self.stats.startedAction('join')
256         df = node.join(self.node.id)
257         df.addCallbacks(_pongHandler, _defaultPong)
258
259     def findCloseNodes(self, callback=lambda a: None, errback = None):
260         """Perform a findNode on the ID one away from our own.
261
262         This will allow us to populate our table with nodes on our network
263         closest to our own. This is called as soon as we start up with an
264         empty table.
265
266         @type callback: C{method}
267         @param callback: the method to call with the results, it must take 1
268             parameter, the list of K closest nodes
269             (optional, defaults to doing nothing with the results)
270         @type errback: C{method}
271         @param errback: the method to call if an error occurs
272             (optional, defaults to doing nothing when an error occurs)
273         """
274         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
275         self.findNode(id, callback, errback)
276
277     def refreshTable(self, force = False):
278         """Check all the buckets for those that need refreshing.
279         
280         @param force: refresh all buckets regardless of last bucket access time
281             (optional, defaults to False)
282         """
283         def callback(nodes):
284             pass
285     
286         for bucket in self.table.buckets:
287             if force or (datetime.now() - bucket.lastAccessed > 
288                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
289                 # Choose a random ID in the bucket and try and find it
290                 id = newIDInRange(bucket.min, bucket.max)
291                 self.findNode(id, callback)
292
293     def shutdown(self):
294         """Closes the port and cancels pending later calls."""
295         self.listenport.stopListening()
296         try:
297             self.next_checkpoint.cancel()
298         except:
299             pass
300         self.store.close()
301     
302     def getStats(self):
303         """Gather the statistics for the DHT."""
304         return self.stats.gather()
305
306     #{ Remote interface
307     def krpc_ping(self, id, _krpc_sender):
308         """Pong with our ID.
309         
310         @type id: C{string}
311         @param id: the node ID of the sender node
312         @type _krpc_sender: (C{string}, C{int})
313         @param _krpc_sender: the sender node's IP address and port
314         """
315         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
316         self.insertNode(n, contacted = False)
317
318         return {"id" : self.node.id}
319         
320     def krpc_join(self, id, _krpc_sender):
321         """Add the node by responding with its address and port.
322         
323         @type id: C{string}
324         @param id: the node ID of the sender node
325         @type _krpc_sender: (C{string}, C{int})
326         @param _krpc_sender: the sender node's IP address and port
327         """
328         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
329         self.insertNode(n, contacted = False)
330
331         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
332         
333     def krpc_find_node(self, target, id, _krpc_sender):
334         """Find the K closest nodes to the target in the local routing table.
335         
336         @type target: C{string}
337         @param target: the target ID to find nodes for
338         @type id: C{string}
339         @param id: the node ID of the sender node
340         @type _krpc_sender: (C{string}, C{int})
341         @param _krpc_sender: the sender node's IP address and port
342         """
343         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
344         self.insertNode(n, contacted = False)
345
346         nodes = self.table.findNodes(target)
347         nodes = map(lambda node: node.contactInfo(), nodes)
348         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
349         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
350
351
352 class KhashmirRead(KhashmirBase):
353     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
354
355     _Node = KNodeRead
356
357     #{ Local interface
358     def findValue(self, key, callback, errback=None):
359         """Get the nodes that have values for the key from the global table.
360         
361         @type key: C{string}
362         @param key: the target key to find the values for
363         @type callback: C{method}
364         @param callback: the method to call with the results, it must take 1
365             parameter, the list of nodes with values
366         @type errback: C{method}
367         @param errback: the method to call if an error occurs
368             (optional, defaults to doing nothing when an error occurs)
369         """
370         # Get K nodes out of local table/cache
371         nodes = self.table.findNodes(key)
372         d = Deferred()
373         if errback:
374             d.addCallbacks(callback, errback)
375         else:
376             d.addCallback(callback)
377
378         # Search for others starting with the locally found ones
379         state = FindValue(self, key, d.callback, self.config, self.stats)
380         reactor.callLater(0, state.goWithNodes, nodes)
381
382     def valueForKey(self, key, callback, searchlocal = True):
383         """Get the values found for key in global table.
384         
385         Callback will be called with a list of values for each peer that
386         returns unique values. The final callback will be an empty list.
387
388         @type key: C{string}
389         @param key: the target key to get the values for
390         @type callback: C{method}
391         @param callback: the method to call with the results, it must take 2
392             parameters: the key, and the values found
393         @type searchlocal: C{boolean}
394         @param searchlocal: whether to also look for any local values
395         """
396         # Get any local values
397         if searchlocal:
398             l = self.store.retrieveValues(key)
399             if len(l) > 0:
400                 reactor.callLater(0, callback, key, l)
401         else:
402             l = []
403
404         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
405             """Use the found nodes to send requests for values to."""
406             state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
407             reactor.callLater(0, state.goWithNodes, nodes)
408             
409         # First lookup nodes that have values for the key
410         self.findValue(key, _getValueForKey)
411
412     #{ Remote interface
413     def krpc_find_value(self, key, id, _krpc_sender):
414         """Find the number of values stored locally for the key, and the K closest nodes.
415         
416         @type key: C{string}
417         @param key: the target key to find the values and nodes for
418         @type id: C{string}
419         @param id: the node ID of the sender node
420         @type _krpc_sender: (C{string}, C{int})
421         @param _krpc_sender: the sender node's IP address and port
422         """
423         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
424         self.insertNode(n, contacted = False)
425     
426         nodes = self.table.findNodes(key)
427         nodes = map(lambda node: node.contactInfo(), nodes)
428         num_values = self.store.countValues(key)
429         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
430
431     def krpc_get_value(self, key, num, id, _krpc_sender):
432         """Retrieve the values stored locally for the key.
433         
434         @type key: C{string}
435         @param key: the target key to retrieve the values for
436         @type num: C{int}
437         @param num: the maximum number of values to retrieve, or 0 to
438             retrieve all of them
439         @type id: C{string}
440         @param id: the node ID of the sender node
441         @type _krpc_sender: (C{string}, C{int})
442         @param _krpc_sender: the sender node's IP address and port
443         """
444         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
445         self.insertNode(n, contacted = False)
446     
447         l = self.store.retrieveValues(key)
448         if num == 0 or num >= len(l):
449             return {'values' : l, "id": self.node.id}
450         else:
451             shuffle(l)
452             return {'values' : l[:num], "id": self.node.id}
453
454
455 class KhashmirWrite(KhashmirRead):
456     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
457
458     _Node = KNodeWrite
459
460     #{ Local interface
461     def storeValueForKey(self, key, value, callback=None):
462         """Stores the value for the key in the global table.
463         
464         No status in this implementation, peers respond but don't indicate
465         status of storing values.
466
467         @type key: C{string}
468         @param key: the target key to store the value for
469         @type value: C{string}
470         @param value: the value to store with the key
471         @type callback: C{method}
472         @param callback: the method to call with the results, it must take 3
473             parameters: the key, the value stored, and the result of the store
474             (optional, defaults to doing nothing with the results)
475         """
476         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
477             """Use the returned K closest nodes to store the key at."""
478             if not response:
479                 def _storedValueHandler(key, value, sender):
480                     """Default callback that does nothing."""
481                     pass
482                 response = _storedValueHandler
483             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
484             reactor.callLater(0, action.goWithNodes, nodes)
485             
486         # First find the K closest nodes to operate on.
487         self.findNode(key, _storeValueForKey)
488                     
489     #{ Remote interface
490     def krpc_store_value(self, key, value, token, id, _krpc_sender):
491         """Store the value locally with the key.
492         
493         @type key: C{string}
494         @param key: the target key to store the value for
495         @type value: C{string}
496         @param value: the value to store with the key
497         @param token: the token to confirm that this peer contacted us previously
498         @type id: C{string}
499         @param id: the node ID of the sender node
500         @type _krpc_sender: (C{string}, C{int})
501         @param _krpc_sender: the sender node's IP address and port
502         """
503         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
504         self.insertNode(n, contacted = False)
505         for secret in self.token_secrets:
506             this_token = sha(secret + _krpc_sender[0]).digest()
507             if token == this_token:
508                 self.store.storeValue(key, value)
509                 return {"id" : self.node.id}
510         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
511
512
513 class Khashmir(KhashmirWrite):
514     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
515     _Node = KNodeWrite
516
517
518 class SimpleTests(unittest.TestCase):
519     
520     timeout = 10
521     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
522                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
523                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
524                     'MAX_FAILURES': 3,
525                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
526                     'KEY_EXPIRE': 3600, 'SPEW': False, }
527
528     def setUp(self):
529         d = self.DHT_DEFAULTS.copy()
530         d['PORT'] = 4044
531         self.a = Khashmir(d)
532         d = self.DHT_DEFAULTS.copy()
533         d['PORT'] = 4045
534         self.b = Khashmir(d)
535         
536     def tearDown(self):
537         self.a.shutdown()
538         self.b.shutdown()
539         os.unlink(self.a.store.db)
540         os.unlink(self.b.store.db)
541
542     def testAddContact(self):
543         self.failUnlessEqual(len(self.a.table.buckets), 1)
544         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
545
546         self.failUnlessEqual(len(self.b.table.buckets), 1)
547         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
548
549         self.a.addContact('127.0.0.1', 4045)
550         reactor.iterate()
551         reactor.iterate()
552         reactor.iterate()
553         reactor.iterate()
554
555         self.failUnlessEqual(len(self.a.table.buckets), 1)
556         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
557         self.failUnlessEqual(len(self.b.table.buckets), 1)
558         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
559
560     def testStoreRetrieve(self):
561         self.a.addContact('127.0.0.1', 4045)
562         reactor.iterate()
563         reactor.iterate()
564         reactor.iterate()
565         reactor.iterate()
566         self.got = 0
567         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
568         reactor.iterate()
569         reactor.iterate()
570         reactor.iterate()
571         reactor.iterate()
572         reactor.iterate()
573         reactor.iterate()
574         self.a.valueForKey(sha('foo').digest(), self._cb)
575         reactor.iterate()
576         reactor.iterate()
577         reactor.iterate()
578         reactor.iterate()
579         reactor.iterate()
580         reactor.iterate()
581         reactor.iterate()
582
583     def _cb(self, key, val):
584         if not val:
585             self.failUnlessEqual(self.got, 1)
586         elif 'foobar' in val:
587             self.got = 1
588
589
590 class MultiTest(unittest.TestCase):
591     
592     timeout = 30
593     num = 20
594     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
595                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
596                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
597                     'MAX_FAILURES': 3,
598                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
599                     'KEY_EXPIRE': 3600, 'SPEW': False, }
600
601     def _done(self, val):
602         self.done = 1
603         
604     def setUp(self):
605         self.l = []
606         self.startport = 4088
607         for i in range(self.num):
608             d = self.DHT_DEFAULTS.copy()
609             d['PORT'] = self.startport + i
610             self.l.append(Khashmir(d))
611         reactor.iterate()
612         reactor.iterate()
613         
614         for i in self.l:
615             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
616             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
617             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
618             reactor.iterate()
619             reactor.iterate()
620             reactor.iterate() 
621             
622         for i in self.l:
623             self.done = 0
624             i.findCloseNodes(self._done)
625             while not self.done:
626                 reactor.iterate()
627         for i in self.l:
628             self.done = 0
629             i.findCloseNodes(self._done)
630             while not self.done:
631                 reactor.iterate()
632
633     def tearDown(self):
634         for i in self.l:
635             i.shutdown()
636             os.unlink(i.store.db)
637             
638         reactor.iterate()
639         
640     def testStoreRetrieve(self):
641         for i in range(10):
642             K = newID()
643             V = newID()
644             
645             for a in range(3):
646                 self.done = 0
647                 def _scb(key, value, result):
648                     self.done = 1
649                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
650                 while not self.done:
651                     reactor.iterate()
652
653
654                 def _rcb(key, val):
655                     if not val:
656                         self.done = 1
657                         self.failUnlessEqual(self.got, 1)
658                     elif V in val:
659                         self.got = 1
660                 for x in range(3):
661                     self.got = 0
662                     self.done = 0
663                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
664                     while not self.done:
665                         reactor.iterate()