Display DHT statistics to the HTTP user.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / khashmir.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The main Khashmir program."""
5
6 import warnings
7 warnings.simplefilter("ignore", DeprecationWarning)
8
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
11 from sha import sha
12 import os
13
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.trial import unittest
17
18 from db import DB
19 from ktable import KTable
20 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
21 from khash import newID, newIDInRange
22 from actions import FindNode, FindValue, GetValue, StoreValue
23 from stats import StatsLogger
24 import krpc
25
26 class KhashmirBase(protocol.Factory):
27     """The base Khashmir class, with base functionality and find node, no key-value mappings.
28     
29     @type _Node: L{node.Node}
30     @ivar _Node: the knode implementation to use for this class of DHT
31     @type config: C{dictionary}
32     @ivar config: the configuration parameters for the DHT
33     @type port: C{int}
34     @ivar port: the port to listen on
35     @type store: L{db.DB}
36     @ivar store: the database to store nodes and key/value pairs in
37     @type node: L{node.Node}
38     @ivar node: this node
39     @type table: L{ktable.KTable}
40     @ivar table: the routing table
41     @type token_secrets: C{list} of C{string}
42     @ivar token_secrets: the current secrets to use to create tokens
43     @type stats: L{stats.StatsLogger}
44     @ivar stats: the statistics gatherer
45     @type udp: L{krpc.hostbroker}
46     @ivar udp: the factory for the KRPC protocol
47     @type listenport: L{twisted.internet.interfaces.IListeningPort}
48     @ivar listenport: the UDP listening port
49     @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
50     @ivar next_checkpoint: the delayed call for the next checkpoint
51     """
52     
53     _Node = KNodeBase
54     
55     def __init__(self, config, cache_dir='/tmp'):
56         """Initialize the Khashmir class and call the L{setup} method.
57         
58         @type config: C{dictionary}
59         @param config: the configuration parameters for the DHT
60         @type cache_dir: C{string}
61         @param cache_dir: the directory to store all files in
62             (optional, defaults to the /tmp directory)
63         """
64         self.config = None
65         self.setup(config, cache_dir)
66         
67     def setup(self, config, cache_dir):
68         """Setup all the Khashmir sub-modules.
69         
70         @type config: C{dictionary}
71         @param config: the configuration parameters for the DHT
72         @type cache_dir: C{string}
73         @param cache_dir: the directory to store all files in
74         """
75         self.config = config
76         self.port = config['PORT']
77         self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
78         self.node = self._loadSelfNode('', self.port)
79         self.table = KTable(self.node, config)
80         self.token_secrets = [newID()]
81         self.stats = StatsLogger(self.table, self.store, self.config)
82         
83         # Start listening
84         self.udp = krpc.hostbroker(self, self.stats, config)
85         self.udp.protocol = krpc.KRPC
86         self.listenport = reactor.listenUDP(self.port, self.udp)
87         
88         # Load the routing table and begin checkpointing
89         self._loadRoutingTable()
90         self.refreshTable(force = True)
91         self.next_checkpoint = reactor.callLater(60, self.checkpoint)
92
93     def Node(self, id, host = None, port = None):
94         """Create a new node.
95         
96         @see: L{node.Node.__init__}
97         """
98         n = self._Node(id, host, port)
99         n.table = self.table
100         n.conn = self.udp.connectionForAddr((n.host, n.port))
101         return n
102     
103     def __del__(self):
104         """Stop listening for packets."""
105         self.listenport.stopListening()
106         
107     def _loadSelfNode(self, host, port):
108         """Create this node, loading any previously saved one."""
109         id = self.store.getSelfNode()
110         if not id:
111             id = newID()
112         return self._Node(id, host, port)
113         
114     def checkpoint(self):
115         """Perform some periodic maintenance operations."""
116         # Create a new token secret
117         self.token_secrets.insert(0, newID())
118         if len(self.token_secrets) > 3:
119             self.token_secrets.pop()
120             
121         # Save some parameters for reloading
122         self.store.saveSelfNode(self.node.id)
123         self.store.dumpRoutingTable(self.table.buckets)
124         
125         # DHT maintenance
126         self.store.expireValues(self.config['KEY_EXPIRE'])
127         self.refreshTable()
128         
129         self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
130                                                            int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
131                                                  self.checkpoint)
132         
133     def _loadRoutingTable(self):
134         """Load the previous routing table nodes from the database.
135         
136         It's usually a good idea to call refreshTable(force = True) after
137         loading the table.
138         """
139         nodes = self.store.getRoutingTable()
140         for rec in nodes:
141             n = self.Node(rec[0], rec[1], int(rec[2]))
142             self.table.insertNode(n, contacted = False)
143             
144     #{ Local interface
145     def addContact(self, host, port, callback=None, errback=None):
146         """Ping this node and add the contact info to the table on pong.
147         
148         @type host: C{string}
149         @param host: the IP address of the node to contact
150         @type port: C{int}
151         @param port:the port of the node to contact
152         @type callback: C{method}
153         @param callback: the method to call with the results, it must take 1
154             parameter, the contact info returned by the node
155             (optional, defaults to doing nothing with the results)
156         @type errback: C{method}
157         @param errback: the method to call if an error occurs
158             (optional, defaults to calling the callback with None)
159         """
160         n = self.Node(NULL_ID, host, port)
161         self.sendJoin(n, callback=callback, errback=errback)
162
163     def findNode(self, id, callback, errback=None):
164         """Find the contact info for the K closest nodes in the global table.
165         
166         @type id: C{string}
167         @param id: the target ID to find the K closest nodes of
168         @type callback: C{method}
169         @param callback: the method to call with the results, it must take 1
170             parameter, the list of K closest nodes
171         @type errback: C{method}
172         @param errback: the method to call if an error occurs
173             (optional, defaults to doing nothing when an error occurs)
174         """
175         # Get K nodes out of local table/cache
176         nodes = self.table.findNodes(id)
177         d = Deferred()
178         if errback:
179             d.addCallbacks(callback, errback)
180         else:
181             d.addCallback(callback)
182
183         # If the target ID was found
184         if len(nodes) == 1 and nodes[0].id == id:
185             d.callback(nodes)
186         else:
187             # Start the finding nodes action
188             state = FindNode(self, id, d.callback, self.config)
189             reactor.callLater(0, state.goWithNodes, nodes)
190     
191     def insertNode(self, node, contacted = True):
192         """Try to insert a node in our local table, pinging oldest contact if necessary.
193         
194         If all you have is a host/port, then use L{addContact}, which calls this
195         method after receiving the PONG from the remote node. The reason for
196         the seperation is we can't insert a node into the table without its
197         node ID. That means of course the node passed into this method needs
198         to be a properly formed Node object with a valid ID.
199
200         @type node: L{node.Node}
201         @param node: the new node to try and insert
202         @type contacted: C{boolean}
203         @param contacted: whether the new node is known to be good, i.e.
204             responded to a request (optional, defaults to True)
205         """
206         old = self.table.insertNode(node, contacted=contacted)
207         if (old and old.id != self.node.id and
208             (datetime.now() - old.lastSeen) > 
209              timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
210             
211             def _staleNodeHandler(oldnode = old, newnode = node):
212                 """The pinged node never responded, so replace it."""
213                 self.table.replaceStaleNode(oldnode, newnode)
214             
215             def _notStaleNodeHandler(dict, old=old):
216                 """Got a pong from the old node, so update it."""
217                 dict = dict['rsp']
218                 if dict['id'] == old.id:
219                     self.table.justSeenNode(old.id)
220             
221             # Bucket is full, check to see if old node is still available
222             df = old.ping(self.node.id)
223             df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
224
225     def sendJoin(self, node, callback=None, errback=None):
226         """Join the DHT by pinging a bootstrap node.
227         
228         @type node: L{node.Node}
229         @param node: the node to send the join to
230         @type callback: C{method}
231         @param callback: the method to call with the results, it must take 1
232             parameter, the contact info returned by the node
233             (optional, defaults to doing nothing with the results)
234         @type errback: C{method}
235         @param errback: the method to call if an error occurs
236             (optional, defaults to calling the callback with None)
237         """
238
239         def _pongHandler(dict, node=node, self=self, callback=callback):
240             """Node responded properly, callback with response."""
241             n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
242             self.insertNode(n)
243             if callback:
244                 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
245
246         def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
247             """Error occurred, fail node and errback or callback with error."""
248             table.nodeFailed(node)
249             if errback:
250                 errback()
251             elif callback:
252                 callback(None)
253         
254         df = node.join(self.node.id)
255         df.addCallbacks(_pongHandler, _defaultPong)
256
257     def findCloseNodes(self, callback=lambda a: None, errback = None):
258         """Perform a findNode on the ID one away from our own.
259
260         This will allow us to populate our table with nodes on our network
261         closest to our own. This is called as soon as we start up with an
262         empty table.
263
264         @type callback: C{method}
265         @param callback: the method to call with the results, it must take 1
266             parameter, the list of K closest nodes
267             (optional, defaults to doing nothing with the results)
268         @type errback: C{method}
269         @param errback: the method to call if an error occurs
270             (optional, defaults to doing nothing when an error occurs)
271         """
272         id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
273         self.findNode(id, callback, errback)
274
275     def refreshTable(self, force = False):
276         """Check all the buckets for those that need refreshing.
277         
278         @param force: refresh all buckets regardless of last bucket access time
279             (optional, defaults to False)
280         """
281         def callback(nodes):
282             pass
283     
284         for bucket in self.table.buckets:
285             if force or (datetime.now() - bucket.lastAccessed > 
286                          timedelta(seconds=self.config['BUCKET_STALENESS'])):
287                 # Choose a random ID in the bucket and try and find it
288                 id = newIDInRange(bucket.min, bucket.max)
289                 self.findNode(id, callback)
290
291     def shutdown(self):
292         """Closes the port and cancels pending later calls."""
293         self.listenport.stopListening()
294         try:
295             self.next_checkpoint.cancel()
296         except:
297             pass
298         self.store.close()
299     
300     def getStats(self):
301         """Gather the statistics for the DHT."""
302         return self.stats.gather()
303
304     #{ Remote interface
305     def krpc_ping(self, id, _krpc_sender):
306         """Pong with our ID.
307         
308         @type id: C{string}
309         @param id: the node ID of the sender node
310         @type _krpc_sender: (C{string}, C{int})
311         @param _krpc_sender: the sender node's IP address and port
312         """
313         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
314         self.insertNode(n, contacted = False)
315
316         return {"id" : self.node.id}
317         
318     def krpc_join(self, id, _krpc_sender):
319         """Add the node by responding with its address and port.
320         
321         @type id: C{string}
322         @param id: the node ID of the sender node
323         @type _krpc_sender: (C{string}, C{int})
324         @param _krpc_sender: the sender node's IP address and port
325         """
326         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
327         self.insertNode(n, contacted = False)
328
329         return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
330         
331     def krpc_find_node(self, target, id, _krpc_sender):
332         """Find the K closest nodes to the target in the local routing table.
333         
334         @type target: C{string}
335         @param target: the target ID to find nodes for
336         @type id: C{string}
337         @param id: the node ID of the sender node
338         @type _krpc_sender: (C{string}, C{int})
339         @param _krpc_sender: the sender node's IP address and port
340         """
341         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
342         self.insertNode(n, contacted = False)
343
344         nodes = self.table.findNodes(target)
345         nodes = map(lambda node: node.contactInfo(), nodes)
346         token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
347         return {"nodes" : nodes, "token" : token, "id" : self.node.id}
348
349
350 class KhashmirRead(KhashmirBase):
351     """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
352
353     _Node = KNodeRead
354
355     #{ Local interface
356     def findValue(self, key, callback, errback=None):
357         """Get the nodes that have values for the key from the global table.
358         
359         @type key: C{string}
360         @param key: the target key to find the values for
361         @type callback: C{method}
362         @param callback: the method to call with the results, it must take 1
363             parameter, the list of nodes with values
364         @type errback: C{method}
365         @param errback: the method to call if an error occurs
366             (optional, defaults to doing nothing when an error occurs)
367         """
368         # Get K nodes out of local table/cache
369         nodes = self.table.findNodes(key)
370         d = Deferred()
371         if errback:
372             d.addCallbacks(callback, errback)
373         else:
374             d.addCallback(callback)
375
376         # Search for others starting with the locally found ones
377         state = FindValue(self, key, d.callback, self.config)
378         reactor.callLater(0, state.goWithNodes, nodes)
379
380     def valueForKey(self, key, callback, searchlocal = True):
381         """Get the values found for key in global table.
382         
383         Callback will be called with a list of values for each peer that
384         returns unique values. The final callback will be an empty list.
385
386         @type key: C{string}
387         @param key: the target key to get the values for
388         @type callback: C{method}
389         @param callback: the method to call with the results, it must take 2
390             parameters: the key, and the values found
391         @type searchlocal: C{boolean}
392         @param searchlocal: whether to also look for any local values
393         """
394         # Get any local values
395         if searchlocal:
396             l = self.store.retrieveValues(key)
397             if len(l) > 0:
398                 reactor.callLater(0, callback, key, l)
399         else:
400             l = []
401
402         def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
403             """Use the found nodes to send requests for values to."""
404             state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
405             reactor.callLater(0, state.goWithNodes, nodes)
406             
407         # First lookup nodes that have values for the key
408         self.findValue(key, _getValueForKey)
409
410     #{ Remote interface
411     def krpc_find_value(self, key, id, _krpc_sender):
412         """Find the number of values stored locally for the key, and the K closest nodes.
413         
414         @type key: C{string}
415         @param key: the target key to find the values and nodes for
416         @type id: C{string}
417         @param id: the node ID of the sender node
418         @type _krpc_sender: (C{string}, C{int})
419         @param _krpc_sender: the sender node's IP address and port
420         """
421         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
422         self.insertNode(n, contacted = False)
423     
424         nodes = self.table.findNodes(key)
425         nodes = map(lambda node: node.contactInfo(), nodes)
426         num_values = self.store.countValues(key)
427         return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
428
429     def krpc_get_value(self, key, num, id, _krpc_sender):
430         """Retrieve the values stored locally for the key.
431         
432         @type key: C{string}
433         @param key: the target key to retrieve the values for
434         @type num: C{int}
435         @param num: the maximum number of values to retrieve, or 0 to
436             retrieve all of them
437         @type id: C{string}
438         @param id: the node ID of the sender node
439         @type _krpc_sender: (C{string}, C{int})
440         @param _krpc_sender: the sender node's IP address and port
441         """
442         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
443         self.insertNode(n, contacted = False)
444     
445         l = self.store.retrieveValues(key)
446         if num == 0 or num >= len(l):
447             return {'values' : l, "id": self.node.id}
448         else:
449             shuffle(l)
450             return {'values' : l[:num], "id": self.node.id}
451
452
453 class KhashmirWrite(KhashmirRead):
454     """The read-write Khashmir class, which can store and retrieve key/value mappings."""
455
456     _Node = KNodeWrite
457
458     #{ Local interface
459     def storeValueForKey(self, key, value, callback=None):
460         """Stores the value for the key in the global table.
461         
462         No status in this implementation, peers respond but don't indicate
463         status of storing values.
464
465         @type key: C{string}
466         @param key: the target key to store the value for
467         @type value: C{string}
468         @param value: the value to store with the key
469         @type callback: C{method}
470         @param callback: the method to call with the results, it must take 3
471             parameters: the key, the value stored, and the result of the store
472             (optional, defaults to doing nothing with the results)
473         """
474         def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
475             """Use the returned K closest nodes to store the key at."""
476             if not response:
477                 def _storedValueHandler(key, value, sender):
478                     """Default callback that does nothing."""
479                     pass
480                 response = _storedValueHandler
481             action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
482             reactor.callLater(0, action.goWithNodes, nodes)
483             
484         # First find the K closest nodes to operate on.
485         self.findNode(key, _storeValueForKey)
486                     
487     #{ Remote interface
488     def krpc_store_value(self, key, value, token, id, _krpc_sender):
489         """Store the value locally with the key.
490         
491         @type key: C{string}
492         @param key: the target key to store the value for
493         @type value: C{string}
494         @param value: the value to store with the key
495         @param token: the token to confirm that this peer contacted us previously
496         @type id: C{string}
497         @param id: the node ID of the sender node
498         @type _krpc_sender: (C{string}, C{int})
499         @param _krpc_sender: the sender node's IP address and port
500         """
501         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
502         self.insertNode(n, contacted = False)
503         for secret in self.token_secrets:
504             this_token = sha(secret + _krpc_sender[0]).digest()
505             if token == this_token:
506                 self.store.storeValue(key, value)
507                 return {"id" : self.node.id}
508         raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
509
510
511 class Khashmir(KhashmirWrite):
512     """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
513     _Node = KNodeWrite
514
515
516 class SimpleTests(unittest.TestCase):
517     
518     timeout = 10
519     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
520                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
521                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
522                     'MAX_FAILURES': 3,
523                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
524                     'KEY_EXPIRE': 3600, 'SPEW': False, }
525
526     def setUp(self):
527         d = self.DHT_DEFAULTS.copy()
528         d['PORT'] = 4044
529         self.a = Khashmir(d)
530         d = self.DHT_DEFAULTS.copy()
531         d['PORT'] = 4045
532         self.b = Khashmir(d)
533         
534     def tearDown(self):
535         self.a.shutdown()
536         self.b.shutdown()
537         os.unlink(self.a.store.db)
538         os.unlink(self.b.store.db)
539
540     def testAddContact(self):
541         self.failUnlessEqual(len(self.a.table.buckets), 1)
542         self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
543
544         self.failUnlessEqual(len(self.b.table.buckets), 1)
545         self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
546
547         self.a.addContact('127.0.0.1', 4045)
548         reactor.iterate()
549         reactor.iterate()
550         reactor.iterate()
551         reactor.iterate()
552
553         self.failUnlessEqual(len(self.a.table.buckets), 1)
554         self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
555         self.failUnlessEqual(len(self.b.table.buckets), 1)
556         self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
557
558     def testStoreRetrieve(self):
559         self.a.addContact('127.0.0.1', 4045)
560         reactor.iterate()
561         reactor.iterate()
562         reactor.iterate()
563         reactor.iterate()
564         self.got = 0
565         self.a.storeValueForKey(sha('foo').digest(), 'foobar')
566         reactor.iterate()
567         reactor.iterate()
568         reactor.iterate()
569         reactor.iterate()
570         reactor.iterate()
571         reactor.iterate()
572         self.a.valueForKey(sha('foo').digest(), self._cb)
573         reactor.iterate()
574         reactor.iterate()
575         reactor.iterate()
576         reactor.iterate()
577         reactor.iterate()
578         reactor.iterate()
579         reactor.iterate()
580
581     def _cb(self, key, val):
582         if not val:
583             self.failUnlessEqual(self.got, 1)
584         elif 'foobar' in val:
585             self.got = 1
586
587
588 class MultiTest(unittest.TestCase):
589     
590     timeout = 30
591     num = 20
592     DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
593                     'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
594                     'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
595                     'MAX_FAILURES': 3,
596                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
597                     'KEY_EXPIRE': 3600, 'SPEW': False, }
598
599     def _done(self, val):
600         self.done = 1
601         
602     def setUp(self):
603         self.l = []
604         self.startport = 4088
605         for i in range(self.num):
606             d = self.DHT_DEFAULTS.copy()
607             d['PORT'] = self.startport + i
608             self.l.append(Khashmir(d))
609         reactor.iterate()
610         reactor.iterate()
611         
612         for i in self.l:
613             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
614             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
615             i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
616             reactor.iterate()
617             reactor.iterate()
618             reactor.iterate() 
619             
620         for i in self.l:
621             self.done = 0
622             i.findCloseNodes(self._done)
623             while not self.done:
624                 reactor.iterate()
625         for i in self.l:
626             self.done = 0
627             i.findCloseNodes(self._done)
628             while not self.done:
629                 reactor.iterate()
630
631     def tearDown(self):
632         for i in self.l:
633             i.shutdown()
634             os.unlink(i.store.db)
635             
636         reactor.iterate()
637         
638     def testStoreRetrieve(self):
639         for i in range(10):
640             K = newID()
641             V = newID()
642             
643             for a in range(3):
644                 self.done = 0
645                 def _scb(key, value, result):
646                     self.done = 1
647                 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
648                 while not self.done:
649                     reactor.iterate()
650
651
652                 def _rcb(key, val):
653                     if not val:
654                         self.done = 1
655                         self.failUnlessEqual(self.got, 1)
656                     elif V in val:
657                         self.got = 1
658                 for x in range(3):
659                     self.got = 0
660                     self.done = 0
661                     self.l[randrange(0, self.num)].valueForKey(K, _rcb)
662                     while not self.done:
663                         reactor.iterate()