2 """The main Khashmir program.
4 @var isLocal: a compiled regular expression suitable for testing if an
5 IP address is from a known local or private range
9 warnings.simplefilter("ignore", DeprecationWarning)
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
17 from twisted.internet.defer import Deferred
18 from twisted.internet.base import DelayedCall
19 from twisted.internet import protocol, reactor
20 from twisted.python import log
21 from twisted.trial import unittest
24 from ktable import KTable
25 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
26 from khash import newID, newIDInRange
27 from actions import FindNode, FindValue, GetValue, StoreValue
28 from stats import StatsLogger
# Matches IPv4 loopback (127/8) and private (10/8, 172.16/12, 192.168/16)
# addresses. All alternatives are wrapped in one non-capturing group so that
# the ^ and $ anchors apply to every alternative; without the group, $ only
# bound the last one and e.g. '10.1.2.3456' was accepted as a prefix match.
# Raw strings keep the '\.' escapes from being interpreted as string escapes.
isLocal = re.compile(r'^(?:'
                     r'(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})'
                     r')$')
class KhashmirBase(protocol.Factory):
    """The base Khashmir class, with base functionality and find node, no key-value mappings.

    @type _Node: L{node.Node}
    @ivar _Node: the knode implementation to use for this class of DHT
    @type config: C{dictionary}
    @ivar config: the configuration parameters for the DHT
    @type pinging: C{dictionary}
    @ivar pinging: the nodes that are currently being pinged, keyed by node
        ID; values are the Deferred of a ping in flight or the DelayedCall
        of a scheduled one
    @type port: C{int}
    @ivar port: the port to listen on
    @type store: L{db.DB}
    @ivar store: the database to store nodes and key/value pairs in
    @type node: L{node.Node}
    @ivar node: this node
    @type table: L{ktable.KTable}
    @ivar table: the routing table
    @type token_secrets: C{list} of C{string}
    @ivar token_secrets: the current secrets to use to create tokens, newest
        first (at most 3 are kept, see L{checkpoint})
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics gatherer
    @type udp: L{krpc.hostbroker}
    @ivar udp: the factory for the KRPC protocol
    @type listenport: L{twisted.internet.interfaces.IListeningPort}
    @ivar listenport: the UDP listening port
    @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
    @ivar next_checkpoint: the delayed call for the next checkpoint
    """

    _Node = KNodeBase

    def __init__(self, config, cache_dir='/tmp'):
        """Initialize the Khashmir class and call the L{setup} method.

        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        @type cache_dir: C{string}
        @param cache_dir: the directory to store all files in
            (optional, defaults to the /tmp directory)
        """
        self.config = None
        self.pinging = {}
        self.setup(config, cache_dir)

    def setup(self, config, cache_dir):
        """Setup all the Khashmir sub-modules.

        Opens the database, creates this node and the routing table, starts
        listening on the UDP port, reloads the saved routing table, refreshes
        every bucket and schedules the first checkpoint 60 seconds later.

        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        @type cache_dir: C{string}
        @param cache_dir: the directory to store all files in
        """
        self.config = config
        self.port = config['PORT']
        self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
        self.node = self._loadSelfNode('', self.port)
        self.table = KTable(self.node, config)
        self.token_secrets = [newID()]
        self.stats = StatsLogger(self.table, self.store)

        # Start listening
        self.udp = krpc.hostbroker(self, self.stats, config)
        self.udp.protocol = krpc.KRPC
        self.listenport = reactor.listenUDP(self.port, self.udp)

        # Load the routing table and begin checkpointing
        self._loadRoutingTable()
        self.refreshTable(force = True)
        self.next_checkpoint = reactor.callLater(60, self.checkpoint)

    def Node(self, id, host = None, port = None):
        """Create a new node attached to this DHT.

        @see: L{node.Node.__init__}
        @return: the new node, with its C{table} set to this instance and its
            C{conn} set to the KRPC connection for its address
        """
        n = self._Node(id, host, port)
        n.table = self
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        return n

    def __del__(self):
        """Stop listening for packets."""
        self.listenport.stopListening()

    def _loadSelfNode(self, host, port):
        """Create this node, loading any previously saved one.

        A new random ID is generated when none was saved in the database.
        """
        id = self.store.getSelfNode()
        if not id:
            id = newID()
        return self._Node(id, host, port)

    def checkpoint(self):
        """Perform some periodic maintenance operations.

        Rotates the token secrets, saves this node and the routing table,
        expires old values, refreshes stale buckets, and then reschedules
        itself at a random point within +/-10% of CHECKPOINT_INTERVAL.
        """
        # Create a new token secret; older ones are kept (up to 3 in total)
        # because krpc_store_value accepts tokens made from any of them
        self.token_secrets.insert(0, newID())
        if len(self.token_secrets) > 3:
            self.token_secrets.pop()

        # Save some parameters for reloading
        self.store.saveSelfNode(self.node.id)
        self.store.dumpRoutingTable(self.table.buckets)

        # DHT maintenance
        self.store.expireValues(self.config['KEY_EXPIRE'])
        self.refreshTable()

        interval = self.config['CHECKPOINT_INTERVAL']
        self.next_checkpoint = reactor.callLater(randrange(int(interval * .9),
                                                           int(interval * 1.1)),
                                                 self.checkpoint)

    def _loadRoutingTable(self):
        """Load the previous routing table nodes from the database.

        The nodes are inserted as not yet contacted. It's usually a good idea
        to call refreshTable(force = True) after loading the table.
        """
        nodes = self.store.getRoutingTable()
        # Each record is (node ID, host, port)
        for rec in nodes:
            n = self.Node(rec[0], rec[1], int(rec[2]))
            self.table.insertNode(n, contacted = False)

    #{ Local interface
    def addContact(self, host, port, callback=None, errback=None):
        """Send a join to this node and add its contact info to the table when it responds.

        @type host: C{string}
        @param host: the IP address of the node to contact
        @type port: C{int}
        @param port: the port of the node to contact
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the contact info returned by the node
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to calling the callback with the error)
        """
        # The remote node's ID is not known yet; the join response carries it
        n = self.Node(NULL_ID, host, port)
        self.sendJoin(n, callback=callback, errback=errback)

    def findNode(self, id, callback):
        """Find the contact info for the K closest nodes in the global table.

        @type id: C{string}
        @param id: the target ID to find the K closest nodes of
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of K closest nodes
        """
        # Mark the bucket as having been accessed
        self.table.touch(id)

        # Start with our node
        nodes = [copy(self.node)]

        # Start the finding nodes action
        state = FindNode(self, id, callback, self.config, self.stats)
        reactor.callLater(0, state.goWithNodes, nodes)

    def insertNode(self, node, contacted = True):
        """Try to insert a node in our local table, pinging oldest contact if necessary.

        If all you have is a host/port, then use L{addContact}, which calls this
        method after receiving the PONG from the remote node. The reason for
        the separation is we can't insert a node into the table without its
        node ID. That means of course the node passed into this method needs
        to be a properly formed Node object with a valid ID.

        @type node: L{node.Node}
        @param node: the new node to try and insert
        @type contacted: C{boolean}
        @param contacted: whether the new node is known to be good, i.e.
            responded to a request (optional, defaults to True)
        """
        # Don't add any local nodes to the routing table
        if not self.config['LOCAL_OK'] and isLocal.match(node.host):
            log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
            return

        old = self.table.insertNode(node, contacted=contacted)

        if (isinstance(old, self._Node) and old.id != self.node.id and
            (datetime.now() - old.lastSeen) >
             timedelta(seconds=self.config['MIN_PING_INTERVAL'])):

            # Bucket is full, check to see if old node is still available;
            # if the ping fails, the old node is replaced by the new one
            df = self.sendPing(old)
            df.addErrback(self._staleNodeHandler, old, node, contacted)
        elif not old and not contacted:
            # There's room, we just need to contact the node first
            self.sendPing(node)

    def _staleNodeHandler(self, err, old, node, contacted):
        """The pinged node never responded, so replace it."""
        self.table.invalidateNode(old)
        self.insertNode(node, contacted)
        return err

    def nodeFailed(self, node):
        """Mark a node as having failed a request and schedule a future check.

        If the node is still in the routing table and no ping is already
        pending for it, a ping is scheduled MIN_PING_INTERVAL seconds later.

        @type node: L{node.Node}
        @param node: the node that failed to respond
        """
        exists = self.table.nodeFailed(node)

        # If in the table, schedule a ping, if one isn't already sent/scheduled.
        # Entries must be keyed by node.id so that sendPing and the ping
        # handlers find (and later remove) this scheduled call.
        if exists and node.id not in self.pinging:
            self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
                                                      self.sendPing, node)

    def sendPing(self, node):
        """Ping the node to see if it's still alive.

        A ping scheduled by L{nodeFailed} is cancelled and replaced by an
        immediate one; if a ping is already in flight, its Deferred is
        returned instead of sending another.

        @type node: L{node.Node}
        @param node: the node to ping
        @rtype: L{twisted.internet.defer.Deferred}
        @return: the Deferred for the ping, which fires with the node rebuilt
            from the response or fails if the ping fails
        """
        # Check for a ping already underway
        pending = self.pinging.get(node.id, None)
        if isinstance(pending, DelayedCall) and pending.active():
            pending.cancel()
        elif isinstance(pending, Deferred):
            return pending

        self.stats.startedAction('ping')
        df = node.ping(self.node.id)
        self.pinging[node.id] = df
        df.addCallbacks(self._pingHandler, self._pingError,
                        callbackArgs = (node, datetime.now()),
                        errbackArgs = (node, datetime.now()))
        return df

    def _pingHandler(self, dict, node, start):
        """Node responded properly, update it and return the node object."""
        self.stats.completedAction('ping', start)
        del self.pinging[node.id]
        # Create the node using the returned contact info
        n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
        reactor.callLater(0, self.insertNode, n)
        return n

    def _pingError(self, err, node, start):
        """Error occurred, fail node."""
        log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
        self.stats.completedAction('ping', start)
        del self.pinging[node.id]
        self.nodeFailed(node)
        return err

    def sendJoin(self, node, callback=None, errback=None):
        """Join the DHT by pinging a bootstrap node.

        @type node: L{node.Node}
        @param node: the node to send the join to
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the contact info returned by the node
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to calling the callback with the error)
        """
        if errback is None:
            errback = callback
        self.stats.startedAction('join')
        df = node.join(self.node.id)
        df.addCallbacks(self._joinHandler, self._joinError,
                        callbackArgs = (node, datetime.now()),
                        errbackArgs = (node, datetime.now()))
        if callback:
            df.addCallbacks(callback, errback)

    def _joinHandler(self, dict, node, start):
        """Node responded properly, extract the response.

        @return: the (IP address, port) the remote node reported seeing us at
        """
        self.stats.completedAction('join', start)
        # Create the node using the returned contact info
        n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
        reactor.callLater(0, self.insertNode, n)
        return (dict['ip_addr'], dict['port'])

    def _joinError(self, err, node, start):
        """Error occurred, fail node."""
        log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
        self.stats.completedAction('join', start)
        self.nodeFailed(node)
        return err

    def findCloseNodes(self, callback=lambda a: None):
        """Perform a findNode on the ID one away from our own.

        This will allow us to populate our table with nodes on our network
        closest to our own. This is called as soon as we start up with an
        empty table.

        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of K closest nodes
            (optional, defaults to doing nothing with the results)
        """
        # Increment the last byte of our ID, wrapping around at 256
        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
        self.findNode(id, callback)

    def refreshTable(self, force = False):
        """Check all the buckets for those that need refreshing.

        A bucket is refreshed when it has not been accessed for
        BUCKET_STALENESS seconds: a random ID in its range is looked up and
        the result is discarded.

        @param force: refresh all buckets regardless of last bucket access time
            (optional, defaults to False)
        """
        def callback(nodes):
            """Ignore the results of the refresh lookups."""
            pass

        for bucket in self.table.buckets:
            if force or (datetime.now() - bucket.lastAccessed >
                         timedelta(seconds=self.config['BUCKET_STALENESS'])):
                # Choose a random ID in the bucket and try and find it
                id = newIDInRange(bucket.min, bucket.max)
                self.findNode(id, callback)

    def shutdown(self):
        """Closes the port and cancels pending later calls."""
        self.listenport.stopListening()
        # The checkpoint may already have fired or been cancelled
        if self.next_checkpoint.active():
            self.next_checkpoint.cancel()
        # self.pinging maps node IDs to Deferreds or DelayedCalls; iterate the
        # values (not the ID keys) and cancel the still-pending scheduled pings
        for call in self.pinging.values():
            if isinstance(call, DelayedCall) and call.active():
                call.cancel()
        self.store.close()

    def getStats(self):
        """Gather the statistics for the DHT."""
        return self.stats.formatHTML()

    #{ Remote interface
    def krpc_ping(self, id, _krpc_sender = None):
        """Pong with our ID.

        The sender, if known, is queued for insertion into the routing table
        as a not yet contacted node.

        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)

        return {"id" : self.node.id}

    def krpc_join(self, id, _krpc_sender = None):
        """Add the node by responding with its address and port.

        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)
        else:
            _krpc_sender = ('127.0.0.1', self.port)

        return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}

    def krpc_find_node(self, id, target, _krpc_sender = None):
        """Find the K closest nodes to the target in the local routing table.

        The response also carries a token, derived from the newest token
        secret and the sender's IP address, that the sender must present to
        store values here.

        @type target: C{string}
        @param target: the target ID to find nodes for
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)
        else:
            _krpc_sender = ('127.0.0.1', self.port)

        nodes = [node.contactInfo() for node in self.table.findNodes(target)]
        token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
        return {"nodes" : nodes, "token" : token, "id" : self.node.id}
class KhashmirRead(KhashmirBase):
    """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""

    _Node = KNodeRead

    #{ Local interface
    def findValue(self, key, callback):
        """Get the nodes that have values for the key from the global table.

        @type key: C{string}
        @param key: the target key to find the values for
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of nodes with values
        """
        # Mark the bucket as having been accessed
        self.table.touch(key)

        # Start with ourself
        nodes = [copy(self.node)]

        # Search for others starting with the locally found ones
        state = FindValue(self, key, callback, self.config, self.stats)
        reactor.callLater(0, state.goWithNodes, nodes)

    def valueForKey(self, key, callback, searchlocal = True):
        """Get the values found for key in global table.

        Callback will be called with a list of values for each peer that
        returns unique values. The final callback will be an empty list.

        @type key: C{string}
        @param key: the target key to get the values for
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 2
            parameters: the key, and the values found
        @type searchlocal: C{boolean}
        @param searchlocal: whether to also look for any local values
        """

        def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
            """Use the found nodes to send requests for values to."""
            # Get any local values
            if searchlocal:
                l = self.store.retrieveValues(key)
                if l:
                    # Include ourself, recording how many values we hold
                    node = copy(self.node)
                    node.updateNumValues(len(l))
                    nodes = nodes + [node]

            state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
            reactor.callLater(0, state.goWithNodes, nodes)

        # First lookup nodes that have values for the key
        self.findValue(key, _getValueForKey)

    #{ Remote interface
    def krpc_find_value(self, id, key, _krpc_sender = None):
        """Find the number of values stored locally for the key, and the K closest nodes.

        @type key: C{string}
        @param key: the target key to find the values and nodes for
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)

        nodes = [node.contactInfo() for node in self.table.findNodes(key)]
        num_values = self.store.countValues(key)
        return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}

    def krpc_get_value(self, id, key, num, _krpc_sender = None):
        """Retrieve the values stored locally for the key.

        @type key: C{string}
        @param key: the target key to retrieve the values for
        @type num: C{int}
        @param num: the maximum number of values to retrieve, or 0 (or any
            non-positive number) to retrieve all of them
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)

        l = self.store.retrieveValues(key)
        # num comes from a remote peer: treat non-positive values as "all"
        # rather than letting a negative num slice off the end of the list
        if num <= 0 or num >= len(l):
            return {'values' : l, "id": self.node.id}
        else:
            # Return a random subset of the requested size
            shuffle(l)
            return {'values' : l[:num], "id": self.node.id}
class KhashmirWrite(KhashmirRead):
    """The read-write Khashmir class, which can store and retrieve key/value mappings."""

    _Node = KNodeWrite

    #{ Local interface
    def storeValueForKey(self, key, value, callback=None):
        """Stores the value for the key in the global table.

        No status in this implementation, peers respond but don't indicate
        status of storing values.

        @type key: C{string}
        @param key: the target key to store the value for
        @type value: C{string}
        @param value: the value to store with the key
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 3
            parameters: the key, the value stored, and the result of the store
            (optional, defaults to doing nothing with the results)
        """
        def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
            """Use the returned K closest nodes to store the key at."""
            if not response:
                def _storedValueHandler(key, value, sender):
                    """Default callback that does nothing."""
                    pass
                response = _storedValueHandler
            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
            reactor.callLater(0, action.goWithNodes, nodes)

        # First find the K closest nodes to operate on.
        self.findNode(key, _storeValueForKey)

    #{ Remote interface
    def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
        """Store the value locally with the key.

        The token must equal the digest of one of the current token secrets
        concatenated with the sender's IP address.

        @type key: C{string}
        @param key: the target key to store the value for
        @type value: C{string}
        @param value: the value to store with the key
        @type token: C{string}
        @param token: the token to confirm that this peer contacted us previously
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        @raise krpc.KrpcError: with code KRPC_ERROR_INVALID_TOKEN if the token
            matches none of the current secrets
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)
        else:
            _krpc_sender = ('127.0.0.1', self.port)

        # Accept tokens made from any of the secrets still being kept
        for secret in self.token_secrets:
            this_token = sha(secret + _krpc_sender[0]).digest()
            if token == this_token:
                self.store.storeValue(key, value)
                return {"id" : self.node.id}
        # Call form is equivalent to the old "raise E, (args)" and also
        # parses on Python 3
        raise krpc.KrpcError(krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
class Khashmir(KhashmirWrite):
    """The default DHT class to instantiate.

    At present this is exactly the read-write L{KhashmirWrite} class, so it
    can both store and retrieve key/value mappings.
    """

    # Same node implementation as the read-write parent class
    _Node = KNodeWrite
# Functional tests with two Khashmir instances (self.a and self.b) in this
# process that contact each other over the loopback address 127.0.0.1.
594 class SimpleTests(unittest.TestCase):
# Base DHT configuration; it is copied (see the .copy() calls below) for each
# instance so the shared dictionary is never mutated.
# NOTE(review): RETRIEVE_VALUES is negative here; its meaning is defined by
# GetValue in the actions module -- confirm there.
597 DHT_DEFAULTS = {'PORT': 9977,
598 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
599 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
600 'MAX_FAILURES': 3, 'LOCAL_OK': True,
601 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
602 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
603 'KEY_EXPIRE': 3600, 'SPEW': True, }
# A fresh copy of the defaults for each of the two instances.
606 d = self.DHT_DEFAULTS.copy()
609 d = self.DHT_DEFAULTS.copy()
# Remove each instance's on-disk database file after the test.
616 os.unlink(self.a.store.db)
617 os.unlink(self.b.store.db)
# Both routing tables start as a single empty bucket; after a contacts the
# node on port 4045, each table should hold exactly one node.
619 def testAddContact(self):
620 self.failUnlessEqual(len(self.a.table.buckets), 1)
621 self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
623 self.failUnlessEqual(len(self.b.table.buckets), 1)
624 self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
626 self.a.addContact('127.0.0.1', 4045)
636 self.failUnlessEqual(len(self.a.table.buckets), 1)
637 self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
638 self.failUnlessEqual(len(self.b.table.buckets), 1)
639 self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
# Store 'foobar' under the sha() digest of 'foo' from node a, then look the
# key up again with valueForKey; _cb checks what comes back.
641 def testStoreRetrieve(self):
642 self.a.addContact('127.0.0.1', 4045)
648 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
655 self.a.valueForKey(sha('foo').digest(), self._cb)
# valueForKey callback taking (key, values). valueForKey documents that its
# final call passes an empty list; by then 'foobar' should have been seen,
# i.e. self.got == 1.
678 def _cb(self, key, val):
680 self.failUnlessEqual(self.got, 1)
681 elif 'foobar' in val:
# Functional tests over a network of self.num Khashmir instances listening on
# consecutive ports starting at self.startport (4088).
685 class MultiTest(unittest.TestCase):
# Base DHT configuration; each instance gets its own copy with 'PORT' set.
689 DHT_DEFAULTS = {'PORT': 9977,
690 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
691 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
692 'MAX_FAILURES': 3, 'LOCAL_OK': True,
693 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
694 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
695 'KEY_EXPIRE': 3600, 'SPEW': True, }
# Completion callback handed to findCloseNodes below.
697 def _done(self, val):
# Instance i listens on startport + i.
702 self.startport = 4088
703 for i in range(self.num):
704 d = self.DHT_DEFAULTS.copy()
705 d['PORT'] = self.startport + i
706 self.l.append(Khashmir(d))
# Seed each routing table with three randomly chosen contacts; randrange may
# pick the instance itself or repeat a choice.
711 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
712 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
713 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
# Instances look up the nodes closest to their own IDs (two rounds),
# reporting completion to _done.
723 i.findCloseNodes(self._done)
728 i.findCloseNodes(self._done)
# Remove each instance's on-disk database file.
735 os.unlink(i.store.db)
# Store values through a randomly chosen instance and read them back through
# randomly chosen instances.
739 def testStoreRetrieve(self):
# storeValueForKey callback taking (key, value, result).
746 def _scb(key, value, result):
748 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
# The stored value must have been found by the time the lookup finishes.
756 self.failUnlessEqual(self.got, 1)
762 self.l[randrange(0, self.num)].valueForKey(K, _rcb)