2 """The main Khashmir program.
4 @var isLocal: a compiled regular expression suitable for testing if an
5 IP address is from a known local or private range
9 warnings.simplefilter("ignore", DeprecationWarning)
11 from datetime import datetime, timedelta
12 from random import randrange, shuffle
17 from twisted.internet.defer import Deferred
18 from twisted.internet.base import DelayedCall
19 from twisted.internet import protocol, reactor
20 from twisted.python import log
21 from twisted.trial import unittest
24 from ktable import KTable
25 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
26 from khash import newID, newIDInRange
27 from actions import FindNode, FindValue, GetValue, StoreValue
28 from stats import StatsLogger
# Matches dotted-quad addresses in the loopback (127/8) and private
# (10/8, 172.16/12, 192.168/16) ranges.  All alternatives sit inside one
# non-capturing group so that '^' and '$' anchor every alternative; without
# it only the first alternative was anchored at the start and only the last
# at the end, so e.g. '192.168.1.1.example.com' was reported as local.
# The six capturing groups keep their original numbering.
isLocal = re.compile(r'^(?:(192\.168\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(10\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?1[6-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?2[0-9]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(172\.0?3[0-1]\.[0-9]{1,3}\.[0-9]{1,3})|'
                     r'(127\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}))$')
38 class KhashmirBase(protocol.Factory):
39 """The base Khashmir class, with base functionality and find node, no key-value mappings.
41 @type _Node: L{node.Node}
42 @ivar _Node: the knode implementation to use for this class of DHT
43 @type config: C{dictionary}
44 @ivar config: the configuration parameters for the DHT
45 @type pinging: C{dictionary}
@ivar pinging: the nodes that are currently being pinged, keys are the
    node IDs, values are the Deferred or DelayedCall objects
49 @ivar port: the port to listen on
51 @ivar store: the database to store nodes and key/value pairs in
52 @type node: L{node.Node}
54 @type table: L{ktable.KTable}
55 @ivar table: the routing table
56 @type token_secrets: C{list} of C{string}
57 @ivar token_secrets: the current secrets to use to create tokens
58 @type stats: L{stats.StatsLogger}
59 @ivar stats: the statistics gatherer
60 @type udp: L{krpc.hostbroker}
61 @ivar udp: the factory for the KRPC protocol
62 @type listenport: L{twisted.internet.interfaces.IListeningPort}
63 @ivar listenport: the UDP listening port
64 @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
65 @ivar next_checkpoint: the delayed call for the next checkpoint
def __init__(self, config, cache_dir='/tmp'):
    """Initialize the Khashmir class and call the L{setup} method.

    @type config: C{dictionary}
    @param config: the configuration parameters for the DHT
    @type cache_dir: C{string}
    @param cache_dir: the directory to store all files in
        (optional, defaults to the /tmp directory)
    """
    # NOTE(review): these two initialisations are restored; the lines are
    # absent from the damaged extract, but the other methods read
    # self.config and use self.pinging as a dictionary -- confirm.
    self.config = None
    self.pinging = {}
    self.setup(config, cache_dir)
def setup(self, config, cache_dir):
    """Setup all the Khashmir sub-modules.

    Opens the database, creates this node and its routing table, starts
    listening for KRPC packets on UDP and schedules the first checkpoint.

    @type config: C{dictionary}
    @param config: the configuration parameters for the DHT
    @type cache_dir: C{string}
    @param cache_dir: the directory to store all files in
    """
    # NOTE(review): restored; the line is absent from the damaged extract,
    # but checkpoint() and others read self.config -- confirm.
    self.config = config
    self.port = config['PORT']
    db_file = os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db')
    self.store = DB(db_file)
    self.node = self._loadSelfNode('', self.port)
    self.table = KTable(self.node, config)
    self.token_secrets = [newID()]
    self.stats = StatsLogger(self.table, self.store)

    # Start listening
    self.udp = krpc.hostbroker(self, self.stats, config)
    self.udp.protocol = krpc.KRPC
    self.listenport = reactor.listenUDP(self.port, self.udp)

    # Load the routing table and begin checkpointing
    self._loadRoutingTable()
    self.refreshTable(force=True)
    self.next_checkpoint = reactor.callLater(60, self.checkpoint)
def Node(self, id, host=None, port=None):
    """Create a new node.

    The node is built with the class's C{_Node} implementation and given
    a connection obtained from the KRPC factory for its address.

    @see: L{node.Node.__init__}
    @return: the newly created node
    """
    n = self._Node(id, host, port)
    # NOTE(review): one short line is missing from the extract between the
    # construction above and the connection below; it could not be
    # recovered from the visible code -- check upstream.
    n.conn = self.udp.connectionForAddr((n.host, n.port))
    # NOTE(review): restored; every caller uses the result of Node().
    return n
120 """Stop listening for packets."""
121 self.listenport.stopListening()
def _loadSelfNode(self, host, port):
    """Create this node, loading any previously saved one.

    @param host: the host to give the node
    @param port: the port to give the node
    @return: a C{_Node} instance using the stored ID when there is one
    """
    id = self.store.getSelfNode()
    # NOTE(review): restored fallback; the two lines are absent from the
    # damaged extract -- confirm that a fresh ID is generated here.
    if not id:
        id = newID()
    return self._Node(id, host, port)
def checkpoint(self):
    """Perform some periodic maintenance operations.

    Rotates the token secrets (keeping at most 3), saves this node's ID
    and the routing table, expires old values and schedules the next
    checkpoint at a random delay within 10% of CHECKPOINT_INTERVAL.
    """
    # Create a new token secret
    self.token_secrets.insert(0, newID())
    if len(self.token_secrets) > 3:
        self.token_secrets.pop()

    # Save some parameters for reloading
    self.store.saveSelfNode(self.node.id)
    self.store.dumpRoutingTable(self.table.buckets)

    # DHT maintenance
    self.store.expireValues(self.config['KEY_EXPIRE'])
    # NOTE(review): restored; one short statement is missing here in the
    # damaged extract, presumably the periodic bucket refresh -- confirm.
    self.refreshTable()

    # NOTE(review): the final callLater argument was cut off in the
    # extract; self.checkpoint mirrors the scheduling done in setup().
    interval = self.config['CHECKPOINT_INTERVAL']
    self.next_checkpoint = reactor.callLater(randrange(int(interval * .9),
                                                       int(interval * 1.1)),
                                             self.checkpoint)
def _loadRoutingTable(self):
    """Load the previous routing table nodes from the database.

    It's usually a good idea to call refreshTable(force = True) after
    loading, as the nodes are inserted without having been contacted.
    """
    nodes = self.store.getRoutingTable()
    # Each record is indexed as (id, host, port); the port is converted
    # to an integer before the node is created.
    for rec in nodes:
        n = self.Node(rec[0], rec[1], int(rec[2]))
        self.table.insertNode(n, contacted=False)
def addContact(self, host, port, callback=None, errback=None):
    """Ping this node and add the contact info to the table on pong.

    @type host: C{string}
    @param host: the IP address of the node to contact
    @type port: C{int}
    @param port: the port of the node to contact
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the contact info returned by the node
        (optional, defaults to doing nothing with the results)
    @type errback: C{method}
    @param errback: the method to call if an error occurs
        (optional, defaults to calling the callback with the error)
    """
    # The remote ID is not known yet, so the node is created with NULL_ID
    n = self.Node(NULL_ID, host, port)
    self.sendJoin(n, callback=callback, errback=errback)
def findNode(self, id, callback):
    """Find the contact info for the K closest nodes in the global table.

    @type id: C{string}
    @param id: the target ID to find the K closest nodes of
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of K closest nodes
    """
    # Mark the bucket as having been accessed
    # NOTE(review): restored; the statement is absent from the damaged
    # extract, findValue does the same under the same comment -- confirm.
    self.table.touch(id)

    # Start with our node
    nodes = [copy(self.node)]

    # Start the finding nodes action
    state = FindNode(self, id, callback, self.config, self.stats)
    reactor.callLater(0, state.goWithNodes, nodes)
def insertNode(self, node, contacted=True):
    """Try to insert a node in our local table, pinging oldest contact if necessary.

    If all you have is a host/port, then use L{addContact}, which calls this
    method after receiving the PONG from the remote node. The reason for
    the separation is we can't insert a node into the table without its
    node ID. That means of course the node passed into this method needs
    to be a properly formed Node object with a valid ID.

    @type node: L{node.Node}
    @param node: the new node to try and insert
    @type contacted: C{boolean}
    @param contacted: whether the new node is known to be good, i.e.
        responded to a request (optional, defaults to True)
    """
    # Don't add any local nodes to the routing table
    if not self.config['LOCAL_OK'] and isLocal.match(node.host):
        log.msg('Not adding local node to table: %s/%s' % (node.host, node.port))
        # NOTE(review): restored; without returning here the node would be
        # inserted right after being logged as rejected.
        return

    old = self.table.insertNode(node, contacted=contacted)

    min_interval = timedelta(seconds=self.config['MIN_PING_INTERVAL'])
    if (isinstance(old, self._Node) and old.id != self.node.id and
            (datetime.now() - old.lastSeen) > min_interval):
        # Bucket is full, check to see if old node is still available
        df = self.sendPing(old)
        df.addErrback(self._staleNodeHandler, old, node, contacted)
    elif not old and not contacted:
        # There's room, we just need to contact the node first
        df = self.sendPing(node)

        # Also schedule a future ping to make sure the node works
        def rePing(newnode, self=self):
            if newnode.id not in self.pinging:
                self.pinging[newnode.id] = reactor.callLater(
                    self.config['MIN_PING_INTERVAL'], self.sendPing, newnode)
        df.addCallback(rePing)
def _staleNodeHandler(self, err, old, node, contacted):
    """The pinged node never responded, so replace it.

    @param err: the failure passed down the errback chain
    @param old: the stale node to invalidate in the routing table
    @param node: the new node to insert in its place
    @param contacted: passed through to L{insertNode}
    """
    self.table.invalidateNode(old)
    self.insertNode(node, contacted)
    # NOTE(review): restored; one short line is missing from the damaged
    # extract, presumably passing the failure on -- confirm.
    return err
def nodeFailed(self, node):
    """Mark a node as having failed a request and schedule a future check.

    @type node: L{node.Node}
    @param node: the node that failed the request
    """
    exists = self.table.nodeFailed(node)

    # If in the table, schedule a ping, if one isn't already sent/scheduled
    if exists and node.id not in self.pinging:
        # NOTE(review): the callLater arguments after the delay were cut
        # off in the extract; restored to match the identical scheduling
        # done by insertNode.
        self.pinging[node.id] = reactor.callLater(
            self.config['MIN_PING_INTERVAL'], self.sendPing, node)
def sendPing(self, node):
    """Ping the node to see if it's still alive.

    @type node: L{node.Node}
    @param node: the node to send the ping to
    @rtype: L{twisted.internet.defer.Deferred}
    @return: the deferred for the ping; if a ping to this node is already
        underway, the deferred of that ping
    """
    # Check for a ping already underway
    pending = self.pinging.get(node.id, None)
    if isinstance(pending, DelayedCall) and pending.active():
        # A ping was only scheduled: cancel it and ping right now
        pending.cancel()
    elif isinstance(pending, Deferred):
        return pending

    self.stats.startedAction('ping')
    df = node.ping(self.node.id)
    self.pinging[node.id] = df
    df.addCallbacks(self._pingHandler, self._pingError,
                    callbackArgs=(node, datetime.now()),
                    errbackArgs=(node, datetime.now()))
    # NOTE(review): restored; insertNode attaches callbacks to the value
    # returned from here.
    return df
def _pingHandler(self, dict, node, start):
    """Node responded properly, update it and return the node object.

    @param dict: the response; its 'id' and '_krpc_sender' entries are read
    @param node: the node that was pinged
    @param start: the time the ping was started, passed to the stats
    @return: the node created from the returned contact info
    """
    self.stats.completedAction('ping', start)
    # pop() so that an entry already removed elsewhere is not an error
    self.pinging.pop(node.id, None)
    # Create the node using the returned contact info
    n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
    reactor.callLater(0, self.insertNode, n)
    # NOTE(review): restored; the rePing callback chained by insertNode
    # reads .id from this result.
    return n
def _pingError(self, err, node, start):
    """Error occurred, fail node.

    @param err: the failure; its error message is logged
    @param node: the node that was pinged
    @param start: the time the ping was started, passed to the stats
    @return: the failure, so later errbacks still run
    """
    log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
    self.stats.completedAction('ping', start)
    # pop() so that an entry already removed elsewhere is not an error
    self.pinging.pop(node.id, None)
    self.nodeFailed(node)
    # NOTE(review): restored; insertNode adds _staleNodeHandler as an
    # errback after this one, which only fires if the failure is passed on.
    return err
def sendJoin(self, node, callback=None, errback=None):
    """Join the DHT by pinging a bootstrap node.

    @type node: L{node.Node}
    @param node: the node to send the join to
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the contact info returned by the node
        (optional, defaults to doing nothing with the results)
    @type errback: C{method}
    @param errback: the method to call if an error occurs
        (optional, defaults to calling the callback with the error)
    """
    # NOTE(review): restored from the documented default above; the two
    # lines are absent from the damaged extract.
    if errback is None:
        errback = callback

    self.stats.startedAction('join')
    df = node.join(self.node.id)
    df.addCallbacks(self._joinHandler, self._joinError,
                    callbackArgs=(node, datetime.now()),
                    errbackArgs=(node, datetime.now()))
    # NOTE(review): guard restored; callback is documented as optional.
    if callback:
        df.addCallbacks(callback, errback)
def _joinHandler(self, dict, node, start):
    """Node responded properly, extract the response.

    @param dict: the response; its 'id', '_krpc_sender', 'ip_addr' and
        'port' entries are read
    @param node: the node the join was sent to
    @param start: the time the join was started, passed to the stats
    @return: the (ip_addr, port) pair taken from the response
    """
    self.stats.completedAction('join', start)
    # Create the node using the returned contact info
    sender = dict['_krpc_sender']
    n = self.Node(dict['id'], sender[0], sender[1])
    reactor.callLater(0, self.insertNode, n)
    return (dict['ip_addr'], dict['port'])
def _joinError(self, err, node, start):
    """Error occurred, fail node.

    @param err: the failure; its error message is logged
    @param node: the node the join was sent to
    @param start: the time the join was started, passed to the stats
    @return: the failure, so the errback given to L{sendJoin} still runs
    """
    log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage()))
    self.stats.completedAction('join', start)
    self.nodeFailed(node)
    # NOTE(review): restored; sendJoin chains the caller's errback after
    # this handler, which only fires if the failure is passed on.
    return err
def findCloseNodes(self, callback=lambda a: None):
    """Perform a findNode on the ID one away from our own.

    This will allow us to populate our table with nodes on our network
    closest to our own.

    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of K closest nodes
        (optional, defaults to doing nothing with the results)
    """
    # Increment the last character of our ID, wrapping around at 256
    own = self.node.id
    id = own[:-1] + chr((ord(own[-1]) + 1) % 256)
    self.findNode(id, callback)
def refreshTable(self, force=False):
    """Check all the buckets for those that need refreshing.

    @type force: C{boolean}
    @param force: refresh all buckets regardless of last bucket access time
        (optional, defaults to False)
    """
    # NOTE(review): restored; 'callback' is used below but its definition
    # is absent from the damaged extract -- presumably a no-op, confirm.
    def callback(nodes):
        pass

    staleness = timedelta(seconds=self.config['BUCKET_STALENESS'])
    for bucket in self.table.buckets:
        if force or (datetime.now() - bucket.lastAccessed > staleness):
            # Choose a random ID in the bucket and try and find it
            id = newIDInRange(bucket.min, bucket.max)
            self.findNode(id, callback)
364 """Closes the port and cancels pending later calls."""
365 self.listenport.stopListening()
367 self.next_checkpoint.cancel()
370 for call in self.pinging:
371 if isinstance(call, DelayedCall) and call.active():
376 """Gather the statistics for the DHT."""
377 return self.stats.formatHTML()
def krpc_ping(self, id, _krpc_sender=None):
    """Respond to a ping with this node's ID.

    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: this node's ID under the key 'id'
    """
    if _krpc_sender is not None:
        # Schedule the sender for insertion as a not-yet-contacted node
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        reactor.callLater(0, self.insertNode, n, False)

    return {"id": self.node.id}
def krpc_join(self, id, _krpc_sender=None):
    """Add the node by responding with its address and port.

    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the sender's address as 'ip_addr' and 'port', plus this
        node's 'id'
    """
    if _krpc_sender is not None:
        # Schedule the sender for insertion as a not-yet-contacted node
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        reactor.callLater(0, self.insertNode, n, False)
    else:
        # NOTE(review): 'else' restored; without it the real sender
        # address would always be overwritten.
        _krpc_sender = ('127.0.0.1', self.port)

    return {"ip_addr": _krpc_sender[0], "port": _krpc_sender[1], "id": self.node.id}
def krpc_find_node(self, id, target, _krpc_sender=None):
    """Find the K closest nodes to the target in the local routing table.

    @type target: C{string}
    @param target: the target ID to find nodes for
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the contact info of the found 'nodes', a 'token' derived from
        the newest token secret and the sender's address, and this
        node's 'id'
    """
    if _krpc_sender is not None:
        # Schedule the sender for insertion as a not-yet-contacted node
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        reactor.callLater(0, self.insertNode, n, False)
    else:
        # NOTE(review): 'else' restored; without it the real sender
        # address would always be overwritten.
        _krpc_sender = ('127.0.0.1', self.port)

    nodes = [node.contactInfo() for node in self.table.findNodes(target)]
    token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
    return {"nodes": nodes, "token": token, "id": self.node.id}
432 class KhashmirRead(KhashmirBase):
433 """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""
def findValue(self, key, callback):
    """Get the nodes that have values for the key from the global table.

    @type key: C{string}
    @param key: the target key to find the values for
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 1
        parameter, the list of nodes with values
    """
    # Mark the bucket as having been accessed
    self.table.touch(key)

    # Start with our node
    nodes = [copy(self.node)]

    # Search for others starting with the locally found ones
    state = FindValue(self, key, callback, self.config, self.stats)
    reactor.callLater(0, state.goWithNodes, nodes)
def valueForKey(self, key, callback, searchlocal=True):
    """Get the values found for key in global table.

    Callback will be called with a list of values for each peer that
    returns unique values. The final callback will be an empty list.

    @type key: C{string}
    @param key: the target key to get the values for
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 2
        parameters: the key, and the values found
    @type searchlocal: C{boolean}
    @param searchlocal: whether to also look for any local values
    """
    def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
        """Use the found nodes to send requests for values to."""
        # Get any local values
        # NOTE(review): both guards below are restored; the lines are
        # absent from the damaged extract -- confirm.
        if searchlocal:
            l = self.store.retrieveValues(key)
            if len(l) > 0:
                # Add a copy of our own node, tagged with the local count
                node = copy(self.node)
                node.updateNumValues(len(l))
                nodes = nodes + [node]

        state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
        reactor.callLater(0, state.goWithNodes, nodes)

    # First lookup nodes that have values for the key
    self.findValue(key, _getValueForKey)
def krpc_find_value(self, id, key, _krpc_sender=None):
    """Find the number of values stored locally for the key, and the K closest nodes.

    @type key: C{string}
    @param key: the target key to find the values and nodes for
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the contact info of the found 'nodes', the 'num' of values
        stored locally for the key, and this node's 'id'
    """
    if _krpc_sender is not None:
        # Schedule the sender for insertion as a not-yet-contacted node
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        reactor.callLater(0, self.insertNode, n, False)

    nodes = [node.contactInfo() for node in self.table.findNodes(key)]
    num_values = self.store.countValues(key)
    return {'nodes': nodes, 'num': num_values, "id": self.node.id}
def krpc_get_value(self, id, key, num, _krpc_sender=None):
    """Retrieve the values stored locally for the key.

    @type key: C{string}
    @param key: the target key to retrieve the values for
    @type num: C{int}
    @param num: the maximum number of values to retrieve, or 0 to
        retrieve them all
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: the retrieved 'values' and this node's 'id'
    """
    if _krpc_sender is not None:
        # Schedule the sender for insertion as a not-yet-contacted node
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        reactor.callLater(0, self.insertNode, n, False)

    l = self.store.retrieveValues(key)
    if num == 0 or num >= len(l):
        return {'values': l, "id": self.node.id}

    # NOTE(review): shuffle restored; one short statement is missing here
    # in the damaged extract and 'shuffle' is imported but otherwise
    # unused -- confirm that a random subset is intended.
    shuffle(l)
    return {'values': l[:num], "id": self.node.id}
533 class KhashmirWrite(KhashmirRead):
534 """The read-write Khashmir class, which can store and retrieve key/value mappings."""
def storeValueForKey(self, key, value, callback=None):
    """Stores the value for the key in the global table.

    No status in this implementation, peers respond but don't indicate
    status of storing values.

    @type key: C{string}
    @param key: the target key to store the value for
    @type value: C{string}
    @param value: the value to store with the key
    @type callback: C{method}
    @param callback: the method to call with the results, it must take 3
        parameters: the key, the value stored, and the result of the store
        (optional, defaults to doing nothing with the results)
    """
    def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
        """Use the returned K closest nodes to store the key at."""
        # NOTE(review): guard restored; without it a caller-supplied
        # callback would always be replaced by the no-op default.
        if not response:
            def _storedValueHandler(key, value, sender):
                """Default callback that does nothing."""
                pass
            response = _storedValueHandler
        action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
        reactor.callLater(0, action.goWithNodes, nodes)

    # First find the K closest nodes to operate on.
    self.findNode(key, _storeValueForKey)
def krpc_store_value(self, id, key, value, token, _krpc_sender=None):
    """Store the value locally with the key.

    @type key: C{string}
    @param key: the target key to store the value for
    @type value: C{string}
    @param value: the value to store with the key
    @param token: the token to confirm that this peer contacted us previously
    @type id: C{string}
    @param id: the node ID of the sender node
    @type _krpc_sender: (C{string}, C{int})
    @param _krpc_sender: the sender node's IP address and port
    @rtype: C{dictionary}
    @return: this node's ID under the key 'id'
    @raise krpc.KrpcError: if the token matches none of the current
        token secrets
    """
    if _krpc_sender is not None:
        # Schedule the sender for insertion as a not-yet-contacted node
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        reactor.callLater(0, self.insertNode, n, False)
    else:
        # NOTE(review): 'else' restored; without it the real sender
        # address would always be overwritten.
        _krpc_sender = ('127.0.0.1', self.port)

    # Accept a token created from any of the secrets still being kept
    for secret in self.token_secrets:
        this_token = sha(secret + _krpc_sender[0]).digest()
        if token == this_token:
            self.store.storeValue(key, value)
            return {"id": self.node.id}

    # Call form of raise: same exception arguments as the old
    # "raise E, (args)" statement, but also valid syntax on Python 3.
    raise krpc.KrpcError(krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
class Khashmir(KhashmirWrite):
    """The default Khashmir class (currently the read-write L{KhashmirWrite})."""
    # NOTE(review): one short line following the docstring is missing from
    # the damaged extract; as written here everything is inherited from
    # KhashmirWrite -- check upstream for a class attribute override.
600 class SimpleTests(unittest.TestCase):
603 DHT_DEFAULTS = {'PORT': 9977,
604 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
605 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
606 'MAX_FAILURES': 3, 'LOCAL_OK': True,
607 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
608 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
609 'KEY_EXPIRE': 3600, 'SPEW': True, }
612 d = self.DHT_DEFAULTS.copy()
615 d = self.DHT_DEFAULTS.copy()
622 os.unlink(self.a.store.db)
623 os.unlink(self.b.store.db)
625 def testAddContact(self):
626 self.failUnlessEqual(len(self.a.table.buckets), 1)
627 self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0)
629 self.failUnlessEqual(len(self.b.table.buckets), 1)
630 self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0)
632 self.a.addContact('127.0.0.1', 4045)
642 self.failUnlessEqual(len(self.a.table.buckets), 1)
643 self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1)
644 self.failUnlessEqual(len(self.b.table.buckets), 1)
645 self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1)
647 def testStoreRetrieve(self):
648 self.a.addContact('127.0.0.1', 4045)
654 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
661 self.a.valueForKey(sha('foo').digest(), self._cb)
684 def _cb(self, key, val):
686 self.failUnlessEqual(self.got, 1)
687 elif 'foobar' in val:
691 class MultiTest(unittest.TestCase):
695 DHT_DEFAULTS = {'PORT': 9977,
696 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8,
697 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000,
698 'MAX_FAILURES': 3, 'LOCAL_OK': True,
699 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
700 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2,
701 'KEY_EXPIRE': 3600, 'SPEW': True, }
703 def _done(self, val):
708 self.startport = 4088
709 for i in range(self.num):
710 d = self.DHT_DEFAULTS.copy()
711 d['PORT'] = self.startport + i
712 self.l.append(Khashmir(d))
717 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
718 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
719 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
729 i.findCloseNodes(self._done)
734 i.findCloseNodes(self._done)
741 os.unlink(i.store.db)
745 def testStoreRetrieve(self):
752 def _scb(key, value, result):
754 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
762 self.failUnlessEqual(self.got, 1)
768 self.l[randrange(0, self.num)].valueForKey(K, _rcb)