1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """The main Khashmir program."""
# Ignore every DeprecationWarning for the whole process, not just for this
# module. NOTE(review): presumably meant to silence the Python 2.6 warning
# for the deprecated `sha` module (sha(...) builds the tokens in this file)
# -- confirm, since this also hides deprecations raised by every other
# library loaded in the process.
7 warnings.simplefilter("ignore", DeprecationWarning)
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
15 from twisted.internet.defer import Deferred
16 from twisted.internet import protocol, reactor
17 from twisted.python import log
18 from twisted.trial import unittest
21 from ktable import KTable
22 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
23 from khash import newID, newIDInRange
24 from actions import FindNode, FindValue, GetValue, StoreValue
25 from stats import StatsLogger
# KhashmirBase: the DHT core, built as a twisted protocol.Factory. It owns
# the routing table (KTable), the per-port database file, the UDP KRPC
# listener and the iterative node lookup, and provides the krpc_ping /
# krpc_join / krpc_find_node handlers. It keeps no key/value mappings.
#
# NOTE(review): this listing is a lossy extraction. Indentation is gone,
# every line starts with its original line number, and many short lines
# are missing (see the gaps in that numbering). The comments below flag the
# gaps that matter for the logic; confirm every "presumably" against the
# full source. Lines 53-56 are missing here and presumably include the
# class-level _Node setting (KNodeBase is imported).
28 class KhashmirBase(protocol.Factory):
29 """The base Khashmir class, with base functionality and find node, no key-value mappings.
31 @type _Node: L{node.Node}
32 @ivar _Node: the knode implementation to use for this class of DHT
33 @type config: C{dictionary}
34 @ivar config: the configuration parameters for the DHT
36 @ivar port: the port to listen on
38 @ivar store: the database to store nodes and key/value pairs in
39 @type node: L{node.Node}
41 @type table: L{ktable.KTable}
42 @ivar table: the routing table
43 @type token_secrets: C{list} of C{string}
44 @ivar token_secrets: the current secrets to use to create tokens
45 @type stats: L{stats.StatsLogger}
46 @ivar stats: the statistics gatherer
47 @type udp: L{krpc.hostbroker}
48 @ivar udp: the factory for the KRPC protocol
49 @type listenport: L{twisted.internet.interfaces.IListeningPort}
50 @ivar listenport: the UDP listening port
51 @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
52 @ivar next_checkpoint: the delayed call for the next checkpoint
# __init__(): all real initialisation is delegated to setup() (line 67).
# NOTE(review): lines 65-66 are missing from this listing.
57 def __init__(self, config, cache_dir='/tmp'):
58 """Initialize the Khashmir class and call the L{setup} method.
60 @type config: C{dictionary}
61 @param config: the configuration parameters for the DHT
62 @type cache_dir: C{string}
63 @param cache_dir: the directory to store all files in
64 (optional, defaults to the /tmp directory)
67 self.setup(config, cache_dir)
# setup() performs the following steps, in order:
#  * open the database file khashmir.<PORT>.db inside cache_dir;
#  * restore or create this node;
#  * build the routing table;
#  * seed a single token secret;
#  * create the StatsLogger;
#  * listen for UDP KRPC traffic on PORT;
#  * reload the saved routing table and force a refresh of every bucket;
#  * schedule the first checkpoint 60 seconds later.
# NOTE(review): line 83 reads self.config, which no visible line assigns.
# Presumably one of the missing lines 76-77 sets self.config = config.
69 def setup(self, config, cache_dir):
70 """Setup all the Khashmir sub-modules.
72 @type config: C{dictionary}
73 @param config: the configuration parameters for the DHT
74 @type cache_dir: C{string}
75 @param cache_dir: the directory to store all files in
78 self.port = config['PORT']
79 self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
80 self.node = self._loadSelfNode('', self.port)
81 self.table = KTable(self.node, config)
82 self.token_secrets = [newID()]
83 self.stats = StatsLogger(self.table, self.store, self.config)
# KRPC transport: a hostbroker factory whose protocol class is krpc.KRPC,
# bound to a UDP port.
86 self.udp = krpc.hostbroker(self, self.stats, config)
87 self.udp.protocol = krpc.KRPC
88 self.listenport = reactor.listenUDP(self.port, self.udp)
90 # Load the routing table and begin checkpointing
91 self._loadRoutingTable()
92 self.refreshTable(force = True)
93 self.next_checkpoint = reactor.callLater(60, self.checkpoint)
# Node(): creates a remote-node object of this DHT's _Node class and
# attaches the KRPC connection that self.udp provides for its (host, port).
# NOTE(review): lines 96-97, 99, 101 and 103 are missing. Presumably line
# 103 returns n.
95 def Node(self, id, host = None, port = None):
98 @see: L{node.Node.__init__}
100 n = self._Node(id, host, port)
102 n.conn = self.udp.connectionForAddr((n.host, n.port))
# NOTE(review): the `def` line of this method (original line 105) is
# missing from this listing. The visible body only stops the UDP listening
# port and does not cancel self.next_checkpoint.
106 """Stop listening for packets."""
107 self.listenport.stopListening()
# _loadSelfNode(): reuses the node ID stored by checkpoint()'s
# saveSelfNode, so the node keeps its identity across restarts.
# NOTE(review): lines 112-113 are missing. Presumably they create a fresh
# ID with newID() when the store has none saved.
109 def _loadSelfNode(self, host, port):
110 """Create this node, loading any previously saved one."""
111 id = self.store.getSelfNode()
114 return self._Node(id, host, port)
# checkpoint(): periodic maintenance that reschedules itself.
#  * Rotate token secrets. A fresh secret goes to the front and at most 3
#    are kept. krpc_store_value accepts a token built from any secret still
#    in the list.
#  * Persist this node's ID and the routing table to the store.
#  * Expire stored values according to KEY_EXPIRE.
#  * Reschedule after a random integer delay between 90% (inclusive) and
#    110% (exclusive) of CHECKPOINT_INTERVAL. randrange excludes its upper
#    bound. The jitter is presumably meant to spread checkpoints out.
# NOTE(review): lines 126-127, 129-130 and 133 are missing. The callLater
# target (presumably self.checkpoint) and any other maintenance call made
# there are not visible.
116 def checkpoint(self):
117 """Perform some periodic maintenance operations."""
118 # Create a new token secret
119 self.token_secrets.insert(0, newID())
120 if len(self.token_secrets) > 3:
121 self.token_secrets.pop()
123 # Save some parameters for reloading
124 self.store.saveSelfNode(self.node.id)
125 self.store.dumpRoutingTable(self.table.buckets)
128 self.store.expireValues(self.config['KEY_EXPIRE'])
131 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
132 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
# _loadRoutingTable(): rebuilds nodes from the saved (id, host, port) rows
# and inserts them as not-yet-contacted (contacted = False). The port
# column is coerced with int().
# NOTE(review): lines 139-140 and 142 are missing. Presumably line 142 is
# the `for rec in nodes:` loop over the rows fetched on line 141.
135 def _loadRoutingTable(self):
136 """Load the previous routing table nodes from the database.
138 It's usually a good idea to call refreshTable(force = True) after
141 nodes = self.store.getRoutingTable()
143 n = self.Node(rec[0], rec[1], int(rec[2]))
144 self.table.insertNode(n, contacted = False)
# addContact(): only host and port are known, not the node ID. So a
# placeholder node with NULL_ID is built and a join is sent to it.
# sendJoin builds the real node from the ID in the reply.
147 def addContact(self, host, port, callback=None, errback=None):
148 """Ping this node and add the contact info to the table on pong.
150 @type host: C{string}
151 @param host: the IP address of the node to contact
153 @param port: the port of the node to contact
154 @type callback: C{method}
155 @param callback: the method to call with the results, it must take 1
156 parameter, the contact info returned by the node
157 (optional, defaults to doing nothing with the results)
158 @type errback: C{method}
159 @param errback: the method to call if an error occurs
160 (optional, defaults to calling the callback with None)
162 n = self.Node(NULL_ID, host, port)
163 self.sendJoin(n, callback=callback, errback=errback)
# findNode(): starts from copies of the closest nodes in the local table.
# Unless the target itself is already the only local result, it runs a
# FindNode action on the next reactor iteration (callLater(0, ...)). The
# action's result goes to d.callback, and from there to callback (and to
# errback, if given).
# NOTE(review): lines 180-181, 183, 185 and 188-189 are missing. Presumably
# they create d = Deferred(), choose addCallbacks vs addCallback depending
# on errback, and in the "target found" branch fire d.callback(nodes) and
# return.
165 def findNode(self, id, callback, errback=None):
166 """Find the contact info for the K closest nodes in the global table.
169 @param id: the target ID to find the K closest nodes of
170 @type callback: C{method}
171 @param callback: the method to call with the results, it must take 1
172 parameter, the list of K closest nodes
173 @type errback: C{method}
174 @param errback: the method to call if an error occurs
175 (optional, defaults to doing nothing when an error occurs)
177 # Get K nodes out of local table/cache
178 nodes = self.table.findNodes(id)
179 nodes = [copy(node) for node in nodes]
182 d.addCallbacks(callback, errback)
184 d.addCallback(callback)
186 # If the target ID was found
187 if len(nodes) == 1 and nodes[0].id == id:
190 # Start the finding nodes action
191 state = FindNode(self, id, d.callback, self.config, self.stats)
192 reactor.callLater(0, state.goWithNodes, nodes)
# insertNode(): per the comment at line 226, table.insertNode() returns an
# `old` node when the target bucket is full. That old node is pinged only
# if it is not this node and was last seen more than MIN_PING_INTERVAL
# seconds ago. Otherwise no ping is sent.
#  * No pong (errback): the old node is considered stale and is replaced
#    by the new node.
#  * Pong carrying the old node's ID: table.justSeenNode(old.id) is called.
# The nested handlers bind old/node/self through default arguments, so
# they keep the values from this call.
# NOTE(review): _notStaleNodeHandler reads dict['id'], while sendJoin's
# handler reads dict['rsp']['id']. Line 222 is missing and may unwrap the
# reply; confirm the ping response shape. Line 217 is also missing.
194 def insertNode(self, node, contacted = True):
195 """Try to insert a node in our local table, pinging oldest contact if necessary.
197 If all you have is a host/port, then use L{addContact}, which calls this
198 method after receiving the PONG from the remote node. The reason for
199 the separation is we can't insert a node into the table without its
200 node ID. That means of course the node passed into this method needs
201 to be a properly formed Node object with a valid ID.
203 @type node: L{node.Node}
204 @param node: the new node to try and insert
205 @type contacted: C{boolean}
206 @param contacted: whether the new node is known to be good, i.e.
207 responded to a request (optional, defaults to True)
209 old = self.table.insertNode(node, contacted=contacted)
210 if (old and old.id != self.node.id and
211 (datetime.now() - old.lastSeen) >
212 timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
214 def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
215 """The pinged node never responded, so replace it."""
216 log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
218 self.table.replaceStaleNode(oldnode, newnode)
220 def _notStaleNodeHandler(dict, old=old, self=self):
221 """Got a pong from the old node, so update it."""
223 if dict['id'] == old.id:
224 self.table.justSeenNode(old.id)
226 # Bucket is full, check to see if old node is still available
227 self.stats.startedAction('ping')
228 df = old.ping(self.node.id)
229 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
# sendJoin(): sends a join request carrying this node's ID to `node`.
#  * Success: a node is built from the reply's ID and the address the
#    reply came from (_krpc_sender). The callback gets the (ip_addr, port)
#    the remote side reports for us. krpc_join fills those from the
#    request's sender address, so this is presumably our externally
#    visible address.
#  * Failure: the error is logged and the node is marked failed in the
#    routing table.
# NOTE(review): lines 248-249, 255 and 257-261 are missing. The insertion
# of the new node, the `if callback` guard, and the errback /
# callback(None) fallback described in the docstring are not visible here.
231 def sendJoin(self, node, callback=None, errback=None):
232 """Join the DHT by pinging a bootstrap node.
234 @type node: L{node.Node}
235 @param node: the node to send the join to
236 @type callback: C{method}
237 @param callback: the method to call with the results, it must take 1
238 parameter, the contact info returned by the node
239 (optional, defaults to doing nothing with the results)
240 @type errback: C{method}
241 @param errback: the method to call if an error occurs
242 (optional, defaults to calling the callback with None)
245 def _pongHandler(dict, node=node, self=self, callback=callback):
246 """Node responded properly, callback with response."""
247 n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
250 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
252 def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
253 """Error occurred, fail node and errback or callback with error."""
254 log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
256 self.table.nodeFailed(node)
262 self.stats.startedAction('join')
263 df = node.join(self.node.id)
264 df.addCallbacks(_pongHandler, _defaultPong)
# findCloseNodes(): looks up the ID that differs from ours only in the last
# byte. That byte is incremented modulo 256, with no carry into earlier
# bytes, so 0xff wraps to 0x00. IDs are byte strings here (ord/chr).
266 def findCloseNodes(self, callback=lambda a: None, errback = None):
267 """Perform a findNode on the ID one away from our own.
269 This will allow us to populate our table with nodes on our network
270 closest to our own. This is called as soon as we start up with an
273 @type callback: C{method}
274 @param callback: the method to call with the results, it must take 1
275 parameter, the list of K closest nodes
276 (optional, defaults to doing nothing with the results)
277 @type errback: C{method}
278 @param errback: the method to call if an error occurs
279 (optional, defaults to doing nothing when an error occurs)
281 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
282 self.findNode(id, callback, errback)
# refreshTable(): for every bucket not accessed within BUCKET_STALENESS
# seconds, or every bucket when force is true, looks up a random ID between
# bucket.min and bucket.max.
# NOTE(review): lines 289-292 are missing. The `callback` used on line 298
# is defined there, presumably as a do-nothing result handler.
284 def refreshTable(self, force = False):
285 """Check all the buckets for those that need refreshing.
287 @param force: refresh all buckets regardless of last bucket access time
288 (optional, defaults to False)
293 for bucket in self.table.buckets:
294 if force or (datetime.now() - bucket.lastAccessed >
295 timedelta(seconds=self.config['BUCKET_STALENESS'])):
296 # Choose a random ID in the bucket and try and find it
297 id = newIDInRange(bucket.min, bucket.max)
298 self.findNode(id, callback)
# NOTE(review): the `def` line of this method (original line 300) and lines
# 303 and 305-308 are missing. The visible body stops the UDP port and
# cancels the pending checkpoint call. Twisted raises when cancelling a
# call that already fired or was cancelled, so the cancel is presumably
# wrapped in try/except on the missing lines. Confirm.
301 """Closes the port and cancels pending later calls."""
302 self.listenport.stopListening()
304 self.next_checkpoint.cancel()
# NOTE(review): the `def` line of this method (original line 309) is
# missing. The body returns the StatsLogger's HTML report.
310 """Gather the statistics for the DHT."""
311 return self.stats.formatHTML()
# krpc_ping(): ping handler. When the sender's address is known, it records
# the sender as a not-yet-contacted node, then answers with our ID.
314 def krpc_ping(self, id, _krpc_sender = None):
318 @param id: the node ID of the sender node
319 @type _krpc_sender: (C{string}, C{int})
320 @param _krpc_sender: the sender node's IP address and port
322 if _krpc_sender is not None:
323 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
324 self.insertNode(n, contacted = False)
326 return {"id" : self.node.id}
# krpc_join(): like krpc_ping, but the reply also tells the sender which
# IP and port its request came from. Without sender information the reply
# uses 127.0.0.1 and our own port.
# NOTE(review): line 339 is missing. Presumably it is the `else:` that
# limits the loopback fallback to the _krpc_sender-is-None case.
328 def krpc_join(self, id, _krpc_sender = None):
329 """Add the node by responding with its address and port.
332 @param id: the node ID of the sender node
333 @type _krpc_sender: (C{string}, C{int})
334 @param _krpc_sender: the sender node's IP address and port
336 if _krpc_sender is not None:
337 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
338 self.insertNode(n, contacted = False)
340 _krpc_sender = ('127.0.0.1', self.port)
342 return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
# krpc_find_node(): returns the contact info of the closest local nodes to
# target, plus a token equal to sha(<newest secret> + sender IP).digest().
# The token is bound to the sender's IP only, not its port. It is checked
# against all current token secrets when a store request arrives.
# NOTE(review): line 357 is missing. Presumably it is the `else:` for the
# loopback fallback, as in krpc_join.
344 def krpc_find_node(self, id, target, _krpc_sender = None):
345 """Find the K closest nodes to the target in the local routing table.
347 @type target: C{string}
348 @param target: the target ID to find nodes for
350 @param id: the node ID of the sender node
351 @type _krpc_sender: (C{string}, C{int})
352 @param _krpc_sender: the sender node's IP address and port
354 if _krpc_sender is not None:
355 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
356 self.insertNode(n, contacted = False)
358 _krpc_sender = ('127.0.0.1', self.port)
360 nodes = self.table.findNodes(target)
361 nodes = map(lambda node: node.contactInfo(), nodes)
362 token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
363 return {"nodes" : nodes, "token" : token, "id" : self.node.id}
# KhashmirRead: adds value lookup on top of KhashmirBase. It can locate and
# fetch values held by other nodes, and it answers find_value / get_value
# requests from its local store. It does not store values itself.
# NOTE(review): this listing drops short lines (see gaps in the leading
# line numbers). Lines 368-371 are missing and presumably include this
# class's _Node setting (KNodeRead is imported). Confirm.
366 class KhashmirRead(KhashmirBase):
367 """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
# findValue(): same shape as findNode, but the search is a FindValue action
# seeded with copies of the closest local nodes. Its result goes to
# d.callback and from there to callback (and errback, if given).
# NOTE(review): lines 387-388, 390 and 392 are missing. Presumably they
# create d = Deferred() and choose addCallbacks vs addCallback based on
# errback.
372 def findValue(self, key, callback, errback=None):
373 """Get the nodes that have values for the key from the global table.
376 @param key: the target key to find the values for
377 @type callback: C{method}
378 @param callback: the method to call with the results, it must take 1
379 parameter, the list of nodes with values
380 @type errback: C{method}
381 @param errback: the method to call if an error occurs
382 (optional, defaults to doing nothing when an error occurs)
384 # Get K nodes out of local table/cache
385 nodes = self.table.findNodes(key)
386 nodes = [copy(node) for node in nodes]
389 d.addCallbacks(callback, errback)
391 d.addCallback(callback)
393 # Search for others starting with the locally found ones
394 state = FindValue(self, key, d.callback, self.config, self.stats)
395 reactor.callLater(0, state.goWithNodes, nodes)
# valueForKey(): retrieval happens in two phases.
#  1. Values from the local store are passed to callback(key, values) on
#     the next reactor iteration.
#  2. findValue locates nodes that hold values. A GetValue action then asks
#     them for values, limited by RETRIEVE_VALUES, and is given the local
#     values.
# NOTE(review): lines 412, 414 and 416-418 are missing. Presumably they
# guard phase 1 with `searchlocal` and give `l` a default, since line 419
# binds it unconditionally. They may also skip reporting an empty local
# list. This matters because the docstring says an empty list marks the
# final callback.
397 def valueForKey(self, key, callback, searchlocal = True):
398 """Get the values found for key in global table.
400 Callback will be called with a list of values for each peer that
401 returns unique values. The final callback will be an empty list.
404 @param key: the target key to get the values for
405 @type callback: C{method}
406 @param callback: the method to call with the results, it must take 2
407 parameters: the key, and the values found
408 @type searchlocal: C{boolean}
409 @param searchlocal: whether to also look for any local values
411 # Get any local values
413 l = self.store.retrieveValues(key)
415 reactor.callLater(0, callback, key, l)
419 def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
420 """Use the found nodes to send requests for values to."""
421 state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
422 reactor.callLater(0, state.goWithNodes, nodes)
424 # First lookup nodes that have values for the key
425 self.findValue(key, _getValueForKey)
# krpc_find_value(): answers with the closest local nodes to key and the
# number of values stored locally for key. The values themselves are not
# sent; krpc_get_value returns those.
428 def krpc_find_value(self, id, key, _krpc_sender = None):
429 """Find the number of values stored locally for the key, and the K closest nodes.
432 @param key: the target key to find the values and nodes for
434 @param id: the node ID of the sender node
435 @type _krpc_sender: (C{string}, C{int})
436 @param _krpc_sender: the sender node's IP address and port
438 if _krpc_sender is not None:
439 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
440 self.insertNode(n, contacted = False)
442 nodes = self.table.findNodes(key)
443 nodes = map(lambda node: node.contactInfo(), nodes)
444 num_values = self.store.countValues(key)
445 return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
# krpc_get_value(): returns every locally stored value for key when num is
# 0 or at least the number stored. Otherwise it returns the slice l[:num].
# NOTE(review): lines 467-468 are missing. `shuffle` is imported at the top
# of the file, so the subset is presumably a random one. Confirm.
# NOTE(review): a negative num would reach l[:num] and drop values from the
# end instead of returning them all, unless the missing lines handle it.
# The tests set RETRIEVE_VALUES to -10000; check whether GetValue can send
# a negative num.
447 def krpc_get_value(self, id, key, num, _krpc_sender = None):
448 """Retrieve the values stored locally for the key.
451 @param key: the target key to retrieve the values for
453 @param num: the maximum number of values to retrieve, or 0 to
456 @param id: the node ID of the sender node
457 @type _krpc_sender: (C{string}, C{int})
458 @param _krpc_sender: the sender node's IP address and port
460 if _krpc_sender is not None:
461 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
462 self.insertNode(n, contacted = False)
464 l = self.store.retrieveValues(key)
465 if num == 0 or num >= len(l):
466 return {'values' : l, "id": self.node.id}
469 return {'values' : l[:num], "id": self.node.id}
# KhashmirWrite: adds storing values in the DHT (storeValueForKey) and
# accepting store requests from other nodes (krpc_store_value).
# NOTE(review): this listing drops short lines (see gaps in the leading
# line numbers). Lines 474-477 are missing and presumably include this
# class's _Node setting (KNodeWrite is imported). Confirm.
472 class KhashmirWrite(KhashmirRead):
473 """The read-write Khashmir class, which can store and retrieve key/value mappings."""
# storeValueForKey(): finds the closest nodes to key, then runs a
# StoreValue action on them with STORE_REDUNDANCY, presumably the number of
# nodes to store on. Per the docstring, the store status is not reported.
# NOTE(review): lines 495 and 498 are missing. Presumably line 495 is
# `if not response:`, so the do-nothing _storedValueHandler replaces only a
# missing callback.
478 def storeValueForKey(self, key, value, callback=None):
479 """Stores the value for the key in the global table.
481 No status in this implementation, peers respond but don't indicate
482 status of storing values.
485 @param key: the target key to store the value for
486 @type value: C{string}
487 @param value: the value to store with the key
488 @type callback: C{method}
489 @param callback: the method to call with the results, it must take 3
490 parameters: the key, the value stored, and the result of the store
491 (optional, defaults to doing nothing with the results)
493 def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
494 """Use the returned K closest nodes to store the key at."""
496 def _storedValueHandler(key, value, sender):
497 """Default callback that does nothing."""
499 response = _storedValueHandler
500 action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
501 reactor.callLater(0, action.goWithNodes, nodes)
503 # First find the K closest nodes to operate on.
504 self.findNode(key, _storeValueForKey)
# krpc_store_value(): handles a store request from another node. The value
# is stored only if token equals sha(secret + sender IP).digest() for one
# of the current token secrets. That is the token a find_node reply gives
# to the same IP. Otherwise a KrpcError with KRPC_ERROR_INVALID_TOKEN is
# raised; the statement uses Python 2 raise syntax.
# NOTE(review): line 523 is missing. Presumably it is the `else:` that
# limits the 127.0.0.1 fallback to the _krpc_sender-is-None case.
507 def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
508 """Store the value locally with the key.
511 @param key: the target key to store the value for
512 @type value: C{string}
513 @param value: the value to store with the key
514 @param token: the token to confirm that this peer contacted us previously
516 @param id: the node ID of the sender node
517 @type _krpc_sender: (C{string}, C{int})
518 @param _krpc_sender: the sender node's IP address and port
520 if _krpc_sender is not None:
521 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
522 self.insertNode(n, contacted = False)
524 _krpc_sender = ('127.0.0.1', self.port)
526 for secret in self.token_secrets:
527 this_token = sha(secret + _krpc_sender[0]).digest()
528 if token == this_token:
529 self.store.storeValue(key, value)
530 return {"id" : self.node.id}
531 raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
# Khashmir: the default DHT class to instantiate. It is currently the
# read-write KhashmirWrite with nothing added in the visible lines.
# NOTE(review): lines 536-538 are missing from this listing.
534 class Khashmir(KhashmirWrite):
535 """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
# SimpleTests: two-instance tests (self.a and self.b) covering contact
# exchange and store/retrieve between DHT nodes on 127.0.0.1.
# NOTE(review): this listing drops short lines (see gaps in the leading
# line numbers). The setUp/tearDown `def` lines and the loops that drive
# the reactor after each request (e.g. 571-575, 583-587, 589-594, 596-603)
# are not visible. Presumably setUp builds a and b from copies of
# DHT_DEFAULTS, with b on port 4045 (the port contacted below), and
# tearDown deletes their database files.
# NOTE(review): DHT_DEFAULTS is duplicated verbatim in MultiTest.
539 class SimpleTests(unittest.TestCase):
542 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
543 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
544 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
546 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
547 'KEY_EXPIRE': 3600, 'SPEW': False, }
550 d = self.DHT_DEFAULTS.copy()
553 d = self.DHT_DEFAULTS.copy()
560 os.unlink(self.a.store.db)
561 os.unlink(self.b.store.db)
# Both routing tables start with one empty bucket. After a contacts b, each
# holds exactly one node.
563 def testAddContact(self):
564 self.failUnlessEqual(len(self.a.table.buckets), 1)
565 self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
567 self.failUnlessEqual(len(self.b.table.buckets), 1)
568 self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
570 self.a.addContact('127.0.0.1', 4045)
576 self.failUnlessEqual(len(self.a.table.buckets), 1)
577 self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
578 self.failUnlessEqual(len(self.b.table.buckets), 1)
579 self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
# Stores 'foobar' under sha('foo') from a, then reads it back through
# valueForKey and checks the result in _cb.
581 def testStoreRetrieve(self):
582 self.a.addContact('127.0.0.1', 4045)
588 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
595 self.a.valueForKey(sha('foo').digest(), self._cb)
# _cb(): value callback. On the final result, presumably an empty list
# (line 605 is missing), it asserts that 'foobar' was seen earlier. A
# batch containing 'foobar' presumably sets self.got (line 608 is
# missing).
604 def _cb(self, key, val):
606 self.failUnlessEqual(self.got, 1)
607 elif 'foobar' in val:
# MultiTest: tests a larger network. self.num Khashmir instances run on
# consecutive ports starting at 4088. Each contacts three randomly chosen
# instances (which may include itself) and runs findCloseNodes. The test
# then does random store/retrieve rounds.
# NOTE(review): this listing drops short lines (see gaps in the leading
# line numbers). The setUp/tearDown `def` lines, loop headers and
# reactor-driving loops are missing. The last visible line may not be the
# end of testStoreRetrieve.
# NOTE(review): DHT_DEFAULTS is duplicated verbatim from SimpleTests.
611 class MultiTest(unittest.TestCase):
615 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
616 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
617 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
619 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
620 'KEY_EXPIRE': 3600, 'SPEW': False, }
622 def _done(self, val):
627 self.startport = 4088
628 for i in range(self.num):
629 d = self.DHT_DEFAULTS.copy()
630 d['PORT'] = self.startport + i
631 self.l.append(Khashmir(d))
636 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
637 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
638 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
645 i.findCloseNodes(self._done)
650 i.findCloseNodes(self._done)
657 os.unlink(i.store.db)
# Each round stores value V under key K through a random instance, then
# retrieves K through another random instance and expects V to be found.
661 def testStoreRetrieve(self):
668 def _scb(key, value, result):
670 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
678 self.failUnlessEqual(self.got, 1)
684 self.l[randrange(0, self.num)].valueForKey(K, _rcb)