2 """The main Khashmir program.
4 @var isLocal: a compiled regular expression suitable for testing if an
5 IP address is from a known local or private range
9 warnings.simplefilter("ignore", DeprecationWarning)
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
17 from twisted.internet.defer import Deferred
18 from twisted.internet.base import DelayedCall
19 from twisted.internet import protocol, reactor
20 from twisted.python import log
21 from twisted.trial import unittest
24 from ktable import KTable
25 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
26 from khash import newID, newIDInRange
27 from actions import FindNode, FindValue, GetValue, StoreValue
28 from stats import StatsLogger
# Matches dotted-quad addresses in the 192.168.x.x, 10.x.x.x,
# 172.16-31.x.x and 127.x.x.x ranges.
# The alternation is wrapped in a single non-capturing group so that '^'
# and '$' apply to every range rather than only to the first and last
# alternatives; without it, strings that merely begin with one of these
# addresses (e.g. '10.1.2.3.example.com') would be reported as local.
# Each range keeps its own capturing group, so group numbering is the
# same as before.
isLocal = re.compile(r'^(?:'
                     r'(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})'
                     r')$')
38 class KhashmirBase(protocol.Factory):
39 """The base Khashmir class, with base functionality and find node, no key-value mappings.
41 @type _Node: L{node.Node}
42 @ivar _Node: the knode implementation to use for this class of DHT
43 @type config: C{dictionary}
44 @ivar config: the configuration parameters for the DHT
45 @type pinging: C{dictionary}
46 @ivar pinging: the nodes that are currently being pinged, keys are the
47 node IDs, values are the Deferred or DelayedCall objects
49 @ivar port: the port to listen on
51 @ivar store: the database to store nodes and key/value pairs in
52 @type node: L{node.Node}
54 @type table: L{ktable.KTable}
55 @ivar table: the routing table
56 @type token_secrets: C{list} of C{string}
57 @ivar token_secrets: the current secrets to use to create tokens
58 @type stats: L{stats.StatsLogger}
59 @ivar stats: the statistics gatherer
60 @type udp: L{krpc.hostbroker}
61 @ivar udp: the factory for the KRPC protocol
62 @type listenport: L{twisted.internet.interfaces.IListeningPort}
63 @ivar listenport: the UDP listening port
64 @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
65 @ivar next_checkpoint: the delayed call for the next checkpoint
70 def __init__(self, config, cache_dir='/tmp'):
71 """Initialize the Khashmir class and call the L{setup} method.
73 @type config: C{dictionary}
74 @param config: the configuration parameters for the DHT
75 @type cache_dir: C{string}
76 @param cache_dir: the directory to store all files in
77 (optional, defaults to the /tmp directory)
81 self.setup(config, cache_dir)
83 def setup(self, config, cache_dir):
84 """Setup all the Khashmir sub-modules.
86 @type config: C{dictionary}
87 @param config: the configuration parameters for the DHT
88 @type cache_dir: C{string}
89 @param cache_dir: the directory to store all files in
92 self.port = config['PORT']
93 self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
94 self.node = self._loadSelfNode('', self.port)
95 self.table = KTable(self.node, config)
96 self.token_secrets = [newID()]
97 self.stats = StatsLogger(self.table, self.store)
100 self.udp = krpc.hostbroker(self, self.stats, config)
101 self.udp.protocol = krpc.KRPC
102 self.listenport = reactor.listenUDP(self.port, self.udp)
104 # Load the routing table and begin checkpointing
105 self._loadRoutingTable()
106 self.refreshTable(force = True)
107 self.next_checkpoint = reactor.callLater(60, self.checkpoint)
109 def Node(self, id, host = None, port = None):
110 """Create a new node.
112 @see: L{node.Node.__init__}
114 n = self._Node(id, host, port)
116 n.conn = self.udp.connectionForAddr((n.host, n.port))
120 """Stop listening for packets."""
121 self.listenport.stopListening()
123 def _loadSelfNode(self, host, port):
124 """Create this node, loading any previously saved one."""
125 id = self.store.getSelfNode()
126 if not id or not id.endswith(self.config['VERSION']):
127 id = newID(self.config['VERSION'])
128 return self._Node(id, host, port)
130 def checkpoint(self):
131 """Perform some periodic maintenance operations."""
132 # Create a new token secret
133 self.token_secrets.insert(0, newID())
134 if len(self.token_secrets) > 3:
135 self.token_secrets.pop()
137 # Save some parameters for reloading
138 self.store.saveSelfNode(self.node.id)
139 self.store.dumpRoutingTable(self.table.buckets)
142 self.store.expireValues(self.config['KEY_EXPIRE'])
145 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
146 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
149 def _loadRoutingTable(self):
150 """Load the previous routing table nodes from the database.
152 It's usually a good idea to call refreshTable(force = True) after
155 nodes = self.store.getRoutingTable()
157 n = self.Node(rec[0], rec[1], int(rec[2]))
158 self.table.insertNode(n, contacted = False)
def addContact(self, host, port, callback=None, errback=None):
    """Contact a node known only by its address by sending it a join.

    A node with the null ID is created for the address and handed to
    L{sendJoin} together with the supplied callback and errback.

    @type host: C{string}
    @param host: the IP address of the node to contact
    @type port: C{int}
    @param port: the port of the node to contact
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the contact info returned by the node
        (optional, defaults to doing nothing with the results)
    @type errback: C{method}
    @param errback: the method to call if an error occurs
        (optional, defaults to calling the callback with the error)
    """
    # The real ID is not known yet, so the null ID stands in for it
    self.sendJoin(self.Node(NULL_ID, host, port),
                  callback=callback, errback=errback)
179 def findNode(self, id, callback):
180 """Find the contact info for the K closest nodes in the global table.
183 @param id: the target ID to find the K closest nodes of
184 @type callback: C{method}
185 @param callback: the method to call with the results, it must take 1
186 parameter, the list of K closest nodes
188 # Mark the bucket as having been accessed
191 # Start with our node
192 nodes = [copy(self.node)]
194 # Start the finding nodes action
195 state = FindNode(self, id, callback, self.config, self.stats)
196 reactor.callLater(0, state.goWithNodes, nodes)
198 def insertNode(self, node, contacted = True):
199 """Try to insert a node in our local table, pinging oldest contact if necessary.
201 If all you have is a host/port, then use L{addContact}, which calls this
202 method after receiving the PONG from the remote node. The reason for
203 the separation is we can't insert a node into the table without its
204 node ID. That means of course the node passed into this method needs
205 to be a properly formed Node object with a valid ID.
207 @type node: L{node.Node}
208 @param node: the new node to try and insert
209 @type contacted: C{boolean}
210 @param contacted: whether the new node is known to be good, i.e.
211 responded to a request (optional, defaults to True)
213 # Don't add any local nodes to the routing table
214 if not self.config['LOCAL_OK'] and isLocal.match(node.host):
215 log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
218 old = self.table.insertNode(node, contacted=contacted)
220 if (isinstance(old, self._Node) and old.id != self.node.id and
221 (datetime.now() - old.lastSeen) >
222 timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
224 # Bucket is full, check to see if old node is still available
225 df = self.sendPing(old)
226 df.addErrback(self._staleNodeHandler, old, node, contacted)
227 elif not old and not contacted:
228 # There's room, we just need to contact the node first
229 df = self.sendPing(node)
230 # Also schedule a future ping to make sure the node works
231 def rePing(newnode, self = self):
232 if newnode.id not in self.pinging:
233 self.pinging[newnode.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
234 self.sendPing, newnode)
236 df.addCallback(rePing)
238 def _staleNodeHandler(self, err, old, node, contacted):
239 """The pinged node never responded, so replace it."""
240 self.table.invalidateNode(old)
241 self.insertNode(node, contacted)
244 def nodeFailed(self, node):
245 """Mark a node as having failed a request and schedule a future check.
247 @type node: L{node.Node}
248 @param node: the node that failed a request
250 exists = self.table.nodeFailed(node)
252 # If in the table, schedule a ping, if one isn't already sent/scheduled
253 if exists and node.id not in self.pinging:
254 self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'],
257 def sendPing(self, node):
258 """Ping the node to see if it's still alive.
260 @type node: L{node.Node}
261 @param node: the node to ping
263 # Check for a ping already underway
264 if (isinstance(self.pinging.get(node.id, None), DelayedCall) and
265 self.pinging[node.id].active()):
266 self.pinging[node.id].cancel()
267 elif isinstance(self.pinging.get(node.id, None), Deferred):
268 return self.pinging[node.id]
270 self.stats.startedAction('ping')
271 df = node.ping(self.node.id)
272 self.pinging[node.id] = df
273 df.addCallbacks(self._pingHandler, self._pingError,
274 callbackArgs = (node, datetime.now()),
275 errbackArgs = (node, datetime.now()))
278 def _pingHandler(self, dict, node, start):
279 """Node responded properly, update it and return the node object."""
280 self.stats.completedAction('ping', start)
281 del self.pinging[node.id]
282 # Create the node using the returned contact info
283 n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
284 reactor.callLater(0, self.insertNode, n)
287 def _pingError(self, err, node, start):
288 """Error occurred, fail node."""
289 log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
290 self.stats.completedAction('ping', start)
291 del self.pinging[node.id]
292 self.nodeFailed(node)
295 def sendJoin(self, node, callback=None, errback=None):
296 """Join the DHT by pinging a bootstrap node.
298 @type node: L{node.Node}
299 @param node: the node to send the join to
300 @type callback: C{method}
301 @param callback: the method to call with the results, it must take 1
302 parameter, the contact info returned by the node
303 (optional, defaults to doing nothing with the results)
304 @type errback: C{method}
305 @param errback: the method to call if an error occurs
306 (optional, defaults to calling the callback with the error)
310 self.stats.startedAction('join')
311 df = node.join(self.node.id)
312 df.addCallbacks(self._joinHandler, self._joinError,
313 callbackArgs = (node, datetime.now()),
314 errbackArgs = (node, datetime.now()))
316 df.addCallbacks(callback, errback)
def _joinHandler(self, dict, node, start):
    """Handle a successful response to a join request.

    Records the completed action with the statistics gatherer, schedules
    the responding node for insertion into the routing table, and
    extracts the address information from the response.

    @param dict: the response, from which the 'id', '_krpc_sender',
        'ip_addr' and 'port' entries are read
    @param node: the node the join was sent to (not used here)
    @param start: passed to the statistics gatherer along with the
        'join' action name
    @return: the tuple of the response's 'ip_addr' and 'port' entries
    """
    self.stats.completedAction('join', start)

    # The node to insert is built from the contact info in the response
    responder_id = dict['id']
    sender = dict['_krpc_sender']
    responder = self.Node(responder_id, sender[0], sender[1])
    reactor.callLater(0, self.insertNode, responder)

    return (dict['ip_addr'], dict['port'])
326 def _joinError(self, err, node, start):
327 """Error occurred, fail node."""
328 log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
329 self.stats.completedAction('join', start)
330 self.nodeFailed(node)
def findCloseNodes(self, callback=lambda a: None):
    """Perform a findNode on an ID adjacent to our own.

    The target is our own ID with its final character incremented by one
    (wrapping around at 256). Searching for it populates our table with
    the nodes on the network that are closest to us.

    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of K closest nodes
        (optional, defaults to doing nothing with the results)
    """
    own_id = self.node.id
    prefix = own_id[:-1]
    bumped = (ord(own_id[-1]) + 1) % 256
    self.findNode(prefix + chr(bumped), callback)
348 def refreshTable(self, force = False):
349 """Check all the buckets for those that need refreshing.
351 @param force: refresh all buckets regardless of last bucket access time
352 (optional, defaults to False)
357 for bucket in self.table.buckets:
358 if force or (datetime.now() - bucket.lastAccessed >
359 timedelta(seconds=self.config['BUCKET_STALENESS'])):
360 # Choose a random ID in the bucket and try and find it
361 id = newIDInRange(bucket.min, bucket.max)
362 self.findNode(id, callback)
365 """Closes the port and cancels pending later calls."""
366 self.listenport.stopListening()
368 self.next_checkpoint.cancel()
371 for nodeid in self.pinging.keys():
372 if isinstance(self.pinging[nodeid], DelayedCall) and self.pinging[nodeid].active():
373 self.pinging[nodeid].cancel()
374 del self.pinging[nodeid]
378 """Gather the statistics for the DHT."""
379 return self.stats.formatHTML()
def krpc_ping(self, id, _krpc_sender = None):
    """Answer a ping request with our own node ID.

    When the sender's address is available, a node is created for it and
    its insertion into the routing table is scheduled as not yet
    contacted.

    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @return: a dictionary with our node ID under the "id" key
    """
    if _krpc_sender is not None:
        host, port = _krpc_sender[0], _krpc_sender[1]
        sender = self.Node(id, host, port)
        reactor.callLater(0, self.insertNode, sender, False)

    response = {"id" : self.node.id}
    return response
396 def krpc_join(self, id, _krpc_sender = None):
397 """Add the node by responding with its address and port.
400 @param id: the node ID of the sender node
401 @type _krpc_sender: (C{string}, C{int})
402 @param _krpc_sender: the sender node's IP address and port
404 if _krpc_sender is not None:
405 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
406 reactor.callLater(0, self.insertNode, n, False)
408 _krpc_sender = ('127.0.0.1', self.port)
410 return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
412 def krpc_find_node(self, id, target, _krpc_sender = None):
413 """Find the K closest nodes to the target in the local routing table.
415 @type target: C{string}
416 @param target: the target ID to find nodes for
418 @param id: the node ID of the sender node
419 @type _krpc_sender: (C{string}, C{int})
420 @param _krpc_sender: the sender node's IP address and port
422 if _krpc_sender is not None:
423 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
424 reactor.callLater(0, self.insertNode, n, False)
426 _krpc_sender = ('127.0.0.1', self.port)
428 nodes = self.table.findNodes(target)
429 nodes = map(lambda node: node.contactInfo(), nodes)
430 token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
431 return {"nodes" : nodes, "token" : token, "id" : self.node.id}
434 class KhashmirRead(KhashmirBase):
435 """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
def findValue(self, key, callback):
    """Start a search of the global table for nodes holding values for the key.

    @param key: the target key to find the values for
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of nodes with values
    """
    # Record the access on the routing table for this key
    self.table.touch(key)

    # The search is seeded with a copy of our own node
    seed = [copy(self.node)]

    # Run the FindValue action from the reactor
    search = FindValue(self, key, callback, self.config, self.stats)
    reactor.callLater(0, search.goWithNodes, seed)
459 def valueForKey(self, key, callback, searchlocal = True):
460 """Get the values found for key in global table.
462 Callback will be called with a list of values for each peer that
463 returns unique values. The final callback will be an empty list.
466 @param key: the target key to get the values for
467 @type callback: C{method}
468 @param callback: the method to call with the results, it must take 2
469 parameters: the key, and the values found
470 @type searchlocal: C{boolean}
471 @param searchlocal: whether to also look for any local values
474 def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
475 """Use the found nodes to send requests for values to."""
476 # Get any local values
478 l = self.store.retrieveValues(key)
480 node = copy(self.node)
481 node.updateNumValues(len(l))
482 nodes = nodes + [node]
484 state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
485 reactor.callLater(0, state.goWithNodes, nodes)
487 # First lookup nodes that have values for the key
488 self.findValue(key, _getValueForKey)
def krpc_find_value(self, id, key, _krpc_sender = None):
    """Report the nodes found for the key and how many values we store for it.

    When the sender's address is available, a node is created for it and
    its insertion into the routing table is scheduled as not yet
    contacted.

    @param key: the target key to find the values and nodes for
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @return: a dictionary with the contact info of the nodes returned by
        the routing table under 'nodes', the count of values in the store
        for the key under 'num', and our node ID under "id"
    """
    if _krpc_sender is not None:
        host, port = _krpc_sender[0], _krpc_sender[1]
        sender = self.Node(id, host, port)
        reactor.callLater(0, self.insertNode, sender, False)

    contacts = [found.contactInfo() for found in self.table.findNodes(key)]
    stored = self.store.countValues(key)
    return {'nodes' : contacts, 'num' : stored, "id": self.node.id}
510 def krpc_get_value(self, id, key, num, _krpc_sender = None):
511 """Retrieve the values stored locally for the key.
514 @param key: the target key to retrieve the values for
516 @param num: the maximum number of values to retrieve, or 0 to
519 @param id: the node ID of the sender node
520 @type _krpc_sender: (C{string}, C{int})
521 @param _krpc_sender: the sender node's IP address and port
523 if _krpc_sender is not None:
524 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
525 reactor.callLater(0, self.insertNode, n, False)
527 l = self.store.retrieveValues(key)
528 if num == 0 or num >= len(l):
529 return {'values' : l, "id": self.node.id}
532 return {'values' : l[:num], "id": self.node.id}
535 class KhashmirWrite(KhashmirRead):
536 """The read-write Khashmir class, which can store and retrieve key/value mappings."""
541 def storeValueForKey(self, key, value, callback=None):
542 """Stores the value for the key in the global table.
544 No status in this implementation, peers respond but don't indicate
545 status of storing values.
548 @param key: the target key to store the value for
549 @type value: C{string}
550 @param value: the value to store with the key
551 @type callback: C{method}
552 @param callback: the method to call with the results, it must take 3
553 parameters: the key, the value stored, and the result of the store
554 (optional, defaults to doing nothing with the results)
556 def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
557 """Use the returned K closest nodes to store the key at."""
559 def _storedValueHandler(key, value, sender):
560 """Default callback that does nothing."""
562 response = _storedValueHandler
563 action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
564 reactor.callLater(0, action.goWithNodes, nodes)
566 # First find the K closest nodes to operate on.
567 self.findNode(key, _storeValueForKey)
570 def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
571 """Store the value locally with the key.
574 @param key: the target key to store the value for
575 @type value: C{string}
576 @param value: the value to store with the key
577 @param token: the token to confirm that this peer contacted us previously
579 @param id: the node ID of the sender node
580 @type _krpc_sender: (C{string}, C{int})
581 @param _krpc_sender: the sender node's IP address and port
583 if _krpc_sender is not None:
584 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
585 reactor.callLater(0, self.insertNode, n, False)
587 _krpc_sender = ('127.0.0.1', self.port)
589 for secret in self.token_secrets:
590 this_token = sha(secret + _krpc_sender[0]).digest()
591 if token == this_token:
592 self.store.storeValue(key, value)
593 return {"id" : self.node.id}
594 raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
597 class Khashmir(KhashmirWrite):
598 """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
602 class SimpleTests(unittest.TestCase):
605 DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977,
606 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
607 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
608 'MAX_FAILURES': 3, 'LOCAL_OK': True,
609 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
610 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
611 'KEY_EXPIRE': 3600, 'SPEW': True, }
614 d = self.DHT_DEFAULTS.copy()
617 d = self.DHT_DEFAULTS.copy()
624 os.unlink(self.a.store.db)
625 os.unlink(self.b.store.db)
627 def testAddContact(self):
628 self.failUnlessEqual(len(self.a.table.buckets), 1)
629 self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
631 self.failUnlessEqual(len(self.b.table.buckets), 1)
632 self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
634 self.a.addContact('127.0.0.1', 4045)
644 self.failUnlessEqual(len(self.a.table.buckets), 1)
645 self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
646 self.failUnlessEqual(len(self.b.table.buckets), 1)
647 self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
649 def testStoreRetrieve(self):
650 self.a.addContact('127.0.0.1', 4045)
656 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
663 self.a.valueForKey(sha('foo').digest(), self._cb)
686 def _cb(self, key, val):
688 self.failUnlessEqual(self.got, 1)
689 elif 'foobar' in val:
693 class MultiTest(unittest.TestCase):
697 DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977,
698 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
699 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
700 'MAX_FAILURES': 3, 'LOCAL_OK': True,
701 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
702 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
703 'KEY_EXPIRE': 3600, 'SPEW': True, }
705 def _done(self, val):
710 self.startport = 4088
711 for i in range(self.num):
712 d = self.DHT_DEFAULTS.copy()
713 d['PORT'] = self.startport + i
714 self.l.append(Khashmir(d))
719 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
720 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
721 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
731 i.findCloseNodes(self._done)
736 i.findCloseNodes(self._done)
743 os.unlink(i.store.db)
747 def testStoreRetrieve(self):
754 def _scb(key, value, result):
756 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
764 self.failUnlessEqual(self.got, 1)
770 self.l[randrange(0, self.num)].valueForKey(K, _rcb)