1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 """The main Khashmir program."""
import warnings

# Hide DeprecationWarning for the whole interpreter (warning filters are
# global).  NOTE(review): presumably meant to silence deprecated stdlib
# modules used by this package -- confirm which warning is targeted.
warnings.simplefilter("ignore", DeprecationWarning)
9 from datetime import datetime, timedelta
10 from random import randrange, shuffle
14 from twisted.internet.defer import Deferred
15 from twisted.internet import protocol, reactor
16 from twisted.trial import unittest
19 from ktable import KTable
20 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
21 from khash import newID, newIDInRange
22 from actions import FindNode, FindValue, GetValue, StoreValue
23 from stats import StatsLogger
class KhashmirBase(protocol.Factory):
    """The base Khashmir class, with base functionality and find node, no key-value mappings.

    @type _Node: L{node.Node}
    @ivar _Node: the knode implementation to use for this class of DHT
    @type config: C{dictionary}
    @ivar config: the configuration parameters for the DHT
    @type port: C{int}
    @ivar port: the port to listen on
    @type store: C{DB}
    @ivar store: the database to store nodes and key/value pairs in
    @type node: L{node.Node}
    @ivar node: this node
    @type table: L{ktable.KTable}
    @ivar table: the routing table
    @type token_secrets: C{list} of C{string}
    @ivar token_secrets: the current secrets to use to create tokens
    @type stats: L{stats.StatsLogger}
    @ivar stats: the statistics gatherer
    @type udp: L{krpc.hostbroker}
    @ivar udp: the factory for the KRPC protocol
    @type listenport: L{twisted.internet.interfaces.IListeningPort}
    @ivar listenport: the UDP listening port
    @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall}
    @ivar next_checkpoint: the delayed call for the next checkpoint
    """

    # Node class instantiated by Node() and _loadSelfNode().
    _Node = KNodeBase

    def __init__(self, config, cache_dir='/tmp'):
        """Initialize the Khashmir class and call the L{setup} method.

        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        @type cache_dir: C{string}
        @param cache_dir: the directory to store all files in
            (optional, defaults to the /tmp directory)
        """
        self.setup(config, cache_dir)

    def setup(self, config, cache_dir):
        """Setup all the Khashmir sub-modules.

        @type config: C{dictionary}
        @param config: the configuration parameters for the DHT
        @type cache_dir: C{string}
        @param cache_dir: the directory to store all files in
        """
        # self.config is read below (StatsLogger) and by most other methods,
        # so it has to be assigned first.
        self.config = config
        self.port = config['PORT']
        self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
        self.node = self._loadSelfNode('', self.port)
        self.table = KTable(self.node, config)
        self.token_secrets = [newID()]
        self.stats = StatsLogger(self.table, self.store, self.config)

        # Start listening for KRPC traffic on the UDP port
        self.udp = krpc.hostbroker(self, self.stats, config)
        self.udp.protocol = krpc.KRPC
        self.listenport = reactor.listenUDP(self.port, self.udp)

        # Load the routing table and begin checkpointing
        self._loadRoutingTable()
        self.refreshTable(force=True)
        self.next_checkpoint = reactor.callLater(60, self.checkpoint)

    def Node(self, id, host=None, port=None):
        """Create a new node wired to our KRPC connection for its address.

        @see: L{node.Node.__init__}
        @return: the new node, with its C{conn} set for (host, port)
        """
        n = self._Node(id, host, port)
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        # Every caller uses the result (insertNode, sendJoin, ...).
        return n

    # NOTE(review): the def line of this method was missing from the reviewed
    # copy; restored as __del__ -- confirm against the original source.
    def __del__(self):
        """Stop listening for packets."""
        # setup() may have failed before the port was opened.
        listenport = getattr(self, 'listenport', None)
        if listenport is not None:
            listenport.stopListening()

    def _loadSelfNode(self, host, port):
        """Create this node, loading any previously saved one.

        A fresh random ID is generated when no ID has been saved yet
        (assumes store.getSelfNode() returns a false value in that case).
        """
        id = self.store.getSelfNode()
        if not id:
            id = newID()
        return self._Node(id, host, port)

    def checkpoint(self):
        """Perform some periodic maintenance operations.

        Rotates the token secrets (keeping the 3 newest), saves this node's ID
        and routing table, expires old values, refreshes stale buckets, and
        reschedules itself after a random delay of about +/-10% of
        CHECKPOINT_INTERVAL seconds.
        """
        # Create a new token secret
        self.token_secrets.insert(0, newID())
        if len(self.token_secrets) > 3:
            self.token_secrets.pop()

        # Save some parameters for reloading
        self.store.saveSelfNode(self.node.id)
        self.store.dumpRoutingTable(self.table.buckets)

        # DHT maintenance
        self.store.expireValues(self.config['KEY_EXPIRE'])
        # Without force, refreshTable only refreshes buckets older than
        # BUCKET_STALENESS; running it here keeps the table fresh over time.
        self.refreshTable()

        # Reschedule; keep high > low so randrange cannot fail on tiny intervals.
        interval = self.config['CHECKPOINT_INTERVAL']
        low = int(interval * .9)
        high = max(int(interval * 1.1), low + 1)
        self.next_checkpoint = reactor.callLater(randrange(low, high), self.checkpoint)

    def _loadRoutingTable(self):
        """Load the previous routing table nodes from the database.

        It's usually a good idea to call refreshTable(force = True) after
        loading the table.
        """
        # Each record holds (id, host, port); int() normalizes the stored port.
        for rec in self.store.getRoutingTable():
            n = self.Node(rec[0], rec[1], int(rec[2]))
            self.table.insertNode(n, contacted=False)

    def addContact(self, host, port, callback=None, errback=None):
        """Ping this node and add the contact info to the table on pong.

        @type host: C{string}
        @param host: the IP address of the node to contact
        @type port: C{int}
        @param port: the port of the node to contact
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the contact info returned by the node
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to calling the callback with None)
        """
        # The remote ID is unknown until it answers, so use a placeholder.
        n = self.Node(NULL_ID, host, port)
        self.sendJoin(n, callback=callback, errback=errback)

    def findNode(self, id, callback, errback=None):
        """Find the contact info for the K closest nodes in the global table.

        @type id: C{string}
        @param id: the target ID to find the K closest nodes of
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of K closest nodes
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to doing nothing when an error occurs)
        """
        # Get K nodes out of local table/cache
        nodes = self.table.findNodes(id)
        d = Deferred()
        if errback:
            d.addCallbacks(callback, errback)
        else:
            d.addCallback(callback)

        # If the target ID was found locally there is nothing to search for
        if len(nodes) == 1 and nodes[0].id == id:
            d.callback(nodes)
        else:
            # Start the finding nodes action
            state = FindNode(self, id, d.callback, self.config, self.stats)
            reactor.callLater(0, state.goWithNodes, nodes)

    def insertNode(self, node, contacted=True):
        """Try to insert a node in our local table, pinging oldest contact if necessary.

        If all you have is a host/port, then use L{addContact}, which calls this
        method after receiving the PONG from the remote node. The reason for
        the separation is we can't insert a node into the table without its
        node ID. That means of course the node passed into this method needs
        to be a properly formed Node object with a valid ID.

        When the table hands back an existing node (the bucket is full) that
        was last seen more than MIN_PING_INTERVAL seconds ago, that node is
        pinged: on failure it is replaced by the new node, on success it is
        marked as just seen.

        @type node: L{node.Node}
        @param node: the new node to try and insert
        @type contacted: C{boolean}
        @param contacted: whether the new node is known to be good, i.e.
            responded to a request (optional, defaults to True)
        """
        old = self.table.insertNode(node, contacted=contacted)
        if (old and old.id != self.node.id and
                (datetime.now() - old.lastSeen) >
                timedelta(seconds=self.config['MIN_PING_INTERVAL'])):

            def _staleNodeHandler(err):
                """The pinged node never responded, so replace it."""
                log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], old.host, old.port))
                self.table.replaceStaleNode(old, node)

            def _notStaleNodeHandler(response):
                """Got a pong from the old node, so update it."""
                # KRPC responses carry the remote's data under 'rsp'
                # (the same shape sendJoin's handler reads).
                if response['rsp']['id'] == old.id:
                    self.table.justSeenNode(old.id)

            # Bucket is full, check to see if old node is still available
            self.stats.startedAction('ping')
            df = old.ping(self.node.id)
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)

    def sendJoin(self, node, callback=None, errback=None):
        """Join the DHT by pinging a bootstrap node.

        @type node: L{node.Node}
        @param node: the node to send the join to
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the contact info returned by the node
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call with the failure if an error occurs
            (optional, defaults to calling the callback with None)
        """
        def _pongHandler(response):
            """Node responded properly, add it and callback with response."""
            rsp = response['rsp']
            sender = response['_krpc_sender']
            # The node's real ID is now known, so it can go into the table.
            n = self.Node(rsp['id'], sender[0], sender[1])
            self.insertNode(n)
            if callback:
                callback((rsp['ip_addr'], rsp['port']))

        def _defaultPong(err):
            """Error occurred, fail node and errback or callback with error."""
            log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
            self.table.nodeFailed(node)
            if errback:
                errback(err)
            elif callback:
                callback(None)

        self.stats.startedAction('join')
        df = node.join(self.node.id)
        df.addCallbacks(_pongHandler, _defaultPong)

    def findCloseNodes(self, callback=lambda a: None, errback=None):
        """Perform a findNode on the ID one away from our own.

        This will allow us to populate our table with nodes on our network
        closest to our own.

        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of K closest nodes
            (optional, defaults to doing nothing with the results)
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to doing nothing when an error occurs)
        """
        # Increment the last byte of our ID, wrapping at 256.
        last_byte = ord(self.node.id[-1])
        target = self.node.id[:-1] + chr((last_byte + 1) % 256)
        self.findNode(target, callback, errback)

    def refreshTable(self, force=False):
        """Check all the buckets for those that need refreshing.

        A bucket is refreshed (by searching for a random ID in its range)
        when it has not been accessed for BUCKET_STALENESS seconds.

        @type force: C{boolean}
        @param force: refresh all buckets regardless of last bucket access time
            (optional, defaults to False)
        """
        def callback(nodes):
            """The refresh searches' results are not needed."""
            pass

        for bucket in self.table.buckets:
            if force or (datetime.now() - bucket.lastAccessed >
                         timedelta(seconds=self.config['BUCKET_STALENESS'])):
                # Choose a random ID in the bucket and try and find it
                target = newIDInRange(bucket.min, bucket.max)
                self.findNode(target, callback)

    # NOTE(review): the def line of this method was missing from the reviewed
    # copy; restored as shutdown -- confirm against callers.
    def shutdown(self):
        """Closes the port and cancels pending later calls."""
        self.listenport.stopListening()
        # IDelayedCall.cancel() raises if the call already ran or was
        # cancelled (e.g. on a second shutdown), so check first.
        if self.next_checkpoint.active():
            self.next_checkpoint.cancel()

    # NOTE(review): the def line of this method was missing from the reviewed
    # copy; restored as getStats -- confirm against callers.
    def getStats(self):
        """Gather the statistics for the DHT.

        @return: the output of the statistics gatherer's formatHTML()
        """
        return self.stats.formatHTML()

    #{ Remote interface
    def krpc_ping(self, id, _krpc_sender):
        """Respond to a ping with our node ID, adding the sender to the table.

        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted=False)

        return {"id": self.node.id}

    def krpc_join(self, id, _krpc_sender):
        """Add the node by responding with its address and port.

        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted=False)

        return {"ip_addr": _krpc_sender[0], "port": _krpc_sender[1], "id": self.node.id}

    def krpc_find_node(self, target, id, _krpc_sender):
        """Find the K closest nodes to the target in the local routing table.

        Also returns a token (a hash of the current token secret and the
        sender's IP address) that the sender can later present to store values.

        @type target: C{string}
        @param target: the target ID to find nodes for
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted=False)

        # A real list (not a lazy map) so the response can be serialized.
        nodes = [node.contactInfo() for node in self.table.findNodes(target)]
        token = sha(self.token_secrets[0] + _krpc_sender[0]).digest()
        return {"nodes": nodes, "token": token, "id": self.node.id}
class KhashmirRead(KhashmirBase):
    """The read-only Khashmir class, which can only retrieve (not store) key/value mappings."""

    # Node class instantiated for this class of DHT.
    _Node = KNodeRead

    #{ Local interface
    def findValue(self, key, callback, errback=None):
        """Get the nodes that have values for the key from the global table.

        @type key: C{string}
        @param key: the target key to find the values for
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 1
            parameter, the list of nodes with values
        @type errback: C{method}
        @param errback: the method to call if an error occurs
            (optional, defaults to doing nothing when an error occurs)
        """
        # Get K nodes out of local table/cache
        nodes = self.table.findNodes(key)
        d = Deferred()
        if errback:
            d.addCallbacks(callback, errback)
        else:
            d.addCallback(callback)

        # Search for others starting with the locally found ones
        state = FindValue(self, key, d.callback, self.config, self.stats)
        reactor.callLater(0, state.goWithNodes, nodes)

    def valueForKey(self, key, callback, searchlocal=True):
        """Get the values found for key in global table.

        Callback will be called with a list of values for each peer that
        returns unique values. The final callback will be an empty list.

        @type key: C{string}
        @param key: the target key to get the values for
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 2
            parameters: the key, and the values found
        @type searchlocal: C{boolean}
        @param searchlocal: whether to also look for any local values
        """
        # Get any local values
        if searchlocal:
            local_values = self.store.retrieveValues(key)
            # An empty list marks the end of the results, so only report
            # local values when there are some.
            if local_values:
                reactor.callLater(0, callback, key, local_values)
        else:
            local_values = []

        def _getValueForKey(nodes):
            """Use the found nodes to send requests for values to."""
            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'],
                             callback, self.config, self.stats)
            reactor.callLater(0, state.goWithNodes, nodes)

        # First lookup nodes that have values for the key
        self.findValue(key, _getValueForKey)

    #{ Remote interface
    def krpc_find_value(self, key, id, _krpc_sender):
        """Find the number of values stored locally for the key, and the K closest nodes.

        @type key: C{string}
        @param key: the target key to find the values and nodes for
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted=False)

        # A real list (not a lazy map) so the response can be serialized.
        nodes = [node.contactInfo() for node in self.table.findNodes(key)]
        num_values = self.store.countValues(key)
        return {'nodes': nodes, 'num': num_values, "id": self.node.id}

    def krpc_get_value(self, key, num, id, _krpc_sender):
        """Retrieve the values stored locally for the key.

        @type key: C{string}
        @param key: the target key to retrieve the values for
        @type num: C{int}
        @param num: the maximum number of values to retrieve, or 0 to
            retrieve all of them (negative values also retrieve all, rather
            than silently dropping values off the end of the list)
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted=False)

        values = self.store.retrieveValues(key)
        if num <= 0 or num >= len(values):
            return {'values': values, "id": self.node.id}
        # Return a random subset of num values rather than always the first ones.
        shuffle(values)
        return {'values': values[:num], "id": self.node.id}
class KhashmirWrite(KhashmirRead):
    """The read-write Khashmir class, which can store and retrieve key/value mappings."""

    # Node class instantiated for this class of DHT.
    _Node = KNodeWrite

    #{ Local interface
    def storeValueForKey(self, key, value, callback=None):
        """Stores the value for the key in the global table.

        No status in this implementation, peers respond but don't indicate
        status of storing values.

        @type key: C{string}
        @param key: the target key to store the value for
        @type value: C{string}
        @param value: the value to store with the key
        @type callback: C{method}
        @param callback: the method to call with the results, it must take 3
            parameters: the key, the value stored, and the result of the store
            (optional, defaults to doing nothing with the results)
        """
        def _storeValueForKey(nodes):
            """Use the returned K closest nodes to store the key at."""
            response = callback
            # Only fall back to the no-op when the caller gave no callback.
            if not response:
                def _storedValueHandler(key, value, sender):
                    """Default callback that does nothing."""
                    pass
                response = _storedValueHandler
            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'],
                                response, self.config, self.stats)
            reactor.callLater(0, action.goWithNodes, nodes)

        # First find the K closest nodes to operate on.
        self.findNode(key, _storeValueForKey)

    #{ Remote interface
    def krpc_store_value(self, key, value, token, id, _krpc_sender):
        """Store the value locally with the key.

        @type key: C{string}
        @param key: the target key to store the value for
        @type value: C{string}
        @param value: the value to store with the key
        @type token: C{string}
        @param token: the token to confirm that this peer contacted us previously
        @type id: C{string}
        @param id: the node ID of the sender node
        @type _krpc_sender: (C{string}, C{int})
        @param _krpc_sender: the sender node's IP address and port
        @raise krpc.KrpcError: if the token does not match any current secret
        """
        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
        self.insertNode(n, contacted=False)
        # Accept a token made from any of the retained secrets, so tokens
        # handed out before the last secret rotation remain valid.
        for secret in self.token_secrets:
            this_token = sha(secret + _krpc_sender[0]).digest()
            if token == this_token:
                self.store.storeValue(key, value)
                return {"id": self.node.id}
        raise krpc.KrpcError(krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one')
class Khashmir(KhashmirWrite):
    """The default Khashmir class (currently the read-write L{KhashmirWrite})."""

    # Node class instantiated for this class of DHT (the read-write node).
    _Node = KNodeWrite
# NOTE(review): this copy of SimpleTests is incomplete.  The setUp/tearDown
# def lines, one DHT_DEFAULTS entry, the lines that create self.a / self.b,
# and the statements that wait for asynchronous operations are all missing.
# Restore them from the original before running these tests.
522 class SimpleTests(unittest.TestCase):
# Configuration copied for each of the two DHT nodes used by the tests.
525 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
526 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
527 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
529 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
530 'KEY_EXPIRE': 3600, 'SPEW': False, }
# Setup body (def line missing): each node gets its own copy of the defaults.
# The code setting the ports and creating self.a / self.b is not visible here.
533 d = self.DHT_DEFAULTS.copy()
536 d = self.DHT_DEFAULTS.copy()
# Cleanup body (def line missing): deletes both nodes' database files.
# Assumes store.db holds the database file path -- TODO confirm.
543 os.unlink(self.a.store.db)
544 os.unlink(self.b.store.db)
# Both routing tables start with a single empty bucket.  After node a
# contacts 127.0.0.1:4045 (presumably node b), each table should hold
# exactly one node.  NOTE(review): the code that waits for the join to
# finish before the final assertions is missing.
546 def testAddContact(self):
547 self.failUnlessEqual(len(self.a.table.buckets), 1)
548 self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
550 self.failUnlessEqual(len(self.b.table.buckets), 1)
551 self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
553 self.a.addContact('127.0.0.1', 4045)
559 self.failUnlessEqual(len(self.a.table.buckets), 1)
560 self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
561 self.failUnlessEqual(len(self.b.table.buckets), 1)
562 self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
# Node a joins, stores 'foobar' under sha('foo'), then looks the key up
# again with _cb as the result callback.  NOTE(review): the waiting logic
# between these steps is missing.
564 def testStoreRetrieve(self):
565 self.a.addContact('127.0.0.1', 4045)
571 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
578 self.a.valueForKey(sha('foo').digest(), self._cb)
# Result callback for valueForKey, which delivers batches of values and
# ends with an empty list.  The visible checks expect self.got == 1 in one
# branch and look for 'foobar' in another.  NOTE(review): the if-branch
# header and the elif body are missing, so the exact logic can't be
# confirmed here.
587 def _cb(self, key, val):
589 self.failUnlessEqual(self.got, 1)
590 elif 'foobar' in val:
594 class MultiTest(unittest.TestCase):
598 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
599 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4,
600 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000,
602 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
603 'KEY_EXPIRE': 3600, 'SPEW': False, }
605 def _done(self, val):
610 self.startport = 4088
611 for i in range(self.num):
612 d = self.DHT_DEFAULTS.copy()
613 d['PORT'] = self.startport + i
614 self.l.append(Khashmir(d))
619 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
620 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
621 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
628 i.findCloseNodes(self._done)
633 i.findCloseNodes(self._done)
640 os.unlink(i.store.db)
644 def testStoreRetrieve(self):
651 def _scb(key, value, result):
653 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
661 self.failUnlessEqual(self.got, 1)
667 self.l[randrange(0, self.num)].valueForKey(K, _rcb)