1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """The main Khashmir program."""
7 warnings.simplefilter("ignore", DeprecationWarning)
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.trial import unittest
19 from ktable import KTable
20 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
21 from khash import newID, newIDInRange
22 from actions import FindNode, FindValue, GetValue, StoreValue
25 class KhashmirBase(protocol.Factory):
26 """The base Khashmir class, with base functionality and find node, no key-value mappings.
28 @type _Node: L{node.Node}
29 @ivar _Node: the knode implementation to use for this class of DHT
30 @type config: C{dictionary}
31 @ivar config: the configuration parameters for the DHT
33 @ivar port: the port to listen on
35 @ivar store: the database to store nodes and key/value pairs in
36 @type node: L{node.Node}
38 @type table: L{ktable.KTable}
39 @ivar table: the routing table
40 @type token_secrets: C{list} of C{string}
41 @ivar token_secrets: the current secrets to use to create tokens
42 @type udp: L{krpc.hostbroker}
43 @ivar udp: the factory for the KRPC protocol
44 @type listenport: L{twisted.internet.interfaces.IListeningPort}
45 @ivar listenport: the UDP listening port
46 @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
47 @ivar next_checkpoint: the delayed call for the next checkpoint
52 def __init__(self, config, cache_dir='/tmp'):
53 """Initialize the Khashmir class and call the L{setup} method.
55 @type config: C{dictionary}
56 @param config: the configuration parameters for the DHT
57 @type cache_dir: C{string}
58 @param cache_dir: the directory to store all files in
59 (optional, defaults to the /tmp directory)
62 self.setup(config, cache_dir)
64 def setup(self, config, cache_dir):
65 """Setup all the Khashmir sub-modules.
67 @type config: C{dictionary}
68 @param config: the configuration parameters for the DHT
69 @type cache_dir: C{string}
70 @param cache_dir: the directory to store all files in
73 self.port = config['PORT']
74 self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
75 self.node = self._loadSelfNode('', self.port)
76 self.table = KTable(self.node, config)
77 self.token_secrets = [newID()]
80 self.udp = krpc.hostbroker(self, config)
81 self.udp.protocol = krpc.KRPC
82 self.listenport = reactor.listenUDP(self.port, self.udp)
84 # Load the routing table and begin checkpointing
85 self._loadRoutingTable()
86 self.refreshTable(force = True)
87 self.next_checkpoint = reactor.callLater(60, self.checkpoint)
89 def Node(self, id, host = None, port = None):
92 @see: L{node.Node.__init__}
94 n = self._Node(id, host, port)
96 n.conn = self.udp.connectionForAddr((n.host, n.port))
100 """Stop listening for packets."""
101 self.listenport.stopListening()
103 def _loadSelfNode(self, host, port):
104 """Create this node, loading any previously saved one."""
105 id = self.store.getSelfNode()
108 return self._Node(id, host, port)
110 def checkpoint(self):
111 """Perform some periodic maintenance operations."""
112 # Create a new token secret
113 self.token_secrets.insert(0, newID())
114 if len(self.token_secrets) > 3:
115 self.token_secrets.pop()
117 # Save some parameters for reloading
118 self.store.saveSelfNode(self.node.id)
119 self.store.dumpRoutingTable(self.table.buckets)
122 self.store.expireValues(self.config['KEY_EXPIRE'])
125 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
126 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
129 def _loadRoutingTable(self):
130 """Load the previous routing table nodes from the database.
132 It's usually a good idea to call refreshTable(force = True) after
135 nodes = self.store.getRoutingTable()
137 n = self.Node(rec[0], rec[1], int(rec[2]))
138 self.table.insertNode(n, contacted = False)
def addContact(self, host, port, callback=None, errback=None):
    """Contact a node known only by address, adding it to the table on reply.

    A placeholder node carrying C{NULL_ID} is created for the address and
    passed to L{sendJoin} together with the callback and errback, which are
    forwarded unchanged.

    @type host: C{string}
    @param host: the IP address of the node to contact
    @type port: C{int}
    @param port: the port of the node to contact
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the contact info returned by the node
        (optional, defaults to doing nothing with the results)
    @type errback: C{method}
    @param errback: the method to call if an error occurs
        (optional, defaults to calling the callback with None)
    """
    # The remote ID is unknown at this point, hence the NULL_ID placeholder.
    placeholder = self.Node(NULL_ID, host, port)
    self.sendJoin(placeholder, callback=callback, errback=errback)
159 def findNode(self, id, callback, errback=None):
160 """Find the contact info for the K closest nodes in the global table.
163 @param id: the target ID to find the K closest nodes of
164 @type callback: C{method}
165 @param callback: the method to call with the results, it must take 1
166 parameter, the list of K closest nodes
167 @type errback: C{method}
168 @param errback: the method to call if an error occurs
169 (optional, defaults to doing nothing when an error occurs)
171 # Get K nodes out of local table/cache
172 nodes = self.table.findNodes(id)
175 d.addCallbacks(callback, errback)
177 d.addCallback(callback)
179 # If the target ID was found
180 if len(nodes) == 1 and nodes[0].id == id:
183 # Start the finding nodes action
184 state = FindNode(self, id, d.callback, self.config)
185 reactor.callLater(0, state.goWithNodes, nodes)
187 def insertNode(self, node, contacted = True):
188 """Try to insert a node in our local table, pinging oldest contact if necessary.
190 If all you have is a host/port, then use L{addContact}, which calls this
191 method after receiving the PONG from the remote node. The reason for
192 the seperation is we can't insert a node into the table without its
193 node ID. That means of course the node passed into this method needs
194 to be a properly formed Node object with a valid ID.
196 @type node: L{node.Node}
197 @param node: the new node to try and insert
198 @type contacted: C{boolean}
199 @param contacted: whether the new node is known to be good, i.e.
200 responded to a request (optional, defaults to True)
202 old = self.table.insertNode(node, contacted=contacted)
203 if (old and old.id != self.node.id and
204 (datetime.now() - old.lastSeen) >
205 timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
207 def _staleNodeHandler(oldnode = old, newnode = node):
208 """The pinged node never responded, so replace it."""
209 self.table.replaceStaleNode(oldnode, newnode)
211 def _notStaleNodeHandler(dict, old=old):
212 """Got a pong from the old node, so update it."""
214 if dict['id'] == old.id:
215 self.table.justSeenNode(old.id)
217 # Bucket is full, check to see if old node is still available
218 df = old.ping(self.node.id)
219 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
221 def sendJoin(self, node, callback=None, errback=None):
222 """Join the DHT by pinging a bootstrap node.
224 @type node: L{node.Node}
225 @param node: the node to send the join to
226 @type callback: C{method}
227 @param callback: the method to call with the results, it must take 1
228 parameter, the contact info returned by the node
229 (optional, defaults to doing nothing with the results)
230 @type errback: C{method}
231 @param errback: the method to call if an error occurs
232 (optional, defaults to calling the callback with None)
235 def _pongHandler(dict, node=node, self=self, callback=callback):
236 """Node responded properly, callback with response."""
237 n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
240 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
242 def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
243 """Error occurred, fail node and errback or callback with error."""
244 table.nodeFailed(node)
250 df = node.join(self.node.id)
251 df.addCallbacks(_pongHandler, _defaultPong)
def findCloseNodes(self, callback=lambda a: None, errback = None):
    """Search the DHT for the nodes nearest to this node's own ID.

    The target of the search is this node's ID with its final byte
    incremented by one (wrapping around at 256), so the lookup is for an
    ID adjacent to our own rather than our own ID itself. The search is
    delegated to L{findNode}.

    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of K closest nodes
        (optional, defaults to doing nothing with the results)
    @type errback: C{method}
    @param errback: the method to call if an error occurs
        (optional, defaults to doing nothing when an error occurs)
    """
    own_id = self.node.id
    # Bump only the last byte, modulo 256 so it stays a single character.
    bumped_last = chr((ord(own_id[-1]) + 1) % 256)
    neighbour_id = own_id[:-1] + bumped_last
    self.findNode(neighbour_id, callback, errback)
271 def refreshTable(self, force = False):
272 """Check all the buckets for those that need refreshing.
274 @param force: refresh all buckets regardless of last bucket access time
275 (optional, defaults to False)
280 for bucket in self.table.buckets:
281 if force or (datetime.now() - bucket.lastAccessed >
282 timedelta(seconds=self.config['BUCKET_STALENESS'])):
283 # Choose a random ID in the bucket and try and find it
284 id = newIDInRange(bucket.min, bucket.max)
285 self.findNode(id, callback)
288 """Collect some statistics about the DHT.
290 @rtype: (C{int}, C{int})
291 @return: the number contacts in our routing table, and the estimated
292 number of nodes in the entire DHT
294 num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
295 num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
296 return (num_contacts, num_nodes)
299 """Closes the port and cancels pending later calls."""
300 self.listenport.stopListening()
302 self.next_checkpoint.cancel()
def krpc_ping(self, id, _krpc_sender):
    """Handle a ping request by replying with our own node ID.

    The sender is also offered to the routing table via L{insertNode},
    flagged as not contacted since it has not answered a request of ours.

    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the response, holding this node's ID under the "id" key
    """
    sender_host = _krpc_sender[0]
    sender_port = _krpc_sender[1]
    sender = self.Node(id, sender_host, sender_port)
    self.insertNode(sender, contacted = False)

    return {"id" : self.node.id}
def krpc_join(self, id, _krpc_sender):
    """Handle a join request, telling the sender the address we saw it at.

    The sender is offered to the routing table via L{insertNode} (flagged
    as not contacted), and the reply echoes back the IP address and port
    the request arrived from, along with this node's ID.

    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the response, with the keys "ip_addr", "port" and "id"
    """
    sender_host = _krpc_sender[0]
    sender_port = _krpc_sender[1]
    sender = self.Node(id, sender_host, sender_port)
    self.insertNode(sender, contacted = False)

    return {"ip_addr" : sender_host, "port" : sender_port, "id" : self.node.id}
def krpc_find_node(self, target, id, _krpc_sender):
    """Handle a find_node request from the local routing table.

    The sender is offered to the routing table via L{insertNode} (flagged
    as not contacted). The reply carries the contact info of the nodes the
    table returns for the target, plus a token computed from the newest
    entry of C{token_secrets} and the sender's IP address; the same
    derivation is checked by L{krpc_store_value}.

    @type target: C{string}
    @param target: the target ID to find nodes for
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the response, with the keys "nodes", "token" and "id"
    """
    sender_host = _krpc_sender[0]
    sender = self.Node(id, sender_host, _krpc_sender[1])
    self.insertNode(sender, contacted = False)

    contacts = [close.contactInfo() for close in self.table.findNodes(target)]
    # Index 0 is the most recently created secret.
    newest_secret = self.token_secrets[0]
    token = sha(newest_secret + sender_host).digest()
    return {"nodes" : contacts, "token" : token, "id" : self.node.id}
353 class KhashmirRead(KhashmirBase):
354 """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
359 def findValue(self, key, callback, errback=None):
360 """Get the nodes that have values for the key from the global table.
363 @param key: the target key to find the values for
364 @type callback: C{method}
365 @param callback: the method to call with the results, it must take 1
366 parameter, the list of nodes with values
367 @type errback: C{method}
368 @param errback: the method to call if an error occurs
369 (optional, defaults to doing nothing when an error occurs)
371 # Get K nodes out of local table/cache
372 nodes = self.table.findNodes(key)
375 d.addCallbacks(callback, errback)
377 d.addCallback(callback)
379 # Search for others starting with the locally found ones
380 state = FindValue(self, key, d.callback, self.config)
381 reactor.callLater(0, state.goWithNodes, nodes)
383 def valueForKey(self, key, callback, searchlocal = True):
384 """Get the values found for key in global table.
386 Callback will be called with a list of values for each peer that
387 returns unique values. The final callback will be an empty list.
390 @param key: the target key to get the values for
391 @type callback: C{method}
392 @param callback: the method to call with the results, it must take 2
393 parameters: the key, and the values found
394 @type searchlocal: C{boolean}
395 @param searchlocal: whether to also look for any local values
397 # Get any local values
399 l = self.store.retrieveValues(key)
401 reactor.callLater(0, callback, key, l)
405 def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
406 """Use the found nodes to send requests for values to."""
407 state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
408 reactor.callLater(0, state.goWithNodes, nodes)
410 # First lookup nodes that have values for the key
411 self.findValue(key, _getValueForKey)
def krpc_find_value(self, key, id, _krpc_sender):
    """Handle a find_value request: count local values and list close nodes.

    The sender is offered to the routing table via L{insertNode} (flagged
    as not contacted). The values themselves are not returned here, only
    how many the local store reports for the key.

    @type key: C{string}
    @param key: the target key to find the values and nodes for
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the response, with the keys "nodes" (contact info of the
        nodes the routing table returns for the key), "num" (the count of
        locally stored values) and "id" (this node's ID)
    """
    sender = self.Node(id, _krpc_sender[0], _krpc_sender[1])
    self.insertNode(sender, contacted = False)

    contacts = [close.contactInfo() for close in self.table.findNodes(key)]
    stored_count = self.store.countValues(key)
    return {'nodes' : contacts, 'num' : stored_count, "id": self.node.id}
432 def krpc_get_value(self, key, num, id, _krpc_sender):
433 """Retrieve the values stored locally for the key.
436 @param key: the target key to retrieve the values for
438 @param num: the maximum number of values to retrieve, or 0 to
441 @param id: the node ID of the sender node
442 @type _krpc_sender: (C{string}, C{int})
443 @param _krpc_sender: the sender node's IP address and port
445 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
446 self.insertNode(n, contacted = False)
448 l = self.store.retrieveValues(key)
449 if num == 0 or num >= len(l):
450 return {'values' : l, "id": self.node.id}
453 return {'values' : l[:num], "id": self.node.id}
456 class KhashmirWrite(KhashmirRead):
457 """The read-write Khashmir class, which can store and retrieve key/value mappings."""
462 def storeValueForKey(self, key, value, callback=None):
463 """Stores the value for the key in the global table.
465 No status in this implementation, peers respond but don't indicate
466 status of storing values.
469 @param key: the target key to store the value for
470 @type value: C{string}
471 @param value: the value to store with the key
472 @type callback: C{method}
473 @param callback: the method to call with the results, it must take 3
474 parameters: the key, the value stored, and the result of the store
475 (optional, defaults to doing nothing with the results)
477 def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
478 """Use the returned K closest nodes to store the key at."""
480 def _storedValueHandler(key, value, sender):
481 """Default callback that does nothing."""
483 response = _storedValueHandler
484 action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
485 reactor.callLater(0, action.goWithNodes, nodes)
487 # First find the K closest nodes to operate on.
488 self.findNode(key, _storeValueForKey)
def krpc_store_value(self, key, value, token, id, _krpc_sender):
    """Handle a store_value request, storing the value if the token is valid.

    The sender is first offered to the routing table via L{insertNode}
    (flagged as not contacted), whether or not its token turns out to be
    valid. The token is then compared against the one derived from each
    entry of C{token_secrets} and the sender's IP address, so a token
    issued under any secret still held is accepted.

    @type key: C{string}
    @param key: the target key to store the value for
    @type value: C{string}
    @param value: the value to store with the key
    @type token: C{string}
    @param token: the token to confirm that this peer contacted us previously
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the response, holding this node's ID under the "id" key
    @raise krpc.KrpcError: with code C{KRPC_ERROR_INVALID_TOKEN} if the
        token matches none of the current secrets
    """
    sender_host = _krpc_sender[0]
    n = self.Node(id, sender_host, _krpc_sender[1])
    self.insertNode(n, contacted = False)
    for secret in self.token_secrets:
        this_token = sha(secret + sender_host).digest()
        if token == this_token:
            self.store.storeValue(key, value)
            return {"id" : self.node.id}
    # Call form of raise: same exception and arguments as the old
    # "raise Class, (args)" statement, but valid on Python 2 and 3 alike.
    raise krpc.KrpcError(krpc.KRPC_ERROR_INVALID_TOKEN,
                         'token is invalid, do a find_nodes to get a fresh one')
514 class Khashmir(KhashmirWrite):
515 """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
519 class SimpleTests(unittest.TestCase):
522 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
523 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
524 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
526 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
527 'KEY_EXPIRE': 3600, 'SPEW': False, }
530 d = self.DHT_DEFAULTS.copy()
533 d = self.DHT_DEFAULTS.copy()
540 os.unlink(self.a.store.db)
541 os.unlink(self.b.store.db)
543 def testAddContact(self):
544 self.failUnlessEqual(len(self.a.table.buckets), 1)
545 self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
547 self.failUnlessEqual(len(self.b.table.buckets), 1)
548 self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
550 self.a.addContact('127.0.0.1', 4045)
556 self.failUnlessEqual(len(self.a.table.buckets), 1)
557 self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
558 self.failUnlessEqual(len(self.b.table.buckets), 1)
559 self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
561 def testStoreRetrieve(self):
562 self.a.addContact('127.0.0.1', 4045)
568 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
575 self.a.valueForKey(sha('foo').digest(), self._cb)
584 def _cb(self, key, val):
586 self.failUnlessEqual(self.got, 1)
587 elif 'foobar' in val:
591 class MultiTest(unittest.TestCase):
595 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
596 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
597 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
599 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
600 'KEY_EXPIRE': 3600, 'SPEW': False, }
602 def _done(self, val):
607 self.startport = 4088
608 for i in range(self.num):
609 d = self.DHT_DEFAULTS.copy()
610 d['PORT'] = self.startport + i
611 self.l.append(Khashmir(d))
616 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
617 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
618 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
625 i.findCloseNodes(self._done)
630 i.findCloseNodes(self._done)
637 os.unlink(i.store.db)
641 def testStoreRetrieve(self):
648 def _scb(key, value, result):
650 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
658 self.failUnlessEqual(self.got, 1)
664 self.l[randrange(0, self.num)].valueForKey(K, _rcb)