1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
"""The main Khashmir program: the Khashmir DHT node classes and their tests."""

import warnings

# Silence every DeprecationWarning for the whole process.
# NOTE(review): presumably meant for deprecated modules imported below
# (e.g. the old `sha` module) -- confirm which ones.
warnings.simplefilter("ignore", category=DeprecationWarning)
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.python import log
17 from twisted.trial import unittest
20 from ktable import KTable
21 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
22 from khash import newID, newIDInRange
23 from actions import FindNode, FindValue, GetValue, StoreValue
24 from stats import StatsLogger
class KhashmirBase(protocol.Factory):
    """The base Khashmir class, with base functionality and find node, no key-value mappings.

    @type _Node: L{node.Node}
    @ivar _Node: the knode implementation to use for this class of DHT
    @type config: C{dictionary}
    @ivar config: the configuration parameters for the DHT
    @type port: C{int}
    @ivar port: the port to listen on
    @type store: L{DB}
    @ivar store: the database to store nodes and key/value pairs in
    @type node: L{node.Node}
    @ivar node: this node
    @type table: L{ktable.KTable}
    @ivar table: the routing table
    @type token_secrets: C{list} of C{string}
    @ivar token_secrets: the current secrets to use to create tokens, newest
        first (at most 3 are kept, see L{checkpoint})
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics gatherer
    @type udp: L{krpc.hostbroker}
    @ivar udp: the factory for the KRPC protocol
    @type listenport: L{twisted.internet.interfaces.IListeningPort}
    @ivar listenport: the UDP listening port
    @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
    @ivar next_checkpoint: the delayed call for the next checkpoint
    """

    _Node = KNodeBase

    def __init__(self, config, cache_dir='/tmp'):
        """Initialize the Khashmir class and call the L{setup} method.

        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        @type cache_dir: C{string}
        @param cache_dir: the directory to store all files in
            (optional, defaults to the /tmp directory)
        """
        self.config = None
        self.setup(config, cache_dir)

    def setup(self, config, cache_dir):
        """Setup all the Khashmir sub-modules.

        Opens the database, creates (or reloads) this node and its routing
        table, starts listening on the configured UDP port, reloads the saved
        routing table and schedules the first checkpoint 60 seconds out.

        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        @type cache_dir: C{string}
        @param cache_dir: the directory to store all files in
        """
        self.config = config
        self.port = config['PORT']
        self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
        self.node = self._loadSelfNode('', self.port)
        self.table = KTable(self.node, config)
        self.token_secrets = [newID()]
        self.stats = StatsLogger(self.table, self.store, self.config)

        # Start listening
        self.udp = krpc.hostbroker(self, self.stats, config)
        self.udp.protocol = krpc.KRPC
        self.listenport = reactor.listenUDP(self.port, self.udp)

        # Load the routing table and begin checkpointing
        self._loadRoutingTable()
        self.refreshTable(force = True)
        self.next_checkpoint = reactor.callLater(60, self.checkpoint)

    def Node(self, id, host = None, port = None):
        """Create a new node and attach the KRPC connection for its address.

        @see: L{node.Node.__init__}
        """
        n = self._Node(id, host, port)
        n.table = self.table
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        return n

    def __del__(self):
        """Stop listening for packets."""
        # setup() may have raised before the port was opened (for example if
        # reactor.listenUDP failed), in which case there is nothing to stop
        # and touching self.listenport would raise AttributeError here.
        listenport = getattr(self, 'listenport', None)
        if listenport is not None:
            listenport.stopListening()

    def _loadSelfNode(self, host, port):
        """Create this node, loading any previously saved one."""
        id = self.store.getSelfNode()
        if not id:
            # Nothing saved yet: this is a brand new node
            id = newID()
        return self._Node(id, host, port)

    def checkpoint(self):
        """Perform some periodic maintenance operations.

        Rotates the token secrets, saves this node and the routing table,
        expires old values, refreshes stale buckets, and reschedules itself
        a random 90%-110% of CHECKPOINT_INTERVAL seconds later.
        """
        # Create a new token secret, keeping only the 3 most recent
        self.token_secrets.insert(0, newID())
        if len(self.token_secrets) > 3:
            self.token_secrets.pop()

        # Save some parameters for reloading
        self.store.saveSelfNode(self.node.id)
        self.store.dumpRoutingTable(self.table.buckets)

        # DHT maintenance
        self.store.expireValues(self.config['KEY_EXPIRE'])
        self.refreshTable()

        self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
                                                           int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
                                                 self.checkpoint)

    def _loadRoutingTable(self):
        """Load the previous routing table nodes from the database.

        The nodes are inserted as not yet contacted, so it's usually a good
        idea to call refreshTable(force = True) after loading the table.
        """
        nodes = self.store.getRoutingTable()
        for rec in nodes:
            # rec is (id, host, port)
            n = self.Node(rec[0], rec[1], int(rec[2]))
            self.table.insertNode(n, contacted = False)

    #{ Local interface
    def addContact(self, host, port, callback=None, errback=None):
        """Ping this node and add the contact info to the table on pong.

        @type host: C{string}
        @param host: the IP address of the node to contact
        @type port: C{int}
        @param port: the port of the node to contact
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the contact info returned by the node
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to calling the callback with None)
        """
        # The node's ID is unknown until it responds to the join
        n = self.Node(NULL_ID, host, port)
        self.sendJoin(n, callback=callback, errback=errback)

    def findNode(self, id, callback, errback=None):
        """Find the contact info for the K closest nodes in the global table.

        @type id: C{string}
        @param id: the target ID to find the K closest nodes of
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of K closest nodes
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to doing nothing when an error occurs)
        """
        # Get K nodes out of local table/cache
        nodes = self.table.findNodes(id)
        d = Deferred()
        if errback:
            d.addCallbacks(callback, errback)
        else:
            d.addCallback(callback)

        # If the target ID was found
        if len(nodes) == 1 and nodes[0].id == id:
            d.callback(nodes)
        else:
            # Start the finding nodes action
            state = FindNode(self, id, d.callback, self.config, self.stats)
            reactor.callLater(0, state.goWithNodes, nodes)

    def insertNode(self, node, contacted = True):
        """Try to insert a node in our local table, pinging oldest contact if necessary.

        If all you have is a host/port, then use L{addContact}, which calls this
        method after receiving the PONG from the remote node. The reason for
        the separation is we can't insert a node into the table without its
        node ID. That means of course the node passed into this method needs
        to be a properly formed Node object with a valid ID.

        @type node: L{node.Node}
        @param node: the new node to try and insert
        @type contacted: C{boolean}
        @param contacted: whether the new node is known to be good, i.e.
            responded to a request (optional, defaults to True)
        """
        old = self.table.insertNode(node, contacted=contacted)
        # Only ping the displaced node if it is not us and it has not been
        # seen within the last MIN_PING_INTERVAL seconds.
        if (old and old.id != self.node.id and
            (datetime.now() - old.lastSeen) >
             timedelta(seconds=self.config['MIN_PING_INTERVAL'])):

            def _staleNodeHandler(err, oldnode = old, newnode = node, self = self):
                """The pinged node never responded, so replace it."""
                log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port))
                log.err(err)
                self.table.replaceStaleNode(oldnode, newnode)

            def _notStaleNodeHandler(response, old=old, self=self):
                """Got a pong from the old node, so update it."""
                response = response['rsp']
                if response['id'] == old.id:
                    self.table.justSeenNode(old.id)

            # Bucket is full, check to see if old node is still available
            self.stats.startedAction('ping')
            df = old.ping(self.node.id)
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)

    def sendJoin(self, node, callback=None, errback=None):
        """Join the DHT by pinging a bootstrap node.

        @type node: L{node.Node}
        @param node: the node to send the join to
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the contact info returned by the node
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to calling the callback with None)
        """

        def _pongHandler(response, node=node, self=self, callback=callback):
            """Node responded properly, callback with response."""
            n = self.Node(response['rsp']['id'], response['_krpc_sender'][0], response['_krpc_sender'][1])
            self.insertNode(n)
            if callback:
                # Report the address/port the remote node saw us at
                callback((response['rsp']['ip_addr'], response['rsp']['port']))

        def _defaultPong(err, node=node, self=self, callback=callback, errback=errback):
            """Error occurred, fail node and errback or callback with error."""
            log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
            log.err(err)
            self.table.nodeFailed(node)
            if errback:
                errback()
            elif callback:
                callback(None)

        self.stats.startedAction('join')
        df = node.join(self.node.id)
        df.addCallbacks(_pongHandler, _defaultPong)

    def findCloseNodes(self, callback=lambda a: None, errback = None):
        """Perform a findNode on the ID one away from our own.

        This will allow us to populate our table with nodes on our network
        closest to our own. This is called as soon as we start up with an
        empty table.

        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of K closest nodes
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to doing nothing when an error occurs)
        """
        # Increment the last byte of our ID, wrapping around at 256
        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
        self.findNode(id, callback, errback)

    def refreshTable(self, force = False):
        """Check all the buckets for those that need refreshing.

        @param force: refresh all buckets regardless of last bucket access time
            (optional, defaults to False)
        """
        def callback(nodes):
            # The lookup itself refreshes the bucket; the results are unused
            pass

        for bucket in self.table.buckets:
            if force or (datetime.now() - bucket.lastAccessed >
                         timedelta(seconds=self.config['BUCKET_STALENESS'])):
                # Choose a random ID in the bucket and try and find it
                id = newIDInRange(bucket.min, bucket.max)
                self.findNode(id, callback)

    def shutdown(self):
        """Closes the port and cancels pending later calls."""
        self.listenport.stopListening()
        # The checkpoint may already have fired or been cancelled; cancel()
        # would raise in that case, so only cancel a still-pending call
        # (instead of swallowing every exception with a bare except).
        if self.next_checkpoint.active():
            self.next_checkpoint.cancel()
        self.store.close()

    def getStats(self):
        """Gather the statistics for the DHT."""
        return self.stats.formatHTML()

    #{ Remote interface
    def krpc_ping(self, id, _krpc_sender):
        """Pong with our ID.

        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)

        return {"id" : self.node.id}

    def krpc_join(self, id, _krpc_sender):
        """Add the node by responding with its address and port.

        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)

        return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}

    def krpc_find_node(self, target, id, _krpc_sender):
        """Find the K closest nodes to the target in the local routing table.

        Also hands the sender a token (derived from the newest secret and the
        sender's IP) that it can later use to store values with us.

        @type target: C{string}
        @param target: the target ID to find nodes for
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)

        # A real list (not a lazy map object) so it can be encoded for the reply
        nodes = [node.contactInfo() for node in self.table.findNodes(target)]
        token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
        return {"nodes" : nodes, "token" : token, "id" : self.node.id}
class KhashmirRead(KhashmirBase):
    """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""

    _Node = KNodeRead

    #{ Local interface
    def findValue(self, key, callback, errback=None):
        """Get the nodes that have values for the key from the global table.

        @type key: C{string}
        @param key: the target key to find the values for
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of nodes with values
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to doing nothing when an error occurs)
        """
        # Get K nodes out of local table/cache
        nodes = self.table.findNodes(key)
        d = Deferred()
        if errback:
            d.addCallbacks(callback, errback)
        else:
            d.addCallback(callback)

        # Search for others starting with the locally found ones
        state = FindValue(self, key, d.callback, self.config, self.stats)
        reactor.callLater(0, state.goWithNodes, nodes)

    def valueForKey(self, key, callback, searchlocal = True):
        """Get the values found for key in global table.

        Callback will be called with a list of values for each peer that
        returns unique values. The final callback will be an empty list.

        @type key: C{string}
        @param key: the target key to get the values for
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 2
            parameters: the key, and the values found
        @type searchlocal: C{boolean}
        @param searchlocal: whether to also look for any local values
        """
        # Get any local values; they are reported right away (if any) and
        # also handed to the GetValue action below
        if searchlocal:
            l = self.store.retrieveValues(key)
            if len(l) > 0:
                reactor.callLater(0, callback, key, l)
        else:
            l = []

        def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
            """Use the found nodes to send requests for values to."""
            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
            reactor.callLater(0, state.goWithNodes, nodes)

        # First lookup nodes that have values for the key
        self.findValue(key, _getValueForKey)

    #{ Remote interface
    def krpc_find_value(self, key, id, _krpc_sender):
        """Find the number of values stored locally for the key, and the K closest nodes.

        @type key: C{string}
        @param key: the target key to find the values and nodes for
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)

        # A real list (not a lazy map object) so it can be encoded for the reply
        nodes = [node.contactInfo() for node in self.table.findNodes(key)]
        num_values = self.store.countValues(key)
        return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}

    def krpc_get_value(self, key, num, id, _krpc_sender):
        """Retrieve the values stored locally for the key.

        @type key: C{string}
        @param key: the target key to retrieve the values for
        @type num: C{int}
        @param num: the maximum number of values to retrieve, or 0 to
            retrieve all of them (negative values are also treated as "all")
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)

        l = self.store.retrieveValues(key)
        # num comes straight from the remote request.  With a negative num,
        # l[:num] would silently drop |num| randomly chosen values, so any
        # non-positive value means "all of them".
        if num <= 0 or num >= len(l):
            return {'values' : l, "id": self.node.id}
        else:
            # Return a random subset of the requested size
            shuffle(l)
            return {'values' : l[:num], "id": self.node.id}
class KhashmirWrite(KhashmirRead):
    """The read-write Khashmir class, which can store and retrieve key/value mappings."""

    _Node = KNodeWrite

    #{ Local interface
    def storeValueForKey(self, key, value, callback=None):
        """Store the value for the key in the global table.

        Peers do answer the store requests, but in this implementation the
        answers carry no indication of whether the value was stored.

        @type key: C{string}
        @param key: the target key to store the value for
        @type value: C{string}
        @param value: the value to store with the key
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 3
            parameters: the key, the value stored, and the result of the store
            (optional, defaults to doing nothing with the results)
        """
        def _storedValueHandler(key, value, sender):
            """Default callback that does nothing."""
            pass

        # Without a (truthy) callback, fall back to the no-op handler
        handler = callback or _storedValueHandler

        def _storeValueForKey(nodes, key=key, value=value, response=handler, self=self):
            """Store the key at the K closest nodes found by the lookup."""
            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
            reactor.callLater(0, action.goWithNodes, nodes)

        # Locate the K closest nodes first, then store at them
        self.findNode(key, _storeValueForKey)

    #{ Remote interface
    def krpc_store_value(self, key, value, token, id, _krpc_sender):
        """Store the value locally with the key, if the sender's token is valid.

        @type key: C{string}
        @param key: the target key to store the value for
        @type value: C{string}
        @param value: the value to store with the key
        @param token: the token to confirm that this peer contacted us previously
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        @raise krpc.KrpcError: with KRPC_ERROR_INVALID_TOKEN if the token was
            not made from any current secret and the sender's IP address
        """
        sender = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(sender, contacted = False)

        sender_ip = _krpc_sender[0]
        accepted_tokens = [sha(secret + sender_ip).digest() for secret in self.token_secrets]
        if token not in accepted_tokens:
            raise krpc.KrpcError(krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')

        self.store.storeValue(key, value)
        return {"id" : self.node.id}
class Khashmir(KhashmirWrite):
    """The default Khashmir class.

    At present this is simply the read-write L{KhashmirWrite} class.
    """

    _Node = KNodeWrite
class SimpleTests(unittest.TestCase):
    """Exercise a pair of Khashmir nodes talking over the loopback interface."""

    timeout = 10
    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                    'MAX_FAILURES': 3,
                    'MIN_PING_INTERVAL': 900, 'BUCKET_STALENESS': 3600,
                    'KEY_EXPIRE': 3600, 'SPEW': False, }

    def _spin(self, times):
        """Run the reactor C{times} iterations so queued packets get handled."""
        for _ in range(times):
            reactor.iterate()

    def _makeNode(self, port):
        """Create a Khashmir node from the defaults, listening on C{port}."""
        config = self.DHT_DEFAULTS.copy()
        config['PORT'] = port
        return Khashmir(config)

    def setUp(self):
        self.a = self._makeNode(4044)
        self.b = self._makeNode(4045)

    def tearDown(self):
        for khashmir in (self.a, self.b):
            khashmir.shutdown()
        for khashmir in (self.a, self.b):
            os.unlink(khashmir.store.db)

    def testAddContact(self):
        # Both routing tables start out with one empty bucket
        for khashmir in (self.a, self.b):
            self.failUnlessEqual(len(khashmir.table.buckets), 1)
            self.failUnlessEqual(len(khashmir.table.buckets[0].l), 0)

        self.a.addContact('127.0.0.1', 4045)
        self._spin(4)

        # After the join, each node knows exactly the other one
        for khashmir in (self.a, self.b):
            self.failUnlessEqual(len(khashmir.table.buckets), 1)
            self.failUnlessEqual(len(khashmir.table.buckets[0].l), 1)

    def testStoreRetrieve(self):
        self.a.addContact('127.0.0.1', 4045)
        self._spin(4)
        self.got = 0
        target = sha('foo').digest()
        self.a.storeValueForKey(target, 'foobar')
        self._spin(6)
        self.a.valueForKey(target, self._cb)
        self._spin(7)

    def _cb(self, key, val):
        """Record whether 'foobar' was returned; check it on the final empty batch."""
        if not val:
            # The final (empty) callback: the value must have been seen by now
            self.failUnlessEqual(self.got, 1)
        elif 'foobar' in val:
            self.got = 1
595 class MultiTest(unittest.TestCase):
599 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
600 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
601 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
603 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
604 'KEY_EXPIRE': 3600, 'SPEW': False, }
606 def _done(self, val):
611 self.startport = 4088
612 for i in range(self.num):
613 d = self.DHT_DEFAULTS.copy()
614 d['PORT'] = self.startport + i
615 self.l.append(Khashmir(d))
620 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
621 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
622 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
629 i.findCloseNodes(self._done)
634 i.findCloseNodes(self._done)
641 os.unlink(i.store.db)
645 def testStoreRetrieve(self):
652 def _scb(key, value, result):
654 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
662 self.failUnlessEqual(self.got, 1)
668 self.l[randrange(0, self.num)].valueForKey(K, _rcb)