2 """The main Khashmir program.
4 @var isLocal: a compiled regular expression suitable for testing if an
5 IP address is from a known local or private range
# Silence DeprecationWarning for the whole process (simplefilter changes the
# global warnings filter list). NOTE(review): presumably meant to hide
# warnings from deprecated modules this file uses (e.g. sha) -- confirm.
9 warnings.simplefilter("ignore", DeprecationWarning)
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
17 from twisted.internet.defer import Deferred
18 from twisted.internet.base import DelayedCall
19 from twisted.internet import protocol, reactor
20 from twisted.python import log
21 from twisted.trial import unittest
24 from ktable import KTable
25 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
26 from khash import newID, newIDInRange
27 from actions import FindNode, FindValue, GetValue, StoreValue
28 from stats import StatsLogger
# Matches IP address strings in the loopback (127.0.0.0/8) or private
# (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16) ranges; used with .match().
# All alternatives sit inside one non-capturing group so that BOTH the "^"
# and the "$" anchors apply to every alternative. Without the group, "^"
# binds only to the first alternative and "$" only to the last, so strings
# such as "10.0.0.1234" matched by prefix. The per-range capturing groups
# keep their original numbering (1-6). Raw strings keep "\." a literal
# regex escape.
isLocal = re.compile(r'^(?:'
                     r'(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})'
                     r')$')
# NOTE(review): this listing omits some lines of the class (e.g. the
# docstring's closing quotes and whatever assigns the _Node class attribute
# used via self._Node); verify against the full file.
38 class KhashmirBase(protocol.Factory):
39 """The base Khashmir class, with base functionality and find node, no key-value mappings.
41 @type _Node: L{node.Node}
42 @ivar _Node: the knode implementation to use for this class of DHT
43 @type config: C{dictionary}
44 @ivar config: the configuration parameters for the DHT
45 @type pinging: C{dictionary}
46 @ivar pinging: the nodes that are currently being pinged, keys are the
47 node IDs, values are the Deferred (ping in flight) or DelayedCall (ping scheduled) objects
@type port: C{int}
49 @ivar port: the port to listen on
@type store: L{DB}
51 @ivar store: the database to store nodes and key/value pairs in
52 @type node: L{node.Node}
@ivar node: this DHT's own node
54 @type table: L{ktable.KTable}
55 @ivar table: the routing table
56 @type token_secrets: C{list} of C{string}
57 @ivar token_secrets: the current secrets to use to create tokens (newest first)
58 @type stats: L{stats.StatsLogger}
59 @ivar stats: the statistics gatherer
60 @type udp: L{krpc.hostbroker}
61 @ivar udp: the factory for the KRPC protocol
62 @type listenport: L{twisted.internet.interfaces.IListeningPort}
63 @ivar listenport: the UDP listening port
64 @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
65 @ivar next_checkpoint: the delayed call for the next checkpoint
70 def __init__(self, config, cache_dir='/tmp'):
71 """Initialize the Khashmir class and call the L{setup} method.
73 @type config: C{dictionary}
74 @param config: the configuration parameters for the DHT
75 @type cache_dir: C{string}
76 @param cache_dir: the directory to store all files in
77 (optional, defaults to the /tmp directory)
# NOTE(review): the lines between the docstring and this call are not
# visible in this listing. self.pinging (read by sendPing, nodeFailed and
# insertNode) is not initialised in any visible line -- confirm it is set
# here.
# All real initialisation is delegated to setup().
81 self.setup(config, cache_dir)
83 def setup(self, config, cache_dir):
84 """Set up all the Khashmir sub-modules.
86 @type config: C{dictionary}
87 @param config: the configuration parameters for the DHT
88 @type cache_dir: C{string}
89 @param cache_dir: the directory to store all files in
# NOTE(review): _loadSelfNode() below reads self.config, but no assignment
# of self.config is visible in this listing -- confirm it precedes it.
92 self.port = config['PORT']
# One database file per port, so several instances can share cache_dir.
93 self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
94 self.node = self._loadSelfNode('', self.port)
95 self.table = KTable(self.node, config)
# Start with a single random token secret; checkpoint() rotates them.
96 self.token_secrets = [newID()]
97 self.stats = StatsLogger(self.table, self.store)
# KRPC over UDP on the configured port.
100 self.udp = krpc.hostbroker(self, self.stats, config)
101 self.udp.protocol = krpc.KRPC
102 self.listenport = reactor.listenUDP(self.port, self.udp)
104 # Load the routing table and begin checkpointing
105 self._loadRoutingTable()
106 self.refreshTable(force = True)
# First checkpoint after 60 seconds; checkpoint() then schedules the next.
107 self.next_checkpoint = reactor.callLater(60, self.checkpoint)
109 def Node(self, id, host = None, port = None):
110 """Create a new node of this DHT's C{_Node} class, connected through our KRPC factory.
112 @see: L{node.Node.__init__}
114 n = self._Node(id, host, port)
# Give the node a KRPC connection to its (host, port) from our UDP factory.
116 n.conn = self.udp.connectionForAddr((n.host, n.port))
# NOTE(review): every caller uses the result (n = self.Node(...)), but no
# `return n` is visible in this listing -- confirm it is present.
# NOTE(review): the `def` line owning this docstring is not visible in this
# listing.
# Closes the UDP port opened in setup().
120 """Stop listening for packets."""
121 self.listenport.stopListening()
123 def _loadSelfNode(self, host, port):
124 """Create this node, loading any previously saved one."""
125 id = self.store.getSelfNode()
126 if not id or not id.endswith(self.config['VERSION']):
127 id = newID(self.config['VERSION'])
128 return self._Node(id, host, port)
130 def checkpoint(self):
131 """Perform periodic maintenance: rotate token secrets, save state, expire values and reschedule."""
132 # Create a new token secret
133 self.token_secrets.insert(0, newID())
# Keep only the three newest secrets; krpc_store_value accepts tokens made
# with any kept secret, so older tokens stop validating once dropped.
134 if len(self.token_secrets) > 3:
135 self.token_secrets.pop()
137 # Save some parameters for reloading
138 self.store.saveSelfNode(self.node.id)
139 self.store.dumpRoutingTable(self.table.buckets)
# Ask the store to expire values, using the KEY_EXPIRE setting.
# NOTE(review): some lines around here are not visible in this listing.
142 self.store.expireValues(self.config['KEY_EXPIRE'])
# Schedule the next checkpoint after a random delay in roughly
# [0.9, 1.1) x CHECKPOINT_INTERVAL, presumably so checkpoints of different
# nodes do not stay in lock-step. NOTE(review): the final argument of this
# callLater call is not visible in this listing.
145 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
146 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
149 def _loadRoutingTable(self):
150 """Load the previous routing table nodes from the database.
152 It's usually a good idea to call refreshTable(force = True) after
loading the table.
155 nodes = self.store.getRoutingTable()
# NOTE(review): the loop header binding `rec` (presumably iterating over
# `nodes`) is not visible in this listing.
# Each record is presumably (id, host, port); int() suggests the port is not
# stored as an int -- confirm against the DB schema.
157 n = self.Node(rec[0], rec[1], int(rec[2]))
# Insert as not contacted: these nodes have not answered us since reload.
158 self.table.insertNode(n, contacted = False)
def addContact(self, host, port, callback=None, errback=None):
    """Join through the node at C{host}:C{port}, adding it to the table on reply.

    The remote node's ID is not known yet, so a placeholder node with
    C{NULL_ID} is built for the address and handed to L{sendJoin}.

    @type host: C{string}
    @param host: the IP address of the node to contact
    @type port: C{int}
    @param port: the port of the node to contact
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the contact info returned by the node
        (optional, passed through to L{sendJoin})
    @type errback: C{method}
    @param errback: the method to call if an error occurs
        (optional, passed through to L{sendJoin})
    """
    placeholder = self.Node(NULL_ID, host, port)
    self.sendJoin(placeholder, callback=callback, errback=errback)
179 def findNode(self, id, callback):
180 """Find the contact info for the K closest nodes in the global table.
@type id: C{string}
183 @param id: the target ID to find the K closest nodes of
184 @type callback: C{method}
185 @param callback: the method to call with the results, it must take 1
186 parameter, the list of K closest nodes
188 # Mark the bucket as having been accessed
# NOTE(review): the statement for the comment above is not visible in this
# listing (findValue does it with self.table.touch(key)) -- confirm.
191 # Start with our node
# A copy, presumably so the lookup can annotate it without touching self.node.
192 nodes = [copy(self.node)]
194 # Start the finding nodes action
195 state = FindNode(self, id, callback, self.config, self.stats)
# Run the search on the next reactor iteration rather than synchronously.
196 reactor.callLater(0, state.goWithNodes, nodes)
198 def insertNode(self, node, contacted = True):
199 """Try to insert a node in our local table, pinging oldest contact if necessary.
201 If all you have is a host/port, then use L{addContact}, whose join reply handler calls this
202 method after receiving the response from the remote node. The reason for
203 the separation is we can't insert a node into the table without its
204 node ID. That means of course the node passed into this method needs
205 to be a properly formed Node object with a valid ID.
207 @type node: L{node.Node}
208 @param node: the new node to try and insert
209 @type contacted: C{boolean}
210 @param contacted: whether the new node is known to be good, i.e.
211 responded to a request (optional, defaults to True)
213 # Don't add any local nodes to the routing table
214 if not self.config['LOCAL_OK'] and isLocal.match(node.host):
215 log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
# NOTE(review): an early `return` is expected here so the local node really
# is skipped; none is visible in this listing -- confirm it exists.
# Presumably KTable.insertNode returns the bucket's oldest node when the
# bucket is full and a falsy value otherwise (see the branches below) --
# confirm in ktable.
218 old = self.table.insertNode(node, contacted=contacted)
# Challenge the old node only if it is not ourselves and it has not been
# seen within the last MIN_PING_INTERVAL seconds.
220 if (isinstance(old, self._Node) and old.id != self.node.id and
221 (datetime.now() - old.lastSeen) >
222 timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
224 # Bucket is full, check to see if old node is still available
225 df = self.sendPing(old)
# If that ping fails, _staleNodeHandler invalidates the old node and
# retries inserting the new one.
226 df.addErrback(self._staleNodeHandler, old, node, contacted)
227 elif not old and not contacted:
228 # There's room, we just need to contact the node first
229 df = self.sendPing(node)
230 # Also schedule a future ping to make sure the node works
# rePing receives the ping callback's result (presumably the node returned
# by _pingHandler) and schedules another ping MIN_PING_INTERVAL seconds
# later unless one is already pending for it. `self = self` binds the
# instance as a default argument.
231 def rePing(newnode, self = self):
232 if newnode.id not in self.pinging:
233 self.pinging[newnode.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
234 self.sendPing, newnode)
236 df.addCallback(rePing)
238 def _staleNodeHandler(self, err, old, node, contacted):
239 """The pinged node never responded, so replace it."""
# Errback for insertNode's ping of the old node: drop the old node from the
# table (presumably freeing a slot in its bucket), then retry inserting the
# new node.
240 self.table.invalidateNode(old)
241 self.insertNode(node, contacted)
# NOTE(review): nothing is returned in the visible lines, which would stop
# the ping failure propagating here -- confirm that is intended.
244 def nodeFailed(self, node):
245 """Mark a node as having failed a request and schedule a future check.
247 @type node: L{node.Node}
248 @param node: the node that failed a request
# Presumably true when the node is (still) in the routing table -- see the
# comment below.
250 exists = self.table.nodeFailed(node)
252 # If in the table, schedule a ping, if one isn't already sent/scheduled
253 if exists and node.id not in self.pinging:
# Re-check the node MIN_PING_INTERVAL seconds from now. NOTE(review): the
# remaining arguments of this call are not visible in this listing.
254 self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
257 def sendPing(self, node):
258 """Ping the node to see if it's still alive.
260 @type node: L{node.Node}
261 @param node: the node to ping
263 # Check for a ping already underway
# A ping that is only scheduled (DelayedCall) is cancelled because we are
# pinging now; an in-flight ping (Deferred) is shared with this caller.
264 if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
265 self.pinging[node.id].active()):
266 self.pinging[node.id].cancel()
267 elif isinstance(self.pinging.get(node.id, None), Deferred):
268 return self.pinging[node.id]
270 self.stats.startedAction('ping')
271 df = node.ping(self.node.id)
# Record the in-flight ping so later callers reuse it (see above).
272 self.pinging[node.id] = df
273 df.addCallbacks(self._pingHandler, self._pingError,
274 callbackArgs = (node, datetime.now()),
275 errbackArgs = (node, datetime.now()))
# NOTE(review): callers (e.g. insertNode) use the Deferred this method
# returns, but the final `return df` is not visible in this listing.
278 def _pingHandler(self, dict, node, start):
279 """Node responded properly, update it and return the node object."""
280 self.stats.completedAction('ping', start)
# The ping is complete, so it is no longer pending for this node.
281 del self.pinging[node.id]
282 # Create the node using the returned contact info
283 n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
# Insert on the next reactor iteration (insertNode's contacted defaults to True).
284 reactor.callLater(0, self.insertNode, n)
# NOTE(review): the docstring promises the node is returned (insertNode's
# rePing callback reads its .id), but no return is visible in this listing.
287 def _pingError(self, err, node, start):
288 """Log a failed ping, clear its pending entry and mark the node as failed."""
289 log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
290 self.stats.completedAction('ping', start)
292 # Consume unhandled errors
# The no-op errback returns None, so a failure still propagating at the end
# of the ping's Deferred chain is swallowed instead of reported as unhandled.
293 self.pinging[node.id].addErrback(lambda ping_err: None)
294 del self.pinging[node.id]
# The pinging entry is removed first so nodeFailed() may schedule a new ping.
296 self.nodeFailed(node)
299 def sendJoin(self, node, callback=None, errback=None):
300 """Join the DHT by sending a join request to a bootstrap node.
302 @type node: L{node.Node}
303 @param node: the node to send the join to
304 @type callback: C{method}
305 @param callback: the method to call with the results, it must take 1
306 parameter, the (IP address, port) the remote node saw us at
307 (optional, defaults to doing nothing with the results)
308 @type errback: C{method}
309 @param errback: the method to call if an error occurs
310 (optional, defaults to calling the callback with the error)
314 self.stats.startedAction('join')
315 df = node.join(self.node.id)
# _joinHandler inserts the responder and passes on (ip_addr, port);
# _joinError fails the node.
316 df.addCallbacks(self._joinHandler, self._joinError,
317 callbackArgs = (node, datetime.now()),
318 errbackArgs = (node, datetime.now()))
# NOTE(review): a line just above this one is not visible in this listing
# (possibly a guard on `callback`). With errback=None, Twisted's
# addCallbacks passes failures through unchanged -- confirm the
# "defaults to" claim in the docstring.
320 df.addCallbacks(callback, errback)
def _joinHandler(self, dict, node, start):
    """Handle a successful join reply.

    Records the completed 'join' action, queues the responding node for
    insertion into the routing table, and extracts the address the remote
    node reported seeing us at.

    @type dict: C{dictionary}
    @param dict: the decoded join response
    @type node: L{node.Node}
    @param node: the node the join was sent to
    @param start: the time the join was started
    @rtype: (C{string}, C{int})
    @return: the C{ip_addr} and C{port} values from the response
    """
    self.stats.completedAction('join', start)
    responder_id = dict['id']
    sender = dict['_krpc_sender']
    responder = self.Node(responder_id, sender[0], sender[1])
    # Insert on the next reactor iteration rather than synchronously
    reactor.callLater(0, self.insertNode, responder)
    return dict['ip_addr'], dict['port']
330 def _joinError(self, err, node, start):
331 """Log a failed join and mark the node as failed."""
332 log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
333 self.stats.completedAction('join', start)
334 self.nodeFailed(node)
# NOTE(review): no return is visible in this listing. Returning nothing
# would hand None to the user callback added after this errback in
# sendJoin, instead of the failure -- confirm whether `return err` follows.
def findCloseNodes(self, callback=lambda a: None):
    """Look up the nodes nearest to our own ID.

    The lookup target is our node ID with its final byte incremented
    (0xff wraps around to 0x00). The search therefore converges on the
    part of the ID space around this node, which helps populate the
    routing table with our nearest neighbours.

    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of K closest nodes
        (optional, defaults to doing nothing with the results)
    """
    own_id = self.node.id
    bumped_last = chr((ord(own_id[-1]) + 1) % 256)
    target = own_id[:-1] + bumped_last
    self.findNode(target, callback)
352 def refreshTable(self, force = False):
353 """Check all the buckets for those that need refreshing.
@type force: C{boolean}
355 @param force: refresh all buckets regardless of last bucket access time
356 (optional, defaults to False)
# NOTE(review): `callback`, passed to findNode below, is not defined in any
# visible line -- presumably a local defined just above the loop; confirm.
361 for bucket in self.table.buckets:
# A bucket is stale once it has gone BUCKET_STALENESS seconds without access.
362 if force or (datetime.now() - bucket.lastAccessed >
363 timedelta(seconds=self.config['BUCKET_STALENESS'])):
364 # Choose a random ID in the bucket and try and find it
365 id = newIDInRange(bucket.min, bucket.max)
366 self.findNode(id, callback)
# NOTE(review): the `def` line owning this docstring (and some body lines)
# are not visible in this listing.
369 """Closes the port and cancels pending later calls."""
370 self.listenport.stopListening()
# NOTE(review): Twisted's DelayedCall.cancel() raises if the call already
# ran or was cancelled; any guard around this line is not visible here.
372 self.next_checkpoint.cancel()
# Cancel still-scheduled (DelayedCall) pings and remove pinging entries.
# Deleting while looping over keys() is only safe on Python 2, where keys()
# returns a list; on Python 3 loop over list(self.pinging) instead.
375 for nodeid in self.pinging.keys():
376 if isinstance(self.pinging[nodeid], DelayedCall) and self.pinging[nodeid].active():
377 self.pinging[nodeid].cancel()
378 del self.pinging[nodeid]
# NOTE(review): the `def` line owning this docstring is not visible in this
# listing.
# Returns the report produced by StatsLogger.formatHTML().
382 """Gather the statistics for the DHT."""
383 return self.stats.formatHTML()
def krpc_ping(self, id, _krpc_sender = None):
    """Reply to a ping with our own node ID.

    When the sender's address is known, the sender is also queued for
    insertion into the routing table as a not-yet-contacted node.

    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: a dictionary holding our node's C{id}
    """
    sender_known = _krpc_sender is not None
    if sender_known:
        peer = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        reactor.callLater(0, self.insertNode, peer, False)
    return {"id" : self.node.id}
def krpc_join(self, id, _krpc_sender = None):
    """Add the sender to our table and tell it the address we saw it at.

    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the sender's C{ip_addr} and C{port} as seen by us, plus our
        node's C{id}; when no sender address is given, the loopback address
        and our own port are reported instead
    """
    if _krpc_sender is not None:
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        # Insert on the next reactor iteration, as a not-yet-contacted node
        reactor.callLater(0, self.insertNode, n, False)
    else:
        # Only fall back to the loopback address when no sender is known;
        # a real sender's address must be reported back unchanged.
        _krpc_sender = ('127.0.0.1', self.port)

    return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
def krpc_find_node(self, id, target, _krpc_sender = None):
    """Find the K closest nodes to the target in the local routing table.

    The reply also carries a token, derived from our newest secret and the
    requester's IP address. L{krpc_store_value} accepts it for as long as
    that secret is kept.

    @type target: C{string}
    @param target: the target ID to find nodes for
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the contact info of the closest C{nodes}, a C{token} and our
        node's C{id}
    """
    if _krpc_sender is not None:
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        reactor.callLater(0, self.insertNode, n, False)
    else:
        # Only without a known sender is the token bound to the loopback address
        _krpc_sender = ('127.0.0.1', self.port)

    # A concrete list of contact infos (map() is lazy on Python 3)
    nodes = [node.contactInfo() for node in self.table.findNodes(target)]
    token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
    return {"nodes" : nodes, "token" : token, "id" : self.node.id}
438 class KhashmirRead(KhashmirBase):
439 """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
# NOTE(review): the lines that follow this docstring are not visible in
# this listing (KNodeRead is imported, presumably assigned as _Node here).
def findValue(self, key, callback):
    """Look up the nodes in the global table that hold values for C{key}.

    The search starts from a copy of our own node and runs as a
    L{FindValue} action scheduled on the next reactor iteration.

    @type key: C{string}
    @param key: the target key to find the values for
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of nodes with values
    """
    # Record an access to the bucket covering this key
    self.table.touch(key)

    starting_nodes = [copy(self.node)]
    finder = FindValue(self, key, callback, self.config, self.stats)
    reactor.callLater(0, finder.goWithNodes, starting_nodes)
463 def valueForKey(self, key, callback, searchlocal = True):
464 """Get the values found for key in global table.
466 Callback will be called with a list of values for each peer that
467 returns unique values. The final callback will be an empty list.
@type key: C{string}
470 @param key: the target key to get the values for
471 @type callback: C{method}
472 @param callback: the method to call with the results, it must take 2
473 parameters: the key, and the values found
474 @type searchlocal: C{boolean}
475 @param searchlocal: whether to also look for any local values
# Second stage: once findValue has located nodes with values, query them.
# The default arguments capture the outer values for the callback.
478 def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
479 """Use the found nodes to send requests for values to."""
480 # Get any local values
# NOTE(review): the searchlocal check (and any test on the number of local
# values) is not visible in this listing -- confirm.
482 l = self.store.retrieveValues(key)
# Add a copy of ourselves advertising how many values we hold, presumably
# so GetValue can also ask us; `nodes + [...]` leaves the caller's list
# untouched.
484 node = copy(self.node)
485 node.updateNumValues(len(l))
486 nodes = nodes + [node]
488 state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
489 reactor.callLater(0, state.goWithNodes, nodes)
491 # First lookup nodes that have values for the key
492 self.findValue(key, _getValueForKey)
def krpc_find_value(self, id, key, _krpc_sender = None):
    """Find the number of values stored locally for the key, and the K closest nodes.

    @type key: C{string}
    @param key: the target key to find the values and nodes for
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the contact info of the closest C{nodes}, the C{num} of values
        we store for the key, and our node's C{id}
    """
    if _krpc_sender is not None:
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        # Insert on the next reactor iteration, as a not-yet-contacted node
        reactor.callLater(0, self.insertNode, n, False)

    # A concrete list of contact infos (map() is lazy on Python 3)
    nodes = [node.contactInfo() for node in self.table.findNodes(key)]
    num_values = self.store.countValues(key)
    return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
514 def krpc_get_value(self, id, key, num, _krpc_sender = None):
515 """Retrieve the values stored locally for the key.
@type key: C{string}
518 @param key: the target key to retrieve the values for
@type num: C{int}
520 @param num: the maximum number of values to retrieve, or 0 to
return all of them
@type id: C{string}
523 @param id: the node ID of the sender node
524 @type _krpc_sender: (C{string}, C{int})
525 @param _krpc_sender: the sender node's IP address and port
527 if _krpc_sender is not None:
528 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
529 reactor.callLater(0, self.insertNode, n, False)
531 l = self.store.retrieveValues(key)
# Return everything when all values were asked for (0) or we hold no more
# than were requested.
532 if num == 0 or num >= len(l):
533 return {'values' : l, "id": self.node.id}
# Otherwise return only the first num values. NOTE(review): the lines just
# above are not visible in this listing (shuffle is imported, presumably to
# randomise which values are sent). A negative num is not rejected, and
# l[:num] would then drop values from the end -- confirm.
536 return {'values' : l[:num], "id": self.node.id}
539 class KhashmirWrite(KhashmirRead):
540 """The read-write Khashmir class, which can store and retrieve key/value mappings."""
# NOTE(review): the lines that follow this docstring are not visible in
# this listing (KNodeWrite is imported, presumably assigned as _Node here).
545 def storeValueForKey(self, key, value, callback=None):
546 """Stores the value for the key in the global table.
548 No status in this implementation, peers respond but don't indicate
549 status of storing values.
@type key: C{string}
552 @param key: the target key to store the value for
553 @type value: C{string}
554 @param value: the value to store with the key
555 @type callback: C{method}
556 @param callback: the method to call with the results, it must take 3
557 parameters: the key, the value stored, and the result of the store
558 (optional, defaults to doing nothing with the results)
# Second stage: store at the K closest nodes once findNode returns them.
560 def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
561 """Use the returned K closest nodes to store the key at."""
# NOTE(review): in the visible lines `response` is replaced unconditionally
# by the no-op handler; a guard (presumably `if not response:`) is not
# visible in this listing -- confirm.
563 def _storedValueHandler(key, value, sender):
564 """Default callback that does nothing."""
566 response = _storedValueHandler
# STORE_REDUNDANCY is presumably the number of nodes to store the value at.
567 action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
568 reactor.callLater(0, action.goWithNodes, nodes)
570 # First find the K closest nodes to operate on.
571 self.findNode(key, _storeValueForKey)
def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
    """Store the value locally with the key, if the token is valid.

    The token must equal one produced by L{krpc_find_node} for the same IP
    address under one of the secrets currently kept in C{token_secrets}.

    @type key: C{string}
    @param key: the target key to store the value for
    @type value: C{string}
    @param value: the value to store with the key
    @type token: C{string}
    @param token: the token to confirm that this peer contacted us previously
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: our node's C{id}
    @raise krpc.KrpcError: with code C{KRPC_ERROR_INVALID_TOKEN} when the
        token matches none of the kept secrets
    """
    if _krpc_sender is not None:
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        reactor.callLater(0, self.insertNode, n, False)
    else:
        # Only without a known sender are tokens checked against loopback
        _krpc_sender = ('127.0.0.1', self.port)

    # Accept tokens made with any kept secret, so a token survives a few
    # checkpoint() rotations.
    for secret in self.token_secrets:
        this_token = sha(secret + _krpc_sender[0]).digest()
        if token == this_token:
            self.store.storeValue(key, value)
            return {"id" : self.node.id}
    raise krpc.KrpcError(krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
# The class to instantiate for a full DHT node (MultiTest builds its
# instances from it). NOTE(review): any lines after this docstring are not
# visible in this listing.
601 class Khashmir(KhashmirWrite):
602 """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
# Two-node tests: contact between a pair of DHTs, then store and retrieve.
# NOTE(review): many lines of this class (presumably the setUp/tearDown def
# lines and the loops that wait for replies) are not visible in this
# listing; the comments below describe only the visible lines.
606 class SimpleTests(unittest.TestCase):
# DHT configuration for both instances; LOCAL_OK lets loopback peers into
# the routing tables (see insertNode). Duplicated verbatim in MultiTest.
609 DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977,
610 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
611 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
612 'MAX_FAILURES': 3, 'LOCAL_OK': True,
613 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
614 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
615 'KEY_EXPIRE': 3600, 'SPEW': True, }
# Each instance gets its own copy of the defaults.
618 d = self.DHT_DEFAULTS.copy()
621 d = self.DHT_DEFAULTS.copy()
# Remove the per-instance database files.
628 os.unlink(self.a.store.db)
629 os.unlink(self.b.store.db)
631 def testAddContact(self):
# Both tables start with a single, empty bucket.
632 self.failUnlessEqual(len(self.a.table.buckets), 1)
633 self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
635 self.failUnlessEqual(len(self.b.table.buckets), 1)
636 self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
# Join b (presumably listening on port 4045) from a.
638 self.a.addContact('127.0.0.1', 4045)
# Once the join has been processed, each side holds the other.
648 self.failUnlessEqual(len(self.a.table.buckets), 1)
649 self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
650 self.failUnlessEqual(len(self.b.table.buckets), 1)
651 self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
653 def testStoreRetrieve(self):
654 self.a.addContact('127.0.0.1', 4045)
660 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
667 self.a.valueForKey(sha('foo').digest(), self._cb)
# valueForKey callback: per its docstring, an empty list marks the final call.
690 def _cb(self, key, val):
692 self.failUnlessEqual(self.got, 1)
693 elif 'foobar' in val:
# Many-node test: a set of DHTs on consecutive ports bootstrap from each
# other, then store and retrieve random key/value pairs.
# NOTE(review): many lines of this class are not visible in this listing
# and it continues past the end of this view; comments describe only the
# visible lines.
697 class MultiTest(unittest.TestCase):
# Same configuration as SimpleTests.DHT_DEFAULTS (duplicated verbatim).
701 DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977,
702 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
703 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
704 'MAX_FAILURES': 3, 'LOCAL_OK': True,
705 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
706 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
707 'KEY_EXPIRE': 3600, 'SPEW': True, }
709 def _done(self, val):
# Instance i listens on port startport + i.
714 self.startport = 4088
715 for i in range(self.num):
716 d = self.DHT_DEFAULTS.copy()
717 d['PORT'] = self.startport + i
718 self.l.append(Khashmir(d))
# Bootstrap each node from three randomly chosen instances (possibly itself).
723 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
724 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
725 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
735 i.findCloseNodes(self._done)
740 i.findCloseNodes(self._done)
# Remove each instance's database file.
747 os.unlink(i.store.db)
751 def testStoreRetrieve(self):
758 def _scb(key, value, result):
# Store and later look up through randomly chosen instances.
760 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
768 self.failUnlessEqual(self.got, 1)
774 self.l[randrange(0, self.num)].valueForKey(K, _rcb)