2 """The main Khashmir program.
4 @var isLocal: a compiled regular expression suitable for testing if an
5 IP address is from a known local or private range
9 warnings.simplefilter("ignore", DeprecationWarning)
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
17 from twisted.internet.defer import Deferred
18 from twisted.internet import protocol, reactor
19 from twisted.python import log
20 from twisted.trial import unittest
23 from ktable import KTable
24 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
25 from khash import newID, newIDInRange
26 from actions import FindNode, FindValue, GetValue, StoreValue
27 from stats import StatsLogger
# Matches dotted-quad addresses in the loopback (127.x.x.x) and private
# (10.x.x.x, 172.16-31.x.x, 192.168.x.x) ranges.
#
# All alternatives are wrapped in one non-capturing group so that the '^' and
# '$' anchors apply to every range.  Without that wrapper the '$' only bound
# to the last alternative, so any string that merely *started* with a private
# address (e.g. '10.0.0.5.example.com') was treated as local.  The six
# capturing groups are kept so that match group numbering is unchanged.
isLocal = re.compile(r'^(?:(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}))$')
37 class KhashmirBase(protocol.Factory):
38 """The base Khashmir class, with base functionality and find node, no key-value mappings.
40 @type _Node: L{node.Node}
41 @ivar _Node: the knode implementation to use for this class of DHT
42 @type config: C{dictionary}
43 @ivar config: the configuration parameters for the DHT
45 @ivar port: the port to listen on
47 @ivar store: the database to store nodes and key/value pairs in
48 @type node: L{node.Node}
50 @type table: L{ktable.KTable}
51 @ivar table: the routing table
52 @type token_secrets: C{list} of C{string}
53 @ivar token_secrets: the current secrets to use to create tokens
54 @type stats: L{stats.StatsLogger}
55 @ivar stats: the statistics gatherer
56 @type udp: L{krpc.hostbroker}
57 @ivar udp: the factory for the KRPC protocol
58 @type listenport: L{twisted.internet.interfaces.IListeningPort}
59 @ivar listenport: the UDP listening port
60 @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
61 @ivar next_checkpoint: the delayed call for the next checkpoint
66 def __init__(self, config, cache_dir='/tmp'):
67 """Initialize the Khashmir class and call the L{setup} method.
69 @type config: C{dictionary}
70 @param config: the configuration parameters for the DHT
71 @type cache_dir: C{string}
72 @param cache_dir: the directory to store all files in
73 (optional, defaults to the /tmp directory)
76 self.setup(config, cache_dir)
78 def setup(self, config, cache_dir):
79 """Setup all the Khashmir sub-modules.
81 @type config: C{dictionary}
82 @param config: the configuration parameters for the DHT
83 @type cache_dir: C{string}
84 @param cache_dir: the directory to store all files in
87 self.port = config['PORT']
88 self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
89 self.node = self._loadSelfNode('', self.port)
90 self.table = KTable(self.node, config)
91 self.token_secrets = [newID()]
92 self.stats = StatsLogger(self.table, self.store)
95 self.udp = krpc.hostbroker(self, self.stats, config)
96 self.udp.protocol = krpc.KRPC
97 self.listenport = reactor.listenUDP(self.port, self.udp)
99 # Load the routing table and begin checkpointing
100 self._loadRoutingTable()
101 self.refreshTable(force = True)
102 self.next_checkpoint = reactor.callLater(60, self.checkpoint)
104 def Node(self, id, host = None, port = None):
105 """Create a new node.
107 @see: L{node.Node.__init__}
109 n = self._Node(id, host, port)
111 n.conn = self.udp.connectionForAddr((n.host, n.port))
115 """Stop listening for packets."""
116 self.listenport.stopListening()
118 def _loadSelfNode(self, host, port):
119 """Create this node, loading any previously saved one."""
120 id = self.store.getSelfNode()
123 return self._Node(id, host, port)
125 def checkpoint(self):
126 """Perform some periodic maintenance operations."""
127 # Create a new token secret
128 self.token_secrets.insert(0, newID())
129 if len(self.token_secrets) > 3:
130 self.token_secrets.pop()
132 # Save some parameters for reloading
133 self.store.saveSelfNode(self.node.id)
134 self.store.dumpRoutingTable(self.table.buckets)
137 self.store.expireValues(self.config['KEY_EXPIRE'])
140 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
141 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
144 def _loadRoutingTable(self):
145 """Load the previous routing table nodes from the database.
147 It's usually a good idea to call refreshTable(force = True) after
150 nodes = self.store.getRoutingTable()
152 n = self.Node(rec[0], rec[1], int(rec[2]))
153 self.table.insertNode(n, contacted = False)
def addContact(self, host, port, callback=None, errback=None):
    """Contact the node at an address and add it to the table once it replies.

    A placeholder node carrying the null ID is created for the address and
    handed to L{sendJoin}, which performs the actual request.

    @type host: C{string}
    @param host: the IP address of the node to contact
    @type port: C{int}
    @param port: the port of the node to contact
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the contact info returned by the node
        (optional, defaults to doing nothing with the results)
    @type errback: C{method}
    @param errback: the method to call if an error occurs
        (optional, defaults to calling the callback with the error)
    """
    # The real node ID is not known until the remote side answers
    placeholder = self.Node(NULL_ID, host, port)
    self.sendJoin(placeholder, callback=callback, errback=errback)
174 def findNode(self, id, callback):
175 """Find the contact info for the K closest nodes in the global table.
178 @param id: the target ID to find the K closest nodes of
179 @type callback: C{method}
180 @param callback: the method to call with the results, it must take 1
181 parameter, the list of K closest nodes
183 # Mark the bucket as having been accessed
186 # Start with our node
187 nodes = [copy(self.node)]
189 # Start the finding nodes action
190 state = FindNode(self, id, callback, self.config, self.stats)
191 reactor.callLater(0, state.goWithNodes, nodes)
193 def insertNode(self, node, contacted = True):
194 """Try to insert a node in our local table, pinging oldest contact if necessary.
196 If all you have is a host/port, then use L{addContact}, which calls this
197 method after receiving the PONG from the remote node. The reason for
198 the separation is we can't insert a node into the table without its
199 node ID. That means of course the node passed into this method needs
200 to be a properly formed Node object with a valid ID.
202 @type node: L{node.Node}
203 @param node: the new node to try and insert
204 @type contacted: C{boolean}
205 @param contacted: whether the new node is known to be good, i.e.
206 responded to a request (optional, defaults to True)
208 # Don't add any local nodes to the routing table
209 if not self.config['LOCAL_OK'] and isLocal.match(node.host):
210 log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
213 old = self.table.insertNode(node, contacted=contacted)
215 if (isinstance(old, self._Node) and old.id != self.node.id and
216 (datetime.now() - old.lastSeen) >
217 timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
219 # Bucket is full, check to see if old node is still available
220 self.stats.startedAction('ping')
221 df = old.ping(self.node.id)
222 df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler,
223 callbackArgs = (old, datetime.now()),
224 errbackArgs = (old, datetime.now(), node, contacted))
225 elif not old and not contacted:
226 # There's room, we just need to contact the node first
227 self.stats.startedAction('ping')
228 df = node.ping(self.node.id)
229 # Convert the returned contact info into a node
230 df.addCallback(self._pongHandler, datetime.now())
231 # Try adding the contacted node
232 df.addCallbacks(self.insertNode, self._pongError,
233 errbackArgs = (node, datetime.now()))
def _freshNodeHandler(self, dict, old, start):
    """Handle the pong from an old node that was pinged by L{insertNode}.

    @type dict: C{dictionary}
    @param dict: the response from the pinged node, its 'id' entry is read
    @param old: the existing node that was pinged
    @type start: C{datetime.datetime}
    @param start: the time the ping was started, passed on to the
        statistics gatherer
    """
    self.stats.completedAction('ping', start)
    responder_id = dict['id']
    # Only refresh the table entry if the reply carries the expected ID
    if responder_id == old.id:
        self.table.justSeenNode(old.id)
def _staleNodeHandler(self, err, old, start, node, contacted):
    """Handle a failed ping of an old node by replacing it with the new one.

    @param err: the failure from the ping, must provide getErrorMessage()
    @param old: the existing node that did not respond
    @type start: C{datetime.datetime}
    @param start: the time the ping was started, passed on to the
        statistics gatherer
    @param node: the new node that was waiting to be inserted
    @type contacted: C{boolean}
    @param contacted: the contacted flag to retry the insert with
    """
    details = (old.host, old.port, err.getErrorMessage())
    log.msg("action ping failed on %s/%s: %s" % details)
    self.stats.completedAction('ping', start)

    # Drop the unresponsive node, then retry the insert that triggered the ping
    self.table.invalidateNode(old)
    self.insertNode(node, contacted)
248 def _pongHandler(self, dict, start):
249 """Node responded properly, change response into a node to insert."""
250 self.stats.completedAction('ping', start)
251 # Create the node using the returned contact info
252 n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
def _pongError(self, err, node, start):
    """Handle a failed ping of a new node by recording the failure.

    The error is logged and the node is reported to the routing table as
    failed.  Nothing is returned.

    @param err: the failure from the ping, must provide getErrorMessage()
    @param node: the node that was pinged
    @type start: C{datetime.datetime}
    @param start: the time the ping was started, passed on to the
        statistics gatherer
    """
    details = (node.host, node.port, err.getErrorMessage())
    log.msg("action ping failed on %s/%s: %s" % details)
    self.stats.completedAction('ping', start)
    self.table.nodeFailed(node)
261 def sendJoin(self, node, callback=None, errback=None):
262 """Join the DHT by pinging a bootstrap node.
264 @type node: L{node.Node}
265 @param node: the node to send the join to
266 @type callback: C{method}
267 @param callback: the method to call with the results, it must take 1
268 parameter, the contact info returned by the node
269 (optional, defaults to doing nothing with the results)
270 @type errback: C{method}
271 @param errback: the method to call if an error occurs
272 (optional, defaults to calling the callback with the error)
276 self.stats.startedAction('join')
277 df = node.join(self.node.id)
278 df.addCallbacks(self._joinHandler, self._joinError,
279 callbackArgs = (node, datetime.now()),
280 errbackArgs = (node, datetime.now()))
282 df.addCallbacks(callback, errback)
def _joinHandler(self, dict, node, start):
    """Handle a successful join response and extract our external address.

    @type dict: C{dictionary}
    @param dict: the response, its 'id', '_krpc_sender', 'ip_addr' and
        'port' entries are read
    @param node: the node the join was sent to (not used here)
    @type start: C{datetime.datetime}
    @param start: the time the join was started, passed on to the
        statistics gatherer
    @rtype: C{tuple}
    @return: the 'ip_addr' and 'port' entries of the response
    """
    self.stats.completedAction('join', start)

    # Build the node from the ID it reported and the address it replied from
    responder_id = dict['id']
    sender = dict['_krpc_sender']
    responder = self.Node(responder_id, sender[0], sender[1])

    # Insert it on the next reactor iteration rather than inline
    reactor.callLater(0, self.insertNode, responder)
    return (dict['ip_addr'], dict['port'])
292 def _joinError(self, err, node, start):
293 """Error occurred, fail node."""
294 log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
295 self.stats.completedAction('join', start)
296 self.table.nodeFailed(node)
def findCloseNodes(self, callback=lambda a: None):
    """Start a findNode for the ID that is one away from our own.

    The target is our own node ID with its final character incremented by
    one (wrapping around from 255 to 0), so the search returns the nodes
    nearest to us.

    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of K closest nodes
        (optional, defaults to doing nothing with the results)
    """
    own_id = self.node.id
    # Bump only the last character, modulo 256, to get the neighbouring ID
    bumped = chr((ord(own_id[-1]) + 1) % 256)
    self.findNode(own_id[:-1] + bumped, callback)
314 def refreshTable(self, force = False):
315 """Check all the buckets for those that need refreshing.
317 @param force: refresh all buckets regardless of last bucket access time
318 (optional, defaults to False)
323 for bucket in self.table.buckets:
324 if force or (datetime.now() - bucket.lastAccessed >
325 timedelta(seconds=self.config['BUCKET_STALENESS'])):
326 # Choose a random ID in the bucket and try and find it
327 id = newIDInRange(bucket.min, bucket.max)
328 self.findNode(id, callback)
331 """Closes the port and cancels pending later calls."""
332 self.listenport.stopListening()
334 self.next_checkpoint.cancel()
340 """Gather the statistics for the DHT."""
341 return self.stats.formatHTML()
def krpc_ping(self, id, _krpc_sender = None):
    """Answer a ping with our own node ID.

    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
        (optional, if not given the sender is not added to the table)
    @rtype: C{dictionary}
    @return: our node ID, under the 'id' key
    """
    if _krpc_sender is not None:
        host, port = _krpc_sender[0], _krpc_sender[1]
        sender = self.Node(id, host, port)
        # Schedule the insert with contacted=False
        reactor.callLater(0, self.insertNode, sender, False)

    return {"id" : self.node.id}
358 def krpc_join(self, id, _krpc_sender = None):
359 """Add the node by responding with its address and port.
362 @param id: the node ID of the sender node
363 @type _krpc_sender: (C{string}, C{int})
364 @param _krpc_sender: the sender node's IP address and port
366 if _krpc_sender is not None:
367 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
368 reactor.callLater(0, self.insertNode, n, False)
370 _krpc_sender = ('127.0.0.1', self.port)
372 return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
374 def krpc_find_node(self, id, target, _krpc_sender = None):
375 """Find the K closest nodes to the target in the local routing table.
377 @type target: C{string}
378 @param target: the target ID to find nodes for
380 @param id: the node ID of the sender node
381 @type _krpc_sender: (C{string}, C{int})
382 @param _krpc_sender: the sender node's IP address and port
384 if _krpc_sender is not None:
385 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
386 reactor.callLater(0, self.insertNode, n, False)
388 _krpc_sender = ('127.0.0.1', self.port)
390 nodes = self.table.findNodes(target)
391 nodes = map(lambda node: node.contactInfo(), nodes)
392 token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
393 return {"nodes" : nodes, "token" : token, "id" : self.node.id}
396 class KhashmirRead(KhashmirBase):
397 """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
def findValue(self, key, callback):
    """Start a search of the global table for nodes holding values for the key.

    @type key: C{string}
    @param key: the target key to find the values for
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of nodes with values
    """
    # Record that the bucket covering this key was just used
    self.table.touch(key)

    # Seed the search with a copy of our own node
    seed_nodes = [copy(self.node)]

    # Run the action on the next reactor iteration
    action = FindValue(self, key, callback, self.config, self.stats)
    reactor.callLater(0, action.goWithNodes, seed_nodes)
421 def valueForKey(self, key, callback, searchlocal = True):
422 """Get the values found for key in global table.
424 Callback will be called with a list of values for each peer that
425 returns unique values. The final callback will be an empty list.
428 @param key: the target key to get the values for
429 @type callback: C{method}
430 @param callback: the method to call with the results, it must take 2
431 parameters: the key, and the values found
432 @type searchlocal: C{boolean}
433 @param searchlocal: whether to also look for any local values
436 def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
437 """Use the found nodes to send requests for values to."""
438 # Get any local values
440 l = self.store.retrieveValues(key)
442 node = copy(self.node)
443 node.updateNumValues(len(l))
444 nodes = nodes + [node]
446 state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
447 reactor.callLater(0, state.goWithNodes, nodes)
449 # First lookup nodes that have values for the key
450 self.findValue(key, _getValueForKey)
def krpc_find_value(self, id, key, _krpc_sender = None):
    """Report the number of values stored locally for the key, and nearby nodes.

    @type key: C{string}
    @param key: the target key to find the values and nodes for
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
        (optional, if not given the sender is not added to the table)
    @rtype: C{dictionary}
    @return: the contact info of the nodes the routing table found for the
        key ('nodes'), the count of locally stored values ('num') and our
        node ID ('id')
    """
    if _krpc_sender is not None:
        host, port = _krpc_sender[0], _krpc_sender[1]
        sender = self.Node(id, host, port)
        # Schedule the insert with contacted=False
        reactor.callLater(0, self.insertNode, sender, False)

    found = self.table.findNodes(key)
    contacts = [entry.contactInfo() for entry in found]
    stored = self.store.countValues(key)
    return {'nodes' : contacts, 'num' : stored, "id": self.node.id}
472 def krpc_get_value(self, id, key, num, _krpc_sender = None):
473 """Retrieve the values stored locally for the key.
476 @param key: the target key to retrieve the values for
478 @param num: the maximum number of values to retrieve, or 0 to
481 @param id: the node ID of the sender node
482 @type _krpc_sender: (C{string}, C{int})
483 @param _krpc_sender: the sender node's IP address and port
485 if _krpc_sender is not None:
486 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
487 reactor.callLater(0, self.insertNode, n, False)
489 l = self.store.retrieveValues(key)
490 if num == 0 or num >= len(l):
491 return {'values' : l, "id": self.node.id}
494 return {'values' : l[:num], "id": self.node.id}
497 class KhashmirWrite(KhashmirRead):
498 """The read-write Khashmir class, which can store and retrieve key/value mappings."""
503 def storeValueForKey(self, key, value, callback=None):
504 """Stores the value for the key in the global table.
506 No status in this implementation, peers respond but don't indicate
507 status of storing values.
510 @param key: the target key to store the value for
511 @type value: C{string}
512 @param value: the value to store with the key
513 @type callback: C{method}
514 @param callback: the method to call with the results, it must take 3
515 parameters: the key, the value stored, and the result of the store
516 (optional, defaults to doing nothing with the results)
518 def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
519 """Use the returned K closest nodes to store the key at."""
521 def _storedValueHandler(key, value, sender):
522 """Default callback that does nothing."""
524 response = _storedValueHandler
525 action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
526 reactor.callLater(0, action.goWithNodes, nodes)
528 # First find the K closest nodes to operate on.
529 self.findNode(key, _storeValueForKey)
532 def krpc_store_value(self, id, key, value, token, _krpc_sender = None):
533 """Store the value locally with the key.
536 @param key: the target key to store the value for
537 @type value: C{string}
538 @param value: the value to store with the key
539 @param token: the token to confirm that this peer contacted us previously
541 @param id: the node ID of the sender node
542 @type _krpc_sender: (C{string}, C{int})
543 @param _krpc_sender: the sender node's IP address and port
545 if _krpc_sender is not None:
546 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
547 reactor.callLater(0, self.insertNode, n, False)
549 _krpc_sender = ('127.0.0.1', self.port)
551 for secret in self.token_secrets:
552 this_token = sha(secret + _krpc_sender[0]).digest()
553 if token == this_token:
554 self.store.storeValue(key, value)
555 return {"id" : self.node.id}
556 raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
559 class Khashmir(KhashmirWrite):
560 """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
564 class SimpleTests(unittest.TestCase):
567 DHT_DEFAULTS = {'PORT': 9977,
568 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
569 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
570 'MAX_FAILURES': 3, 'LOCAL_OK': True,
571 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
572 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
573 'KEY_EXPIRE': 3600, 'SPEW': True, }
576 d = self.DHT_DEFAULTS.copy()
579 d = self.DHT_DEFAULTS.copy()
586 os.unlink(self.a.store.db)
587 os.unlink(self.b.store.db)
589 def testAddContact(self):
590 self.failUnlessEqual(len(self.a.table.buckets), 1)
591 self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
593 self.failUnlessEqual(len(self.b.table.buckets), 1)
594 self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
596 self.a.addContact('127.0.0.1', 4045)
602 self.failUnlessEqual(len(self.a.table.buckets), 1)
603 self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
604 self.failUnlessEqual(len(self.b.table.buckets), 1)
605 self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
607 def testStoreRetrieve(self):
608 self.a.addContact('127.0.0.1', 4045)
614 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
621 self.a.valueForKey(sha('foo').digest(), self._cb)
630 def _cb(self, key, val):
632 self.failUnlessEqual(self.got, 1)
633 elif 'foobar' in val:
637 class MultiTest(unittest.TestCase):
641 DHT_DEFAULTS = {'PORT': 9977,
642 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
643 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
644 'MAX_FAILURES': 3, 'LOCAL_OK': True,
645 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
646 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
647 'KEY_EXPIRE': 3600, 'SPEW': True, }
649 def _done(self, val):
654 self.startport = 4088
655 for i in range(self.num):
656 d = self.DHT_DEFAULTS.copy()
657 d['PORT'] = self.startport + i
658 self.l.append(Khashmir(d))
663 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
664 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
665 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
672 i.findCloseNodes(self._done)
677 i.findCloseNodes(self._done)
684 os.unlink(i.store.db)
688 def testStoreRetrieve(self):
695 def _scb(key, value, result):
697 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
705 self.failUnlessEqual(self.got, 1)
711 self.l[randrange(0, self.num)].valueForKey(K, _rcb)