1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """The main Khashmir program."""
7 warnings.simplefilter("ignore", DeprecationWarning)
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.trial import unittest
19 from ktable import KTable
20 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
21 from khash import newID, newIDInRange
22 from actions import FindNode, FindValue, GetValue, StoreValue
23 from stats import StatsLogger
26 class KhashmirBase(protocol.Factory):
27 """The base Khashmir class, with base functionality and find node, no key-value mappings.
29 @type _Node: L{node.Node}
30 @ivar _Node: the knode implementation to use for this class of DHT
31 @type config: C{dictionary}
32 @ivar config: the configuration parameters for the DHT
34 @ivar port: the port to listen on
36 @ivar store: the database to store nodes and key/value pairs in
37 @type node: L{node.Node}
39 @type table: L{ktable.KTable}
40 @ivar table: the routing table
41 @type token_secrets: C{list} of C{string}
42 @ivar token_secrets: the current secrets to use to create tokens
43 @type udp: L{krpc.hostbroker}
44 @ivar udp: the factory for the KRPC protocol
45 @type listenport: L{twisted.internet.interfaces.IListeningPort}
46 @ivar listenport: the UDP listening port
47 @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
48 @ivar next_checkpoint: the delayed call for the next checkpoint
53 def __init__(self, config, cache_dir='/tmp'):
54 """Initialize the Khashmir class and call the L{setup} method.
56 @type config: C{dictionary}
57 @param config: the configuration parameters for the DHT
58 @type cache_dir: C{string}
59 @param cache_dir: the directory to store all files in
60 (optional, defaults to the /tmp directory)
63 self.setup(config, cache_dir)
65 def setup(self, config, cache_dir):
66 """Setup all the Khashmir sub-modules.
68 @type config: C{dictionary}
69 @param config: the configuration parameters for the DHT
70 @type cache_dir: C{string}
71 @param cache_dir: the directory to store all files in
74 self.port = config['PORT']
75 self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
76 self.node = self._loadSelfNode('', self.port)
77 self.table = KTable(self.node, config)
78 self.token_secrets = [newID()]
79 self.stats = StatsLogger(self.table, self.store, self.config)
82 self.udp = krpc.hostbroker(self, self.stats, config)
83 self.udp.protocol = krpc.KRPC
84 self.listenport = reactor.listenUDP(self.port, self.udp)
86 # Load the routing table and begin checkpointing
87 self._loadRoutingTable()
88 self.refreshTable(force = True)
89 self.next_checkpoint = reactor.callLater(60, self.checkpoint)
91 def Node(self, id, host = None, port = None):
94 @see: L{node.Node.__init__}
96 n = self._Node(id, host, port)
98 n.conn = self.udp.connectionForAddr((n.host, n.port))
102 """Stop listening for packets."""
103 self.listenport.stopListening()
105 def _loadSelfNode(self, host, port):
106 """Create this node, loading any previously saved one."""
107 id = self.store.getSelfNode()
110 return self._Node(id, host, port)
112 def checkpoint(self):
113 """Perform some periodic maintenance operations."""
114 # Create a new token secret
115 self.token_secrets.insert(0, newID())
116 if len(self.token_secrets) > 3:
117 self.token_secrets.pop()
119 # Save some parameters for reloading
120 self.store.saveSelfNode(self.node.id)
121 self.store.dumpRoutingTable(self.table.buckets)
124 self.store.expireValues(self.config['KEY_EXPIRE'])
127 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
128 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
131 def _loadRoutingTable(self):
132 """Load the previous routing table nodes from the database.
134 It's usually a good idea to call refreshTable(force = True) after
137 nodes = self.store.getRoutingTable()
139 n = self.Node(rec[0], rec[1], int(rec[2]))
140 self.table.insertNode(n, contacted = False)
def addContact(self, host, port, callback=None, errback=None):
    """Send a join request to a node known only by its address.

    A placeholder node carrying the null ID is built for the address and
    handed to L{sendJoin}, together with the optional result handlers.

    @type host: C{string}
    @param host: the IP address of the node to contact
    @type port: C{int}
    @param port: the port of the node to contact
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the contact info returned by the node
        (optional, defaults to doing nothing with the results)
    @type errback: C{method}
    @param errback: the method to call if an error occurs
        (optional, defaults to calling the callback with None)
    """
    # The real ID of the remote node is not known yet, so NULL_ID stands in
    self.sendJoin(self.Node(NULL_ID, host, port),
                  callback=callback,
                  errback=errback)
161 def findNode(self, id, callback, errback=None):
162 """Find the contact info for the K closest nodes in the global table.
165 @param id: the target ID to find the K closest nodes of
166 @type callback: C{method}
167 @param callback: the method to call with the results, it must take 1
168 parameter, the list of K closest nodes
169 @type errback: C{method}
170 @param errback: the method to call if an error occurs
171 (optional, defaults to doing nothing when an error occurs)
173 # Get K nodes out of local table/cache
174 nodes = self.table.findNodes(id)
177 d.addCallbacks(callback, errback)
179 d.addCallback(callback)
181 # If the target ID was found
182 if len(nodes) == 1 and nodes[0].id == id:
185 # Start the finding nodes action
186 state = FindNode(self, id, d.callback, self.config)
187 reactor.callLater(0, state.goWithNodes, nodes)
189 def insertNode(self, node, contacted = True):
190 """Try to insert a node in our local table, pinging oldest contact if necessary.
192 If all you have is a host/port, then use L{addContact}, which calls this
193 method after receiving the PONG from the remote node. The reason for
the separation is we can't insert a node into the table without its
195 node ID. That means of course the node passed into this method needs
196 to be a properly formed Node object with a valid ID.
198 @type node: L{node.Node}
199 @param node: the new node to try and insert
200 @type contacted: C{boolean}
201 @param contacted: whether the new node is known to be good, i.e.
202 responded to a request (optional, defaults to True)
204 old = self.table.insertNode(node, contacted=contacted)
205 if (old and old.id != self.node.id and
206 (datetime.now() - old.lastSeen) >
207 timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
209 def _staleNodeHandler(oldnode = old, newnode = node):
210 """The pinged node never responded, so replace it."""
211 self.table.replaceStaleNode(oldnode, newnode)
213 def _notStaleNodeHandler(dict, old=old):
214 """Got a pong from the old node, so update it."""
216 if dict['id'] == old.id:
217 self.table.justSeenNode(old.id)
219 # Bucket is full, check to see if old node is still available
220 df = old.ping(self.node.id)
221 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
223 def sendJoin(self, node, callback=None, errback=None):
224 """Join the DHT by pinging a bootstrap node.
226 @type node: L{node.Node}
227 @param node: the node to send the join to
228 @type callback: C{method}
229 @param callback: the method to call with the results, it must take 1
230 parameter, the contact info returned by the node
231 (optional, defaults to doing nothing with the results)
232 @type errback: C{method}
233 @param errback: the method to call if an error occurs
234 (optional, defaults to calling the callback with None)
237 def _pongHandler(dict, node=node, self=self, callback=callback):
238 """Node responded properly, callback with response."""
239 n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
242 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
244 def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
245 """Error occurred, fail node and errback or callback with error."""
246 table.nodeFailed(node)
252 df = node.join(self.node.id)
253 df.addCallbacks(_pongHandler, _defaultPong)
def findCloseNodes(self, callback=lambda a: None, errback = None):
    """Run a findNode for the ID immediately next to our own.

    The target is our own node ID with its final byte incremented by one
    (wrapping from 255 back to 0), so the search returns the nodes on the
    network that are closest to us and fills the table with them.

    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of K closest nodes
        (optional, defaults to doing nothing with the results)
    @type errback: C{method}
    @param errback: the method to call if an error occurs
        (optional, defaults to doing nothing when an error occurs)
    """
    own_id = self.node.id
    # Bump only the last byte, modulo 256 so it always fits in one character
    last_byte = chr((ord(own_id[-1]) + 1) % 256)
    self.findNode(own_id[:-1] + last_byte, callback, errback)
273 def refreshTable(self, force = False):
274 """Check all the buckets for those that need refreshing.
276 @param force: refresh all buckets regardless of last bucket access time
277 (optional, defaults to False)
282 for bucket in self.table.buckets:
283 if force or (datetime.now() - bucket.lastAccessed >
284 timedelta(seconds=self.config['BUCKET_STALENESS'])):
285 # Choose a random ID in the bucket and try and find it
286 id = newIDInRange(bucket.min, bucket.max)
287 self.findNode(id, callback)
290 """Closes the port and cancels pending later calls."""
291 self.listenport.stopListening()
293 self.next_checkpoint.cancel()
def krpc_ping(self, id, _krpc_sender):
    """Answer a ping request with our own node ID.

    The sender is also offered to the routing table, flagged as not yet
    contacted.

    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the response, holding our node ID under the "id" key
    """
    host = _krpc_sender[0]
    port = _krpc_sender[1]
    sender = self.Node(id, host, port)
    self.insertNode(sender, contacted = False)

    response = {"id" : self.node.id}
    return response
def krpc_join(self, id, _krpc_sender):
    """Answer a join request by echoing back the sender's address and port.

    The sender is also offered to the routing table, flagged as not yet
    contacted.

    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the response, holding the address and port the request was
        seen to come from ("ip_addr", "port") and our node ID ("id")
    """
    host = _krpc_sender[0]
    port = _krpc_sender[1]
    sender = self.Node(id, host, port)
    self.insertNode(sender, contacted = False)

    response = {"ip_addr" : host, "port" : port}
    response["id"] = self.node.id
    return response
def krpc_find_node(self, target, id, _krpc_sender):
    """Find the K closest nodes to the target in the local routing table.

    The sender is also offered to the routing table, flagged as not yet
    contacted. The response carries a token derived from the newest token
    secret and the sender's IP address.

    @type target: C{string}
    @param target: the target ID to find nodes for
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the response, holding the contact info of the found nodes
        ("nodes"), the token ("token") and our node ID ("id")
    """
    n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
    self.insertNode(n, contacted = False)

    # Build a concrete list rather than relying on map(), which is a lazy
    # iterator on Python 3
    nodes = [node.contactInfo() for node in self.table.findNodes(target)]

    # token_secrets[0] is the most recently created secret
    token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
    return {"nodes" : nodes, "token" : token, "id" : self.node.id}
344 class KhashmirRead(KhashmirBase):
345 """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
350 def findValue(self, key, callback, errback=None):
351 """Get the nodes that have values for the key from the global table.
354 @param key: the target key to find the values for
355 @type callback: C{method}
356 @param callback: the method to call with the results, it must take 1
357 parameter, the list of nodes with values
358 @type errback: C{method}
359 @param errback: the method to call if an error occurs
360 (optional, defaults to doing nothing when an error occurs)
362 # Get K nodes out of local table/cache
363 nodes = self.table.findNodes(key)
366 d.addCallbacks(callback, errback)
368 d.addCallback(callback)
370 # Search for others starting with the locally found ones
371 state = FindValue(self, key, d.callback, self.config)
372 reactor.callLater(0, state.goWithNodes, nodes)
374 def valueForKey(self, key, callback, searchlocal = True):
375 """Get the values found for key in global table.
377 Callback will be called with a list of values for each peer that
378 returns unique values. The final callback will be an empty list.
381 @param key: the target key to get the values for
382 @type callback: C{method}
383 @param callback: the method to call with the results, it must take 2
384 parameters: the key, and the values found
385 @type searchlocal: C{boolean}
386 @param searchlocal: whether to also look for any local values
388 # Get any local values
390 l = self.store.retrieveValues(key)
392 reactor.callLater(0, callback, key, l)
396 def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
397 """Use the found nodes to send requests for values to."""
398 state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
399 reactor.callLater(0, state.goWithNodes, nodes)
401 # First lookup nodes that have values for the key
402 self.findValue(key, _getValueForKey)
def krpc_find_value(self, key, id, _krpc_sender):
    """Find the number of values stored locally for the key, and the K closest nodes.

    The sender is also offered to the routing table, flagged as not yet
    contacted.

    @type key: C{string}
    @param key: the target key to find the values and nodes for
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the response, holding the contact info of the found nodes
        ("nodes"), the count of locally stored values ("num") and our
        node ID ("id")
    """
    n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
    self.insertNode(n, contacted = False)

    # Build a concrete list rather than relying on map(), which is a lazy
    # iterator on Python 3
    nodes = [node.contactInfo() for node in self.table.findNodes(key)]
    num_values = self.store.countValues(key)
    return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
423 def krpc_get_value(self, key, num, id, _krpc_sender):
424 """Retrieve the values stored locally for the key.
427 @param key: the target key to retrieve the values for
429 @param num: the maximum number of values to retrieve, or 0 to
432 @param id: the node ID of the sender node
433 @type _krpc_sender: (C{string}, C{int})
434 @param _krpc_sender: the sender node's IP address and port
436 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
437 self.insertNode(n, contacted = False)
439 l = self.store.retrieveValues(key)
440 if num == 0 or num >= len(l):
441 return {'values' : l, "id": self.node.id}
444 return {'values' : l[:num], "id": self.node.id}
447 class KhashmirWrite(KhashmirRead):
448 """The read-write Khashmir class, which can store and retrieve key/value mappings."""
453 def storeValueForKey(self, key, value, callback=None):
454 """Stores the value for the key in the global table.
456 No status in this implementation, peers respond but don't indicate
457 status of storing values.
460 @param key: the target key to store the value for
461 @type value: C{string}
462 @param value: the value to store with the key
463 @type callback: C{method}
464 @param callback: the method to call with the results, it must take 3
465 parameters: the key, the value stored, and the result of the store
466 (optional, defaults to doing nothing with the results)
468 def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
469 """Use the returned K closest nodes to store the key at."""
471 def _storedValueHandler(key, value, sender):
472 """Default callback that does nothing."""
474 response = _storedValueHandler
475 action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
476 reactor.callLater(0, action.goWithNodes, nodes)
478 # First find the K closest nodes to operate on.
479 self.findNode(key, _storeValueForKey)
def krpc_store_value(self, key, value, token, id, _krpc_sender):
    """Store the value locally with the key.

    The sender is also offered to the routing table, flagged as not yet
    contacted. The value is only stored if the token matches the one
    derived from the sender's IP address and any of the current token
    secrets.

    @type key: C{string}
    @param key: the target key to store the value for
    @type value: C{string}
    @param value: the value to store with the key
    @param token: the token to confirm that this peer contacted us previously
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the response, holding our node ID under the "id" key
    @raise krpc.KrpcError: with KRPC_ERROR_INVALID_TOKEN if the token does
        not match any of the current token secrets
    """
    n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
    self.insertNode(n, contacted = False)

    # Accept a token generated from any secret that is still retained
    for secret in self.token_secrets:
        if token == sha(secret + _krpc_sender[0]).digest():
            self.store.storeValue(key, value)
            return {"id" : self.node.id}

    # Call form instead of "raise Class, (args)": same exception on
    # Python 2, and still valid syntax on Python 3
    raise krpc.KrpcError(krpc.KRPC_ERROR_INVALID_TOKEN,
                         'token is invalid, do a find_nodes to get a fresh one')
505 class Khashmir(KhashmirWrite):
506 """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
510 class SimpleTests(unittest.TestCase):
513 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
514 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
515 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
517 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
518 'KEY_EXPIRE': 3600, 'SPEW': False, }
521 d = self.DHT_DEFAULTS.copy()
524 d = self.DHT_DEFAULTS.copy()
531 os.unlink(self.a.store.db)
532 os.unlink(self.b.store.db)
534 def testAddContact(self):
535 self.failUnlessEqual(len(self.a.table.buckets), 1)
536 self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
538 self.failUnlessEqual(len(self.b.table.buckets), 1)
539 self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
541 self.a.addContact('127.0.0.1', 4045)
547 self.failUnlessEqual(len(self.a.table.buckets), 1)
548 self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
549 self.failUnlessEqual(len(self.b.table.buckets), 1)
550 self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
552 def testStoreRetrieve(self):
553 self.a.addContact('127.0.0.1', 4045)
559 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
566 self.a.valueForKey(sha('foo').digest(), self._cb)
575 def _cb(self, key, val):
577 self.failUnlessEqual(self.got, 1)
578 elif 'foobar' in val:
582 class MultiTest(unittest.TestCase):
586 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
587 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
588 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
590 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
591 'KEY_EXPIRE': 3600, 'SPEW': False, }
593 def _done(self, val):
598 self.startport = 4088
599 for i in range(self.num):
600 d = self.DHT_DEFAULTS.copy()
601 d['PORT'] = self.startport + i
602 self.l.append(Khashmir(d))
607 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
608 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
609 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
616 i.findCloseNodes(self._done)
621 i.findCloseNodes(self._done)
628 os.unlink(i.store.db)
632 def testStoreRetrieve(self):
639 def _scb(key, value, result):
641 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
649 self.failUnlessEqual(self.got, 1)
655 self.l[randrange(0, self.num)].valueForKey(K, _rcb)