2 """The main Khashmir program.
4 @var isLocal: a compiled regular expression suitable for testing if an
5 IP address is from a known local or private range
9 warnings.simplefilter("ignore", DeprecationWarning)
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
17 from twisted.internet.defer import Deferred
18 from twisted.internet import protocol, reactor
19 from twisted.python import log
20 from twisted.trial import unittest
23 from ktable import KTable
24 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
25 from khash import newID, newIDInRange
26 from actions import FindNode, FindValue, GetValue, StoreValue
27 from stats import StatsLogger
# Matches dotted-quad IPv4 strings in the loopback/private ranges
# 192.168.0.0/16, 10.0.0.0/8, 172.16.0.0/12 and 127.0.0.0/8.
# All alternatives sit inside ONE group anchored by both `^` and `$`.
# Previously an unparenthesised `|` inside the 172.x group turned
# `[2][0-9]` and `[3][0-1]...` into top-level alternatives. That made public
# addresses such as '25.1.2.3' or '30.1.2.3' (and IPv6 strings starting with
# '20') count as local, and only the 127.x branch was anchored at the end.
isLocal = re.compile(r'^(?:'
                     r'192\.168\.[0-9]{1,3}\.[0-9]{1,3}'
                     r'|10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}'
                     # second octet 16-31 (an optional leading zero is still accepted)
                     r'|172\.0?(?:1[6-9]|2[0-9]|3[01])\.[0-9]{1,3}\.[0-9]{1,3}'
                     r'|127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}'
                     r')$')
# Base DHT node. setup() creates the node/value database (DB), the routing
# table (KTable), the KRPC UDP listener and the periodic checkpoint timer.
# Only node lookups are provided here; key/value operations are added by the
# KhashmirRead/KhashmirWrite subclasses.
35 class KhashmirBase(protocol.Factory):
36 """The base Khashmir class, with base functionality and find node, no key-value mappings.
38 @type _Node: L{node.Node}
39 @ivar _Node: the knode implementation to use for this class of DHT
40 @type config: C{dictionary}
41 @ivar config: the configuration parameters for the DHT
43 @ivar port: the port to listen on
45 @ivar store: the database to store nodes and key/value pairs in
46 @type node: L{node.Node}
48 @type table: L{ktable.KTable}
49 @ivar table: the routing table
50 @type token_secrets: C{list} of C{string}
51 @ivar token_secrets: the current secrets to use to create tokens
52 @type stats: L{stats.StatsLogger}
53 @ivar stats: the statistics gatherer
54 @type udp: L{krpc.hostbroker}
55 @ivar udp: the factory for the KRPC protocol
56 @type listenport: L{twisted.internet.interfaces.IListeningPort}
57 @ivar listenport: the UDP listening port
58 @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
59 @ivar next_checkpoint: the delayed call for the next checkpoint
# All construction work is delegated to setup().
64 def __init__(self, config, cache_dir='/tmp'):
65 """Initialize the Khashmir class and call the L{setup} method.
67 @type config: C{dictionary}
68 @param config: the configuration parameters for the DHT
69 @type cache_dir: C{string}
70 @param cache_dir: the directory to store all files in
71 (optional, defaults to the /tmp directory)
74 self.setup(config, cache_dir)
# Side effects: opens 'khashmir.<PORT>.db' under cache_dir and binds the UDP
# port through reactor.listenUDP. It also forces a refresh of every
# routing-table bucket and schedules the first checkpoint 60 seconds from now.
76 def setup(self, config, cache_dir):
77 """Setup all the Khashmir sub-modules.
79 @type config: C{dictionary}
80 @param config: the configuration parameters for the DHT
81 @type cache_dir: C{string}
82 @param cache_dir: the directory to store all files in
85 self.port = config['PORT']
86 self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
87 self.node = self._loadSelfNode('', self.port)
88 self.table = KTable(self.node, config)
89 self.token_secrets = [newID()]
90 self.stats = StatsLogger(self.table, self.store)
93 self.udp = krpc.hostbroker(self, self.stats, config)
94 self.udp.protocol = krpc.KRPC
95 self.listenport = reactor.listenUDP(self.port, self.udp)
97 # Load the routing table and begin checkpointing
98 self._loadRoutingTable()
99 self.refreshTable(force = True)
100 self.next_checkpoint = reactor.callLater(60, self.checkpoint)
# Factory for remote nodes: builds a self._Node and attaches a KRPC
# connection for that node's (host, port).
102 def Node(self, id, host = None, port = None):
103 """Create a new node.
105 @see: L{node.Node.__init__}
107 n = self._Node(id, host, port)
109 n.conn = self.udp.connectionForAddr((n.host, n.port))
113 """Stop listening for packets."""
114 self.listenport.stopListening()
# Our own node ID is read back from the store, where checkpoint() saves it
# with saveSelfNode().
116 def _loadSelfNode(self, host, port):
117 """Create this node, loading any previously saved one."""
118 id = self.store.getSelfNode()
121 return self._Node(id, host, port)
# Periodic maintenance:
#  * prepends a fresh token secret and keeps only the three newest; tokens
#    made from a dropped secret no longer validate in krpc_store_value;
#  * saves our node ID and the routing table;
#  * asks the store to expire values using the KEY_EXPIRE setting;
#  * schedules the next checkpoint after a random delay in
#    [0.9, 1.1) * CHECKPOINT_INTERVAL seconds (randrange excludes its upper
#    bound), presumably so that many nodes do not checkpoint in lock-step.
123 def checkpoint(self):
124 """Perform some periodic maintenance operations."""
125 # Create a new token secret
126 self.token_secrets.insert(0, newID())
127 if len(self.token_secrets) > 3:
128 self.token_secrets.pop()
130 # Save some parameters for reloading
131 self.store.saveSelfNode(self.node.id)
132 self.store.dumpRoutingTable(self.table.buckets)
135 self.store.expireValues(self.config['KEY_EXPIRE'])
138 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
139 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
# Each saved record is (id, host, port). Nodes are re-inserted as not yet
# contacted (contacted=False).
142 def _loadRoutingTable(self):
143 """Load the previous routing table nodes from the database.
145 It's usually a good idea to call refreshTable(force = True) after
148 nodes = self.store.getRoutingTable()
150 n = self.Node(rec[0], rec[1], int(rec[2]))
151 self.table.insertNode(n, contacted = False)
# Bootstrap entry point. The remote ID is not known yet, so a join is sent
# using a NULL_ID placeholder node. _joinHandler then inserts the real node
# built from the reply.
154 def addContact(self, host, port, callback=None, errback=None):
155 """Ping this node and add the contact info to the table on pong.
157 @type host: C{string}
158 @param host: the IP address of the node to contact
160 @param port:the port of the node to contact
161 @type callback: C{method}
162 @param callback: the method to call with the results, it must take 1
163 parameter, the contact info returned by the node
164 (optional, defaults to doing nothing with the results)
165 @type errback: C{method}
166 @param errback: the method to call if an error occurs
167 (optional, defaults to calling the callback with the error)
169 n = self.Node(NULL_ID, host, port)
170 self.sendJoin(n, callback=callback, errback=errback)
# The lookup is seeded with a copy of our own node and started on the next
# reactor iteration via callLater(0, ...).
172 def findNode(self, id, callback):
173 """Find the contact info for the K closest nodes in the global table.
176 @param id: the target ID to find the K closest nodes of
177 @type callback: C{method}
178 @param callback: the method to call with the results, it must take 1
179 parameter, the list of K closest nodes
181 # Start with our node
182 nodes = [copy(self.node)]
184 # Start the finding nodes action
185 state = FindNode(self, id, callback, self.config, self.stats)
186 reactor.callLater(0, state.goWithNodes, nodes)
# Outcomes of self.table.insertNode():
#  * Bucket full, a node object is returned. If that node is not us and has
#    not been seen for MIN_PING_INTERVAL seconds, it is pinged.
#    _freshNodeHandler keeps it on reply; _staleNodeHandler replaces it with
#    `node` on failure.
#  * A falsy value and contacted=False: `node` is pinged first. The pong is
#    converted into a node (_pongHandler) and fed back into insertNode.
#  * Anything else: nothing more is done here.
# When LOCAL_OK is false, hosts matching isLocal are not added (check below).
188 def insertNode(self, node, contacted = True):
189 """Try to insert a node in our local table, pinging oldest contact if necessary.
191 If all you have is a host/port, then use L{addContact}, which calls this
192 method after receiving the PONG from the remote node. The reason for
193 the separation is we can't insert a node into the table without its
194 node ID. That means of course the node passed into this method needs
195 to be a properly formed Node object with a valid ID.
197 @type node: L{node.Node}
198 @param node: the new node to try and insert
199 @type contacted: C{boolean}
200 @param contacted: whether the new node is known to be good, i.e.
201 responded to a request (optional, defaults to True)
203 # Don't add any local nodes to the routing table
204 if not self.config['LOCAL_OK'] and isLocal.match(node.host):
205 log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
208 old = self.table.insertNode(node, contacted=contacted)
210 if (isinstance(old, self._Node) and old.id != self.node.id and
211 (datetime.now() - old.lastSeen) >
212 timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
214 # Bucket is full, check to see if old node is still available
215 self.stats.startedAction('ping')
216 df = old.ping(self.node.id)
217 df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
218 callbackArgs = (old, datetime.now()),
219 errbackArgs = (old, datetime.now(), node, contacted))
220 elif not old and not contacted:
221 # There's room, we just need to contact the node first
222 self.stats.startedAction('ping')
223 df = node.ping(self.node.id)
224 # Convert the returned contact info into a node
225 df.addCallback(self._pongHandler, datetime.now())
226 # Try adding the contacted node
227 df.addCallbacks(self.insertNode, self._pongError,
228 errbackArgs = (node, datetime.now()))
# Results of insertNode's ping to an existing (old) node. Both handlers mark
# the 'ping' action completed in stats. A reply refreshes the old node only
# if the returned id matches. A failure invalidates the old node and retries
# inserting the new node.
230 def _freshNodeHandler(self, dict, old, start):
231 """Got a pong from the old node, so update it."""
232 self.stats.completedAction('ping', start)
233 if dict['id'] == old.id:
234 self.table.justSeenNode(old.id)
236 def _staleNodeHandler(self, err, old, start, node, contacted):
237 """The pinged node never responded, so replace it."""
238 log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage()))
239 self.stats.completedAction('ping', start)
240 self.table.invalidateNode(old)
241 self.insertNode(node, contacted)
# Results of insertNode's ping to a new, uncontacted node. A pong becomes a
# node built from the reply's 'id' and its _krpc_sender address. An error
# marks the node as failed in the routing table.
243 def _pongHandler(self, dict, start):
244 """Node responded properly, change response into a node to insert."""
245 self.stats.completedAction('ping', start)
246 # Create the node using the returned contact info
247 n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
250 def _pongError(self, err, node, start):
251 """Error occurred, fail node and errback or callback with error."""
252 log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
253 self.stats.completedAction('ping', start)
254 self.table.nodeFailed(node)
# The caller's callback/errback are chained after _joinHandler/_joinError.
# On success the callback therefore receives the (ip_addr, port) tuple that
# _joinHandler returns.
256 def sendJoin(self, node, callback=None, errback=None):
257 """Join the DHT by pinging a bootstrap node.
259 @type node: L{node.Node}
260 @param node: the node to send the join to
261 @type callback: C{method}
262 @param callback: the method to call with the results, it must take 1
263 parameter, the contact info returned by the node
264 (optional, defaults to doing nothing with the results)
265 @type errback: C{method}
266 @param errback: the method to call if an error occurs
267 (optional, defaults to calling the callback with the error)
271 self.stats.startedAction('join')
272 df = node.join(self.node.id)
273 df.addCallbacks(self._joinHandler, self._joinError,
274 callbackArgs = (node, datetime.now()),
275 errbackArgs = (node, datetime.now()))
277 df.addCallbacks(callback, errback)
# A join reply reports the address the remote node saw our request come from
# ('ip_addr', 'port' -- see krpc_join); that tuple is returned. The responder
# itself is queued for insertion on the next reactor iteration.
279 def _joinHandler(self, dict, node, start):
280 """Node responded properly, extract the response."""
281 self.stats.completedAction('join', start)
282 # Create the node using the returned contact info
283 n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
284 reactor.callLater(0, self.insertNode, n)
285 return (dict['ip_addr'], dict['port'])
# A failed join is logged, completed in stats, and the node marked failed.
287 def _joinError(self, err, node, start):
288 """Error occurred, fail node."""
289 log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
290 self.stats.completedAction('join', start)
291 self.table.nodeFailed(node)
# The target is our own ID with its last byte incremented (mod 256). This is
# an ID adjacent to ours, so the lookup yields our closest neighbours.
294 def findCloseNodes(self, callback=lambda a: None):
295 """Perform a findNode on the ID one away from our own.
297 This will allow us to populate our table with nodes on our network
298 closest to our own. This is called as soon as we start up with an
301 @type callback: C{method}
302 @param callback: the method to call with the results, it must take 1
303 parameter, the list of K closest nodes
304 (optional, defaults to doing nothing with the results)
306 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
307 self.findNode(id, callback)
# Some buckets get a lookup of a random ID inside their own range: those not
# accessed within BUCKET_STALENESS seconds, or every bucket when force is true.
309 def refreshTable(self, force = False):
310 """Check all the buckets for those that need refreshing.
312 @param force: refresh all buckets regardless of last bucket access time
313 (optional, defaults to False)
318 for bucket in self.table.buckets:
319 if force or (datetime.now() - bucket.lastAccessed >
320 timedelta(seconds=self.config['BUCKET_STALENESS'])):
321 # Choose a random ID in the bucket and try and find it
322 id = newIDInRange(bucket.min, bucket.max)
323 self.findNode(id, callback)
326 """Closes the port and cancels pending later calls."""
327 self.listenport.stopListening()
329 self.next_checkpoint.cancel()
335 """Gather the statistics for the DHT."""
336 return self.stats.formatHTML()
# KRPC request handlers. Each one replies with our node ID. When
# _krpc_sender is given, the sender is also queued for insertion as an
# uncontacted node.
339 def krpc_ping(self, id, _krpc_sender = None):
343 @param id: the node ID of the sender node
344 @type _krpc_sender: (C{string}, C{int})
345 @param _krpc_sender: the sender node's IP address and port
347 if _krpc_sender is not None:
348 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
349 reactor.callLater(0, self.insertNode, n, False)
351 return {"id" : self.node.id}
# Tells the requester which address and port its request arrived from.
353 def krpc_join(self, id, _krpc_sender = None):
354 """Add the node by responding with its address and port.
357 @param id: the node ID of the sender node
358 @type _krpc_sender: (C{string}, C{int})
359 @param _krpc_sender: the sender node's IP address and port
361 if _krpc_sender is not None:
362 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
363 reactor.callLater(0, self.insertNode, n, False)
365 _krpc_sender = ('127.0.0.1', self.port)
367 return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
# The returned token is sha(newest token secret + requester IP).digest().
# krpc_store_value accepts it for as long as that secret remains in
# token_secrets.
369 def krpc_find_node(self, id, target, _krpc_sender = None):
370 """Find the K closest nodes to the target in the local routing table.
372 @type target: C{string}
373 @param target: the target ID to find nodes for
375 @param id: the node ID of the sender node
376 @type _krpc_sender: (C{string}, C{int})
377 @param _krpc_sender: the sender node's IP address and port
379 if _krpc_sender is not None:
380 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
381 reactor.callLater(0, self.insertNode, n, False)
383 _krpc_sender = ('127.0.0.1', self.port)
385 nodes = self.table.findNodes(target)
386 nodes = map(lambda node: node.contactInfo(), nodes)
387 token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
388 return {"nodes" : nodes, "token" : token, "id" : self.node.id}
# Read-only DHT node: adds value lookups (findValue, valueForKey) and the
# find_value / get_value KRPC handlers, but cannot store values.
391 class KhashmirRead(KhashmirBase):
392 """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
# Same pattern as findNode, but runs a FindValue action seeded with a copy of
# our own node.
397 def findValue(self, key, callback):
398 """Get the nodes that have values for the key from the global table.
401 @param key: the target key to find the values for
402 @type callback: C{method}
403 @param callback: the method to call with the results, it must take 1
404 parameter, the list of nodes with values
407 nodes = [copy(self.node)]
409 # Search for others starting with the locally found ones
410 state = FindValue(self, key, callback, self.config, self.stats)
411 reactor.callLater(0, state.goWithNodes, nodes)
# Two phases. First findValue() locates nodes holding values for the key.
# Then a GetValue action (given the RETRIEVE_VALUES setting) fetches the
# values. Values in our own store are represented by a copy of our node whose
# value count is set with updateNumValues(len(l)).
413 def valueForKey(self, key, callback, searchlocal = True):
414 """Get the values found for key in global table.
416 Callback will be called with a list of values for each peer that
417 returns unique values. The final callback will be an empty list.
420 @param key: the target key to get the values for
421 @type callback: C{method}
422 @param callback: the method to call with the results, it must take 2
423 parameters: the key, and the values found
424 @type searchlocal: C{boolean}
425 @param searchlocal: whether to also look for any local values
428 def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
429 """Use the found nodes to send requests for values to."""
430 # Get any local values
432 l = self.store.retrieveValues(key)
434 node = copy(self.node)
435 node.updateNumValues(len(l))
436 nodes = nodes + [node]
438 state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
439 reactor.callLater(0, state.goWithNodes, nodes)
441 # First lookup nodes that have values for the key
442 self.findValue(key, _getValueForKey)
# Replies with the closest known nodes to the key, plus the number of values
# stored locally for it.
445 def krpc_find_value(self, id, key, _krpc_sender = None):
446 """Find the number of values stored locally for the key, and the K closest nodes.
449 @param key: the target key to find the values and nodes for
451 @param id: the node ID of the sender node
452 @type _krpc_sender: (C{string}, C{int})
453 @param _krpc_sender: the sender node's IP address and port
455 if _krpc_sender is not None:
456 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
457 reactor.callLater(0, self.insertNode, n, False)
459 nodes = self.table.findNodes(key)
460 nodes = map(lambda node: node.contactInfo(), nodes)
461 num_values = self.store.countValues(key)
462 return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
# Returns every local value when num == 0 or num is at least the number
# stored; otherwise only the first num of them (l[:num]).
464 def krpc_get_value(self, id, key, num, _krpc_sender = None):
465 """Retrieve the values stored locally for the key.
468 @param key: the target key to retrieve the values for
470 @param num: the maximum number of values to retrieve, or 0 to
473 @param id: the node ID of the sender node
474 @type _krpc_sender: (C{string}, C{int})
475 @param _krpc_sender: the sender node's IP address and port
477 if _krpc_sender is not None:
478 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
479 reactor.callLater(0, self.insertNode, n, False)
481 l = self.store.retrieveValues(key)
482 if num == 0 or num >= len(l):
483 return {'values' : l, "id": self.node.id}
486 return {'values' : l[:num], "id": self.node.id}
# Read-write DHT node: adds storeValueForKey and the store_value KRPC handler.
489 class KhashmirWrite(KhashmirRead):
490 """The read-write Khashmir class, which can store and retrieve key/value mappings."""
# First finds the closest nodes to the key. A StoreValue action (given the
# STORE_REDUNDANCY setting) then stores the value on them.
# _storedValueHandler is the do-nothing default callback.
495 def storeValueForKey(self, key, value, callback=None):
496 """Stores the value for the key in the global table.
498 No status in this implementation, peers respond but don't indicate
499 status of storing values.
502 @param key: the target key to store the value for
503 @type value: C{string}
504 @param value: the value to store with the key
505 @type callback: C{method}
506 @param callback: the method to call with the results, it must take 3
507 parameters: the key, the value stored, and the result of the store
508 (optional, defaults to doing nothing with the results)
510 def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
511 """Use the returned K closest nodes to store the key at."""
513 def _storedValueHandler(key, value, sender):
514 """Default callback that does nothing."""
516 response = _storedValueHandler
517 action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
518 reactor.callLater(0, action.goWithNodes, nodes)
520 # First find the K closest nodes to operate on.
521 self.findNode(key, _storeValueForKey)
# Stores only when the token equals sha(secret + requester IP).digest() for
# one of the current token_secrets, as issued by krpc_find_node. Otherwise a
# KrpcError with KRPC_ERROR_INVALID_TOKEN is raised.
524 def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
525 """Store the value locally with the key.
528 @param key: the target key to store the value for
529 @type value: C{string}
530 @param value: the value to store with the key
531 @param token: the token to confirm that this peer contacted us previously
533 @param id: the node ID of the sender node
534 @type _krpc_sender: (C{string}, C{int})
535 @param _krpc_sender: the sender node's IP address and port
537 if _krpc_sender is not None:
538 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
539 reactor.callLater(0, self.insertNode, n, False)
541 _krpc_sender = ('127.0.0.1', self.port)
543 for secret in self.token_secrets:
544 this_token = sha(secret + _krpc_sender[0]).digest()
545 if token == this_token:
546 self.store.storeValue(key, value)
547 return {"id" : self.node.id}
548 raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
# Default concrete DHT class (read-write); the tests below instantiate it.
551 class Khashmir(KhashmirWrite):
552 """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
# Integration tests with two local DHT nodes on the real reactor. The nodes
# bind real UDP ports, and each node's database file is deleted afterwards.
556 class SimpleTests(unittest.TestCase):
# Shared node configuration. LOCAL_OK is True so that 127.0.0.1 peers are
# accepted into the routing tables.
559 DHT_DEFAULTS = {'PORT': 9977,
560 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
561 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
562 'MAX_FAILURES': 3, 'LOCAL_OK': True,
563 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
564 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
565 'KEY_EXPIRE': 3600, 'SPEW': True, }
568 d = self.DHT_DEFAULTS.copy()
571 d = self.DHT_DEFAULTS.copy()
578 os.unlink(self.a.store.db)
579 os.unlink(self.b.store.db)
# Each node starts with a single empty bucket. After a joins port 4045
# (presumably b's port -- confirm in setUp), each should know one contact.
581 def testAddContact(self):
582 self.failUnlessEqual(len(self.a.table.buckets), 1)
583 self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
585 self.failUnlessEqual(len(self.b.table.buckets), 1)
586 self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
588 self.a.addContact('127.0.0.1', 4045)
594 self.failUnlessEqual(len(self.a.table.buckets), 1)
595 self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
596 self.failUnlessEqual(len(self.b.table.buckets), 1)
597 self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
599 def testStoreRetrieve(self):
600 self.a.addContact('127.0.0.1', 4045)
606 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
613 self.a.valueForKey(sha('foo').digest(), self._cb)
# valueForKey callback: an empty value list marks the end of the search.
622 def _cb(self, key, val):
624 self.failUnlessEqual(self.got, 1)
625 elif 'foobar' in val:
# Multi-node test. Builds self.num Khashmir instances on consecutive ports
# starting at 4088 and joins each to three randomly chosen nodes (possibly
# itself). Then findCloseNodes runs on every node.
629 class MultiTest(unittest.TestCase):
633 DHT_DEFAULTS = {'PORT': 9977,
634 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
635 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
636 'MAX_FAILURES': 3, 'LOCAL_OK': True,
637 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
638 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2,
639 'KEY_EXPIRE': 3600, 'SPEW': True, }
641 def _done(self, val):
646 self.startport = 4088
647 for i in range(self.num):
648 d = self.DHT_DEFAULTS.copy()
649 d['PORT'] = self.startport + i
650 self.l.append(Khashmir(d))
655 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
656 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
657 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
664 i.findCloseNodes(self._done)
669 i.findCloseNodes(self._done)
676 os.unlink(i.store.db)
# Stores values through randomly chosen nodes and reads them back through
# randomly chosen nodes.
680 def testStoreRetrieve(self):
687 def _scb(key, value, result):
689 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
697 self.failUnlessEqual(self.got, 1)
703 self.l[randrange(0, self.num)].valueForKey(K, _rcb)