"""The main Khashmir program.

@var isLocal: a compiled regular expression suitable for testing if an
    IP address is from a known local or private range
"""
import warnings
# Keep this filter ahead of the imports below so that DeprecationWarnings
# they trigger (e.g. from the deprecated Python 2 `sha` module) are silenced.
warnings.simplefilter("ignore", DeprecationWarning)

from datetime import datetime, timedelta
from random import randrange, shuffle
from sha import sha
from copy import copy
import os
import re

from twisted.internet.defer import Deferred
from twisted.internet.base import DelayedCall
from twisted.internet import protocol, reactor
from twisted.python import log
from twisted.trial import unittest

from db import DB
from ktable import KTable
from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
from khash import newID, newIDInRange
from actions import FindNode, FindValue, GetValue, StoreValue
from stats import StatsLogger
import krpc
# Matches dotted-quad IPv4 addresses in the loopback (127/8) and private
# (10/8, 172.16-31, 192.168/16) ranges.  The alternatives are wrapped in a
# single non-capturing group so that BOTH the ^ and $ anchors apply to every
# alternative; without it, '^' only bound the first alternative and '$' only
# the last, so strings such as '10.0.0.1000' or '192.168.1.1x' matched.
# The inner capturing groups are kept so group numbering is unchanged.
isLocal = re.compile(r'^(?:(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}))$')
class KhashmirBase(protocol.Factory):
    """The base Khashmir class, with base functionality and find node, no key-value mappings.
    
    @type _Node: L{node.Node}
    @ivar _Node: the knode implementation to use for this class of DHT
    @type config: C{dictionary}
    @ivar config: the configuration parameters for the DHT
    @type pinging: C{dictionary}
    @ivar pinging: the nodes that are currently being pinged, keys are the
        node IDs, values are the Deferred or DelayedCall objects
    @type port: C{int}
    @ivar port: the port to listen on
    @type store: L{db.DB}
    @ivar store: the database to store nodes and key/value pairs in
    @type node: L{node.Node}
    @ivar node: this node
    @type table: L{ktable.KTable}
    @ivar table: the routing table
    @type token_secrets: C{list} of C{string}
    @ivar token_secrets: the current secrets to use to create tokens
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics gatherer
    @type udp: L{krpc.hostbroker}
    @ivar udp: the factory for the KRPC protocol
    @type listenport: L{twisted.internet.interfaces.IListeningPort}
    @ivar listenport: the UDP listening port
    @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
    @ivar next_checkpoint: the delayed call for the next checkpoint
    """
    
    _Node = KNodeBase
    
    def __init__(self, config, cache_dir='/tmp'):
        """Initialize the Khashmir class and call the L{setup} method.
        
        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        @type cache_dir: C{string}
        @param cache_dir: the directory to store all files in
            (optional, defaults to the /tmp directory)
        """
        self.config = None
        self.pinging = {}
        self.setup(config, cache_dir)
        
    def setup(self, config, cache_dir):
        """Setup all the Khashmir sub-modules.
        
        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        @type cache_dir: C{string}
        @param cache_dir: the directory to store all files in
        """
        self.config = config
        self.port = config['PORT']
        self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
        self.node = self._loadSelfNode('', self.port)
        self.table = KTable(self.node, config)
        self.token_secrets = [newID()]
        self.stats = StatsLogger(self.table, self.store)
        
        # Start listening
        self.udp = krpc.hostbroker(self, self.stats, config)
        self.udp.protocol = krpc.KRPC
        self.listenport = reactor.listenUDP(self.port, self.udp)
        
        # Load the routing table and begin checkpointing
        self._loadRoutingTable()
        self.refreshTable(force = True)
        self.next_checkpoint = reactor.callLater(60, self.checkpoint)

    def Node(self, id, host = None, port = None):
        """Create a new node.
        
        @see: L{node.Node.__init__}
        """
        n = self._Node(id, host, port)
        n.table = self.table
        # Attach the KRPC connection used to talk to this node's address
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        return n
    
    def __del__(self):
        """Stop listening for packets."""
        self.listenport.stopListening()
        
    def _loadSelfNode(self, host, port):
        """Create this node, loading any previously saved one."""
        id = self.store.getSelfNode()
        if not id:
            # No saved node ID, so generate a fresh one
            id = newID()
        return self._Node(id, host, port)
        
    def checkpoint(self):
        """Perform some periodic maintenance operations."""
        # Create a new token secret, keeping at most the 3 most recent
        self.token_secrets.insert(0, newID())
        if len(self.token_secrets) > 3:
            self.token_secrets.pop()
            
        # Save some parameters for reloading
        self.store.saveSelfNode(self.node.id)
        self.store.dumpRoutingTable(self.table.buckets)
        
        # DHT maintenance
        self.store.expireValues(self.config['KEY_EXPIRE'])
        self.refreshTable()
        
        # Reschedule at a random time within +/-10% of the interval.  Keep
        # the range non-empty so randrange cannot raise ValueError when the
        # configured interval is tiny (e.g. 0).
        interval = self.config['CHECKPOINT_INTERVAL']
        low = int(interval * .9)
        high = max(int(interval * 1.1), low + 1)
        self.next_checkpoint = reactor.callLater(randrange(low, high),
                                                 self.checkpoint)

    def _loadRoutingTable(self):
        """Load the previous routing table nodes from the database.
        
        It's usually a good idea to call refreshTable(force = True) after
        loading the table.
        """
        nodes = self.store.getRoutingTable()
        for rec in nodes:
            # Each record is (id, host, port)
            n = self.Node(rec[0], rec[1], int(rec[2]))
            self.table.insertNode(n, contacted = False)
            
    #{ Local interface
    def addContact(self, host, port, callback=None, errback=None):
        """Ping this node and add the contact info to the table on pong.
        
        @type host: C{string}
        @param host: the IP address of the node to contact
        @type port: C{int}
        @param port: the port of the node to contact
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the contact info returned by the node
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to calling the callback with the error)
        """
        n = self.Node(NULL_ID, host, port)
        self.sendJoin(n, callback=callback, errback=errback)

    def findNode(self, id, callback):
        """Find the contact info for the K closest nodes in the global table.
        
        @type id: C{string}
        @param id: the target ID to find the K closest nodes of
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of K closest nodes
        """
        # Mark the bucket as having been accessed
        self.table.touch(id)
        
        # Start with our node
        nodes = [copy(self.node)]

        # Start the finding nodes action
        state = FindNode(self, id, callback, self.config, self.stats)
        reactor.callLater(0, state.goWithNodes, nodes)
    
    def insertNode(self, node, contacted = True):
        """Try to insert a node in our local table, pinging oldest contact if necessary.
        
        If all you have is a host/port, then use L{addContact}, which calls this
        method after receiving the PONG from the remote node. The reason for
        the separation is we can't insert a node into the table without its
        node ID. That means of course the node passed into this method needs
        to be a properly formed Node object with a valid ID.

        @type node: L{node.Node}
        @param node: the new node to try and insert
        @type contacted: C{boolean}
        @param contacted: whether the new node is known to be good, i.e.
            responded to a request (optional, defaults to True)
        """
        # Don't add any local nodes to the routing table
        if not self.config['LOCAL_OK'] and isLocal.match(node.host):
            log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
            return
        
        old = self.table.insertNode(node, contacted=contacted)

        if (isinstance(old, self._Node) and old.id != self.node.id and
            (datetime.now() - old.lastSeen) > 
             timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
            
            # Bucket is full, check to see if old node is still available
            df = self.sendPing(old)
            df.addErrback(self._staleNodeHandler, old, node, contacted)
        elif not old and not contacted:
            # There's room, we just need to contact the node first
            df = self.sendPing(node)
            # Also schedule a future ping to make sure the node works
            def rePing(newnode, self = self):
                if newnode.id not in self.pinging:
                    self.pinging[newnode.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
                                                                 self.sendPing, newnode)
            df.addCallback(rePing)

    def _staleNodeHandler(self, err, old, node, contacted):
        """The pinged node never responded, so replace it."""
        self.table.invalidateNode(old)
        self.insertNode(node, contacted)
        return err
    
    def nodeFailed(self, node):
        """Mark a node as having failed a request and schedule a future check.
        
        @type node: L{node.Node}
        @param node: the node that failed to respond to a request
        """
        exists = self.table.nodeFailed(node)
        
        # If in the table, schedule a ping, if one isn't already sent/scheduled
        if exists and node.id not in self.pinging:
            self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
                                                      self.sendPing, node)
    
    def sendPing(self, node):
        """Ping the node to see if it's still alive.
        
        @type node: L{node.Node}
        @param node: the node to ping
        @rtype: L{twisted.internet.defer.Deferred}
        @return: the deferred for the (possibly already outstanding) ping
        """
        # Check for a ping already underway
        pending = self.pinging.get(node.id, None)
        if isinstance(pending, DelayedCall) and pending.active():
            # A future ping was scheduled; replace it with one sent now
            pending.cancel()
        elif isinstance(pending, Deferred):
            # A ping is in flight; share its result rather than sending another
            return pending

        self.stats.startedAction('ping')
        df = node.ping(self.node.id)
        self.pinging[node.id] = df
        df.addCallbacks(self._pingHandler, self._pingError,
                        callbackArgs = (node, datetime.now()),
                        errbackArgs = (node, datetime.now()))
        return df

    def _pingHandler(self, response, node, start):
        """Node responded properly, update it and return the node object."""
        self.stats.completedAction('ping', start)
        del self.pinging[node.id]
        # Create the node using the returned contact info
        n = self.Node(response['id'], response['_krpc_sender'][0], response['_krpc_sender'][1])
        reactor.callLater(0, self.insertNode, n)
        return n

    def _pingError(self, err, node, start):
        """Error occurred, fail node."""
        log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
        self.stats.completedAction('ping', start)
        del self.pinging[node.id]
        self.nodeFailed(node)
        # Propagate the failure so errbacks such as _staleNodeHandler run
        return err
        
    def sendJoin(self, node, callback=None, errback=None):
        """Join the DHT by pinging a bootstrap node.
        
        @type node: L{node.Node}
        @param node: the node to send the join to
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the contact info returned by the node
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to calling the callback with the error)
        """
        if errback is None:
            errback = callback
        self.stats.startedAction('join')
        df = node.join(self.node.id)
        df.addCallbacks(self._joinHandler, self._joinError,
                        callbackArgs = (node, datetime.now()),
                        errbackArgs = (node, datetime.now()))
        if callback is not None:
            df.addCallbacks(callback, errback)

    def _joinHandler(self, response, node, start):
        """Node responded properly, extract the response."""
        self.stats.completedAction('join', start)
        # Create the node using the returned contact info
        n = self.Node(response['id'], response['_krpc_sender'][0], response['_krpc_sender'][1])
        reactor.callLater(0, self.insertNode, n)
        return (response['ip_addr'], response['port'])

    def _joinError(self, err, node, start):
        """Error occurred, fail node."""
        log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
        self.stats.completedAction('join', start)
        self.nodeFailed(node)
        return err
        
    def findCloseNodes(self, callback=lambda a: None):
        """Perform a findNode on the ID one away from our own.

        This will allow us to populate our table with nodes on our network
        closest to our own. This is called as soon as we start up with an
        empty table.

        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of K closest nodes
            (optional, defaults to doing nothing with the results)
        """
        # Increment the last byte of our ID (wrapping at 256)
        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
        self.findNode(id, callback)

    def refreshTable(self, force = False):
        """Check all the buckets for those that need refreshing.
        
        @param force: refresh all buckets regardless of last bucket access time
            (optional, defaults to False)
        """
        def callback(nodes):
            pass
    
        for bucket in self.table.buckets:
            if force or (datetime.now() - bucket.lastAccessed > 
                         timedelta(seconds=self.config['BUCKET_STALENESS'])):
                # Choose a random ID in the bucket and try and find it
                id = newIDInRange(bucket.min, bucket.max)
                self.findNode(id, callback)

    def shutdown(self):
        """Closes the port and cancels pending later calls."""
        self.listenport.stopListening()
        if self.next_checkpoint.active():
            self.next_checkpoint.cancel()
        # self.pinging maps node IDs to Deferreds/DelayedCalls, so look at the
        # values: iterating the dict itself only yields the ID keys, which are
        # never DelayedCalls, and the scheduled pings would never be cancelled.
        for nodeid, call in list(self.pinging.items()):
            if isinstance(call, DelayedCall) and call.active():
                call.cancel()
                del self.pinging[nodeid]
        self.store.close()
    
    def getStats(self):
        """Gather the statistics for the DHT."""
        return self.stats.formatHTML()

    #{ Remote interface
    def krpc_ping(self, id, _krpc_sender = None):
        """Respond to a ping with our node ID, adding the sender to our table.
        
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)

        return {"id" : self.node.id}
        
    def krpc_join(self, id, _krpc_sender = None):
        """Add the node by responding with its address and port.
        
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)
        else:
            _krpc_sender = ('127.0.0.1', self.port)
            
        return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
        
    def krpc_find_node(self, id, target, _krpc_sender = None):
        """Find the K closest nodes to the target in the local routing table.
        
        @type target: C{string}
        @param target: the target ID to find nodes for
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)
        else:
            _krpc_sender = ('127.0.0.1', self.port)
    
        nodes = self.table.findNodes(target)
        # Build a real list (not a lazy map) so the response can be encoded
        nodes = [node.contactInfo() for node in nodes]
        # The token binds the sender's IP to our current secret
        token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
        return {"nodes" : nodes, "token" : token, "id" : self.node.id}
class KhashmirRead(KhashmirBase):
    """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""

    _Node = KNodeRead

    #{ Local interface
    def findValue(self, key, callback):
        """Get the nodes that have values for the key from the global table.
        
        @type key: C{string}
        @param key: the target key to find the values for
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of nodes with values
        """
        # Mark the bucket as having been accessed
        self.table.touch(key)
        
        # Start with ourself
        nodes = [copy(self.node)]
        
        # Search for others starting with the locally found ones
        state = FindValue(self, key, callback, self.config, self.stats)
        reactor.callLater(0, state.goWithNodes, nodes)

    def valueForKey(self, key, callback, searchlocal = True):
        """Get the values found for key in global table.
        
        Callback will be called with a list of values for each peer that
        returns unique values. The final callback will be an empty list.

        @type key: C{string}
        @param key: the target key to get the values for
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 2
            parameters: the key, and the values found
        @type searchlocal: C{boolean}
        @param searchlocal: whether to also look for any local values
            (optional, defaults to True)
        """
        def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
            """Use the found nodes to send requests for values to."""
            # Get any local values, advertising ourselves as a holder of them
            if searchlocal:
                l = self.store.retrieveValues(key)
                if len(l) > 0:
                    node = copy(self.node)
                    node.updateNumValues(len(l))
                    nodes = nodes + [node]

            state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
            reactor.callLater(0, state.goWithNodes, nodes)
            
        # First lookup nodes that have values for the key
        self.findValue(key, _getValueForKey)

    #{ Remote interface
    def krpc_find_value(self, id, key, _krpc_sender = None):
        """Find the number of values stored locally for the key, and the K closest nodes.
        
        @type key: C{string}
        @param key: the target key to find the values and nodes for
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)
    
        nodes = self.table.findNodes(key)
        # Build a real list (not a lazy map) so the response can be encoded
        nodes = [node.contactInfo() for node in nodes]
        num_values = self.store.countValues(key)
        return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}

    def krpc_get_value(self, id, key, num, _krpc_sender = None):
        """Retrieve the values stored locally for the key.
        
        @type key: C{string}
        @param key: the target key to retrieve the values for
        @type num: C{int}
        @param num: the maximum number of values to retrieve, or 0 (or any
            negative number) to retrieve all of them
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)
    
        l = self.store.retrieveValues(key)
        # num comes from a remote peer; a negative value would otherwise make
        # l[:num] silently drop values from the end instead of limiting them
        if num <= 0 or num >= len(l):
            return {'values' : l, "id": self.node.id}
        else:
            # Return a random subset of the requested size
            shuffle(l)
            return {'values' : l[:num], "id": self.node.id}
class KhashmirWrite(KhashmirRead):
    """The read-write Khashmir class, which can store and retrieve key/value mappings."""

    _Node = KNodeWrite

    #{ Local interface
    def storeValueForKey(self, key, value, callback=None):
        """Stores the value for the key in the global table.
        
        No status in this implementation, peers respond but don't indicate
        status of storing values.

        @type key: C{string}
        @param key: the target key to store the value for
        @type value: C{string}
        @param value: the value to store with the key
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 3
            parameters: the key, the value stored, and the result of the store
            (optional, defaults to doing nothing with the results)
        """
        def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
            """Use the returned K closest nodes to store the key at."""
            if not response:
                def _storedValueHandler(key, value, sender):
                    """Default callback that does nothing."""
                    pass
                response = _storedValueHandler
            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
            reactor.callLater(0, action.goWithNodes, nodes)
            
        # First find the K closest nodes to operate on.
        self.findNode(key, _storeValueForKey)
                    
    #{ Remote interface
    def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
        """Store the value locally with the key.
        
        @type key: C{string}
        @param key: the target key to store the value for
        @type value: C{string}
        @param value: the value to store with the key
        @param token: the token to confirm that this peer contacted us previously
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        @raise krpc.KrpcError: if the token does not match any current secret
        """
        if _krpc_sender is not None:
            n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
            reactor.callLater(0, self.insertNode, n, False)
        else:
            _krpc_sender = ('127.0.0.1', self.port)

        # Accept tokens made with any of the recent secrets, so tokens handed
        # out shortly before a checkpoint rotated the secret remain valid
        for secret in self.token_secrets:
            this_token = sha(secret + _krpc_sender[0]).digest()
            if token == this_token:
                self.store.storeValue(key, value)
                return {"id" : self.node.id}
        # Call-form raise: equivalent to the Python 2 "raise E, (args)" form
        raise krpc.KrpcError(krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
class Khashmir(KhashmirWrite):
    """The default Khashmir implementation.

    This is currently just the read-write L{KhashmirWrite} class under a
    shorter name.
    """
    # Same node implementation as the read-write parent class
    _Node = KNodeWrite
class SimpleTests(unittest.TestCase):
    """Tests with two DHT nodes running on the local host."""
    
    timeout = 10
    DHT_DEFAULTS = {'PORT': 9977,
                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
                    'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
                    'MAX_FAILURES': 3, 'LOCAL_OK': True,
                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                    'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
                    'KEY_EXPIRE': 3600, 'SPEW': True, }

    def setUp(self):
        d = self.DHT_DEFAULTS.copy()
        d['PORT'] = 4044
        self.a = Khashmir(d)
        d = self.DHT_DEFAULTS.copy()
        d['PORT'] = 4045
        self.b = Khashmir(d)
        
    def tearDown(self):
        self.a.shutdown()
        self.b.shutdown()
        os.unlink(self.a.store.db)
        os.unlink(self.b.store.db)

    def _iterate(self, count):
        """Run the reactor for a fixed number of iterations."""
        for _ in range(count):
            reactor.iterate()

    def testAddContact(self):
        self.failUnlessEqual(len(self.a.table.buckets), 1)
        self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)

        self.failUnlessEqual(len(self.b.table.buckets), 1)
        self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)

        self.a.addContact('127.0.0.1', 4045)
        self._iterate(8)

        self.failUnlessEqual(len(self.a.table.buckets), 1)
        self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
        self.failUnlessEqual(len(self.b.table.buckets), 1)
        self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)

    def testStoreRetrieve(self):
        self.a.addContact('127.0.0.1', 4045)
        self._iterate(4)
        self.got = 0
        self.a.storeValueForKey(sha('foo').digest(), 'foobar')
        self._iterate(6)
        self.a.valueForKey(sha('foo').digest(), self._cb)
        self._iterate(21)
        # _cb only checks self.got when it receives the final empty list; if
        # that never arrives the test would otherwise pass without the value
        self.failUnlessEqual(self.got, 1)

    def _cb(self, key, val):
        """Record that the stored value was seen; check it on the final call."""
        if not val:
            self.failUnlessEqual(self.got, 1)
        elif 'foobar' in val:
            self.got = 1
class MultiTest(unittest.TestCase):
    """Tests with a larger DHT of C{num} nodes running on the local host."""
    
    timeout = 30
    num = 20
    DHT_DEFAULTS = {'PORT': 9977,
                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
                    'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
                    'MAX_FAILURES': 3, 'LOCAL_OK': True,
                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                    'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
                    'KEY_EXPIRE': 3600, 'SPEW': True, }

    def _done(self, val):
        """Flag the current asynchronous operation as finished."""
        self.done = 1

    def _spin(self, count):
        """Run the reactor for a fixed number of iterations."""
        for _ in range(count):
            reactor.iterate()

    def _spinUntilDone(self):
        """Run the reactor until a callback sets C{self.done}."""
        while not self.done:
            reactor.iterate()

    def _randomPort(self):
        """Pick the port of a random node in the test network."""
        return self.l[randrange(0,self.num)].port

    def setUp(self):
        self.l = []
        self.startport = 4088
        for offset in range(self.num):
            d = self.DHT_DEFAULTS.copy()
            d['PORT'] = self.startport + offset
            self.l.append(Khashmir(d))
        self._spin(2)
        
        # Give every node three random contacts
        for dht in self.l:
            for _ in range(3):
                dht.addContact('127.0.0.1', self._randomPort())
            self._spin(3)

        # Two rounds of finding each node's neighbours fill the tables
        for _ in range(2):
            for dht in self.l:
                self.done = 0
                dht.findCloseNodes(self._done)
                self._spinUntilDone()

    def tearDown(self):
        for dht in self.l:
            dht.shutdown()
            os.unlink(dht.store.db)
        reactor.iterate()
        
    def testStoreRetrieve(self):
        for _ in range(10):
            K = newID()
            V = newID()
            
            for _store in range(3):
                self.done = 0
                def _scb(key, value, result):
                    self.done = 1
                self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
                self._spinUntilDone()

                def _rcb(key, val):
                    # An empty list marks the end of the results
                    if not val:
                        self.done = 1
                        self.failUnlessEqual(self.got, 1)
                    elif V in val:
                        self.got = 1
                for _retrieve in range(3):
                    self.got = 0
                    self.done = 0
                    self.l[randrange(0, self.num)].valueForKey(K, _rcb)
                    self._spinUntilDone()