1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """The main Khashmir program."""
7 warnings.simplefilter("ignore", DeprecationWarning)
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.trial import unittest
19 from ktable import KTable
20 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
21 from khash import newID, newIDInRange
22 from actions import FindNode, FindValue, GetValue, StoreValue
23 from stats import StatsLogger
class KhashmirBase(protocol.Factory):
    """The base Khashmir class, with base functionality and find node, no key-value mappings.
    
    @type _Node: L{node.Node}
    @ivar _Node: the knode implementation to use for this class of DHT
    @type config: C{dictionary}
    @ivar config: the configuration parameters for the DHT
    @type port: C{int}
    @ivar port: the port to listen on
    @type store: L{db.DB}
    @ivar store: the database to store nodes and key/value pairs in
    @type node: L{node.Node}
    @ivar node: this node
    @type table: L{ktable.KTable}
    @ivar table: the routing table
    @type token_secrets: C{list} of C{string}
    @ivar token_secrets: the current secrets to use to create tokens
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics gatherer
    @type udp: L{krpc.hostbroker}
    @ivar udp: the factory for the KRPC protocol
    @type listenport: L{twisted.internet.interfaces.IListeningPort}
    @ivar listenport: the UDP listening port
    @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
    @ivar next_checkpoint: the delayed call for the next checkpoint
    """
    
    _Node = KNodeBase
    
    def __init__(self, config, cache_dir='/tmp'):
        """Initialize the Khashmir class and call the L{setup} method.
        
        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        @type cache_dir: C{string}
        @param cache_dir: the directory to store all files in
            (optional, defaults to the /tmp directory)
        """
        self.config = None
        self.setup(config, cache_dir)
        
    def setup(self, config, cache_dir):
        """Setup all the Khashmir sub-modules.
        
        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        @type cache_dir: C{string}
        @param cache_dir: the directory to store all files in
        """
        self.config = config
        self.port = config['PORT']
        self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
        self.node = self._loadSelfNode('', self.port)
        self.table = KTable(self.node, config)
        self.token_secrets = [newID()]
        self.stats = StatsLogger(self.table, self.store, self.config)
        
        # Start listening
        self.udp = krpc.hostbroker(self, self.stats, config)
        self.udp.protocol = krpc.KRPC
        self.listenport = reactor.listenUDP(self.port, self.udp)
        
        # Load the routing table and begin checkpointing
        self._loadRoutingTable()
        self.refreshTable(force = True)
        self.next_checkpoint = reactor.callLater(60, self.checkpoint)

    def Node(self, id, host = None, port = None):
        """Create a new node attached to this DHT's routing table and UDP broker.
        
        @see: L{node.Node.__init__}
        """
        n = self._Node(id, host, port)
        n.table = self.table
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        return n
    
    def __del__(self):
        """Stop listening for packets."""
        # setup() may have failed before the port was opened; don't raise
        # AttributeError from a finalizer in that case.
        listenport = getattr(self, 'listenport', None)
        if listenport is not None:
            listenport.stopListening()
        
    def _loadSelfNode(self, host, port):
        """Create this node, loading any previously saved one."""
        id = self.store.getSelfNode()
        if not id:
            id = newID()
        return self._Node(id, host, port)
        
    def checkpoint(self):
        """Perform some periodic maintenance operations.

        Rotates the token secrets (keeping at most 3), saves this node's ID
        and the routing table, expires old values, refreshes stale buckets,
        and reschedules itself for roughly CHECKPOINT_INTERVAL seconds later.
        """
        # Create a new token secret
        self.token_secrets.insert(0, newID())
        if len(self.token_secrets) > 3:
            self.token_secrets.pop()
            
        # Save some parameters for reloading
        self.store.saveSelfNode(self.node.id)
        self.store.dumpRoutingTable(self.table.buckets)
        
        # DHT maintenance
        self.store.expireValues(self.config['KEY_EXPIRE'])
        self.refreshTable()
        
        # Jitter the interval by +/-10% so checkpoints don't synchronize
        self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), 
                                                           int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), 
                                                 self.checkpoint)
        
    def _loadRoutingTable(self):
        """Load the previous routing table nodes from the database.
        
        It's usually a good idea to call refreshTable(force = True) after
        loading the table.
        """
        nodes = self.store.getRoutingTable()
        for rec in nodes:
            n = self.Node(rec[0], rec[1], int(rec[2]))
            self.table.insertNode(n, contacted = False)
            
    def addContact(self, host, port, callback=None, errback=None):
        """Ping this node and add the contact info to the table on pong.
        
        @type host: C{string}
        @param host: the IP address of the node to contact
        @type port: C{int}
        @param port: the port of the node to contact
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the contact info returned by the node
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to calling the callback with None)
        """
        n = self.Node(NULL_ID, host, port)
        self.sendJoin(n, callback=callback, errback=errback)

    def findNode(self, id, callback, errback=None):
        """Find the contact info for the K closest nodes in the global table.
        
        @type id: C{string}
        @param id: the target ID to find the K closest nodes of
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of K closest nodes
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to doing nothing when an error occurs)
        """
        # Get K nodes out of local table/cache
        nodes = self.table.findNodes(id)
        d = Deferred()
        if errback:
            d.addCallbacks(callback, errback)
        else:
            d.addCallback(callback)

        # If the target ID was found
        if len(nodes) == 1 and nodes[0].id == id:
            d.callback(nodes)
        else:
            # Start the finding nodes action
            state = FindNode(self, id, d.callback, self.config, self.stats)
            reactor.callLater(0, state.goWithNodes, nodes)
    
    def insertNode(self, node, contacted = True):
        """Try to insert a node in our local table, pinging oldest contact if necessary.
        
        If all you have is a host/port, then use L{addContact}, which calls this
        method after receiving the PONG from the remote node. The reason for
        the separation is we can't insert a node into the table without its
        node ID. That means of course the node passed into this method needs
        to be a properly formed Node object with a valid ID.

        @type node: L{node.Node}
        @param node: the new node to try and insert
        @type contacted: C{boolean}
        @param contacted: whether the new node is known to be good, i.e.
            responded to a request (optional, defaults to True)
        """
        old = self.table.insertNode(node, contacted=contacted)
        if (old and old.id != self.node.id and
            (datetime.now() - old.lastSeen) > 
             timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
            
            def _staleNodeHandler(err, oldnode = old, newnode = node):
                """The pinged node never responded, so replace it.

                This is a Deferred errback, so Twisted passes the Failure as
                the first positional argument; it must be accepted here or it
                would silently bind to C{oldnode}.
                """
                self.table.replaceStaleNode(oldnode, newnode)
            
            def _notStaleNodeHandler(response, old=old):
                """Got a pong from the old node, so update it."""
                response = response['rsp']
                if response['id'] == old.id:
                    self.table.justSeenNode(old.id)
            
            # Bucket is full, check to see if old node is still available
            self.stats.startedAction('ping')
            df = old.ping(self.node.id)
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)

    def sendJoin(self, node, callback=None, errback=None):
        """Join the DHT by pinging a bootstrap node.
        
        @type node: L{node.Node}
        @param node: the node to send the join to
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the contact info returned by the node
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to calling the callback with None)
        """

        def _pongHandler(response, node=node, self=self, callback=callback):
            """Node responded properly, callback with response."""
            n = self.Node(response['rsp']['id'], response['_krpc_sender'][0], response['_krpc_sender'][1])
            self.insertNode(n)
            if callback:
                callback((response['rsp']['ip_addr'], response['rsp']['port']))

        def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback):
            """Error occurred, fail node and errback or callback with error."""
            table.nodeFailed(node)
            if errback:
                errback()
            elif callback:
                callback(None)
        
        self.stats.startedAction('join')
        df = node.join(self.node.id)
        df.addCallbacks(_pongHandler, _defaultPong)

    def findCloseNodes(self, callback=lambda a: None, errback = None):
        """Perform a findNode on the ID one away from our own.

        This will allow us to populate our table with nodes on our network
        closest to our own. This is called as soon as we start up with an
        empty table.

        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of K closest nodes
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to doing nothing when an error occurs)
        """
        # Increment the last byte of our ID (wrapping at 256)
        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
        self.findNode(id, callback, errback)

    def refreshTable(self, force = False):
        """Check all the buckets for those that need refreshing.
        
        @param force: refresh all buckets regardless of last bucket access time
            (optional, defaults to False)
        """
        def callback(nodes):
            pass
    
        for bucket in self.table.buckets:
            if force or (datetime.now() - bucket.lastAccessed > 
                         timedelta(seconds=self.config['BUCKET_STALENESS'])):
                # Choose a random ID in the bucket and try and find it
                id = newIDInRange(bucket.min, bucket.max)
                self.findNode(id, callback)

    def shutdown(self):
        """Closes the port and cancels pending later calls."""
        self.listenport.stopListening()
        # Only cancel a still-pending checkpoint; cancelling one that already
        # ran or was cancelled raises, and a bare except would hide real errors.
        if self.next_checkpoint.active():
            self.next_checkpoint.cancel()
        self.store.close()
    
    def getStats(self):
        """Gather the statistics for the DHT."""
        return self.stats.gather()

    #{ Remote interface
    def krpc_ping(self, id, _krpc_sender):
        """Pong with our ID.
        
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)

        return {"id" : self.node.id}
        
    def krpc_join(self, id, _krpc_sender):
        """Add the node by responding with its address and port.
        
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)

        return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id}
        
    def krpc_find_node(self, target, id, _krpc_sender):
        """Find the K closest nodes to the target in the local routing table.
        
        @type target: C{string}
        @param target: the target ID to find nodes for
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)

        nodes = [node.contactInfo() for node in self.table.findNodes(target)]
        # The token binds the sender's IP to our current secret; it is checked
        # later against all retained secrets when the sender stores a value.
        token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
        return {"nodes" : nodes, "token" : token, "id" : self.node.id}
class KhashmirRead(KhashmirBase):
    """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""

    _Node = KNodeRead

    #{ Local interface
    def findValue(self, key, callback, errback=None):
        """Get the nodes that have values for the key from the global table.
        
        @type key: C{string}
        @param key: the target key to find the values for
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of nodes with values
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to doing nothing when an error occurs)
        """
        # Get K nodes out of local table/cache
        nodes = self.table.findNodes(key)
        d = Deferred()
        if errback:
            d.addCallbacks(callback, errback)
        else:
            d.addCallback(callback)

        # Search for others starting with the locally found ones
        state = FindValue(self, key, d.callback, self.config, self.stats)
        reactor.callLater(0, state.goWithNodes, nodes)

    def valueForKey(self, key, callback, searchlocal = True):
        """Get the values found for key in global table.
        
        Callback will be called with a list of values for each peer that
        returns unique values. The final callback will be an empty list.

        @type key: C{string}
        @param key: the target key to get the values for
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 2
            parameters: the key, and the values found
        @type searchlocal: C{boolean}
        @param searchlocal: whether to also look for any local values
        """
        # Get any local values
        if searchlocal:
            l = self.store.retrieveValues(key)
            if len(l) > 0:
                reactor.callLater(0, callback, key, l)
        else:
            l = []

        def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
            """Use the found nodes to send requests for values to."""
            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
            reactor.callLater(0, state.goWithNodes, nodes)
            
        # First lookup nodes that have values for the key
        self.findValue(key, _getValueForKey)

    #{ Remote interface
    def krpc_find_value(self, key, id, _krpc_sender):
        """Find the number of values stored locally for the key, and the K closest nodes.
        
        @type key: C{string}
        @param key: the target key to find the values and nodes for
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)
    
        nodes = [node.contactInfo() for node in self.table.findNodes(key)]
        num_values = self.store.countValues(key)
        return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}

    def krpc_get_value(self, key, num, id, _krpc_sender):
        """Retrieve the values stored locally for the key.
        
        @type key: C{string}
        @param key: the target key to retrieve the values for
        @type num: C{int}
        @param num: the maximum number of values to retrieve, or 0 (or any
            non-positive number) to retrieve all of them
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)
    
        l = self.store.retrieveValues(key)
        # num comes from a remote peer. A negative value would make l[:num]
        # drop values from the end instead of limiting the count, so treat
        # every non-positive num like 0 ("all values").
        if num <= 0 or num >= len(l):
            return {'values' : l, "id": self.node.id}
        else:
            # Return a random subset so repeated requests see different values
            shuffle(l)
            return {'values' : l[:num], "id": self.node.id}
class KhashmirWrite(KhashmirRead):
    """The read-write Khashmir class, which can store and retrieve key/value mappings."""

    _Node = KNodeWrite

    #{ Local interface
    def storeValueForKey(self, key, value, callback=None):
        """Stores the value for the key in the global table.
        
        No status in this implementation, peers respond but don't indicate
        status of storing values.

        @type key: C{string}
        @param key: the target key to store the value for
        @type value: C{string}
        @param value: the value to store with the key
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 3
            parameters: the key, the value stored, and the result of the store
            (optional, defaults to doing nothing with the results)
        """
        def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
            """Use the returned K closest nodes to store the key at."""
            if not response:
                def _storedValueHandler(key, value, sender):
                    """Default callback that does nothing."""
                    pass
                response = _storedValueHandler
            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
            reactor.callLater(0, action.goWithNodes, nodes)
            
        # First find the K closest nodes to operate on.
        self.findNode(key, _storeValueForKey)
                    
    #{ Remote interface
    def krpc_store_value(self, key, value, token, id, _krpc_sender):
        """Store the value locally with the key.
        
        @type key: C{string}
        @param key: the target key to store the value for
        @type value: C{string}
        @param value: the value to store with the key
        @param token: the token to confirm that this peer contacted us previously
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        @raise krpc.KrpcError: if the token matches none of the current secrets
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted = False)
        # Accept a token minted with any of the retained (rotated) secrets
        for secret in self.token_secrets:
            this_token = sha(secret + _krpc_sender[0]).digest()
            if token == this_token:
                self.store.storeValue(key, value)
                return {"id" : self.node.id}
        # Call form instead of the Python-2-only "raise E, args" statement;
        # both construct KrpcError(code, message).
        raise krpc.KrpcError(krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
class Khashmir(KhashmirWrite):
    """Default Khashmir DHT class.

    At present this is the read-write L{KhashmirWrite} implementation,
    using the writable node type.
    """
    _Node = KNodeWrite
class SimpleTests(unittest.TestCase):
    """Exercise two local Khashmir nodes talking to each other over UDP."""

    timeout = 10
    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                    'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
                    'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
                    'MAX_FAILURES': 3,
                    'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                    'KEY_EXPIRE': 3600, 'SPEW': False, }

    def setUp(self):
        # Each node gets its own copy of the defaults with a distinct port;
        # node b listens on 4045, which is the port node a contacts below.
        self.a = Khashmir(dict(self.DHT_DEFAULTS, PORT=4044))
        self.b = Khashmir(dict(self.DHT_DEFAULTS, PORT=4045))

    def tearDown(self):
        pair = (self.a, self.b)
        for khash in pair:
            khash.shutdown()
        for khash in pair:
            os.unlink(khash.store.db)

    def _spin(self, rounds):
        """Run the reactor loop a fixed number of times."""
        for _ in range(rounds):
            reactor.iterate()

    def testAddContact(self):
        for khash in (self.a, self.b):
            self.failUnlessEqual(len(khash.table.buckets), 1)
            self.failUnlessEqual(len(khash.table.buckets[0].l), 0)

        self.a.addContact('127.0.0.1', 4045)
        self._spin(4)

        for khash in (self.a, self.b):
            self.failUnlessEqual(len(khash.table.buckets), 1)
            self.failUnlessEqual(len(khash.table.buckets[0].l), 1)

    def testStoreRetrieve(self):
        self.a.addContact('127.0.0.1', 4045)
        self._spin(4)
        self.got = 0
        key = sha('foo').digest()
        self.a.storeValueForKey(key, 'foobar')
        self._spin(6)
        self.a.valueForKey(key, self._cb)
        self._spin(7)

    def _cb(self, key, val):
        # An empty list marks the final callback: by then the value must
        # have been seen at least once.
        if val:
            if 'foobar' in val:
                self.got = 1
        else:
            self.failUnlessEqual(self.got, 1)
590 class MultiTest(unittest.TestCase):
594 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
595 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
596 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
598 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
599 'KEY_EXPIRE': 3600, 'SPEW': False, }
601 def _done(self, val):
606 self.startport = 4088
607 for i in range(self.num):
608 d = self.DHT_DEFAULTS.copy()
609 d['PORT'] = self.startport + i
610 self.l.append(Khashmir(d))
615 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
616 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
617 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
624 i.findCloseNodes(self._done)
629 i.findCloseNodes(self._done)
636 os.unlink(i.store.db)
640 def testStoreRetrieve(self):
647 def _scb(key, value, result):
649 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
657 self.failUnlessEqual(self.got, 1)
663 self.l[randrange(0, self.num)].valueForKey(K, _rcb)