1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
5 warnings.simplefilter("ignore", DeprecationWarning)
7 from datetime import datetime, timedelta
8 from random import randrange
12 from twisted.internet.defer import Deferred
13 from twisted.internet import protocol, reactor
14 from twisted.trial import unittest
17 from ktable import KTable
18 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
19 from khash import newID, newIDInRange
20 from actions import FindNode, GetValue, KeyExpirer, StoreValue
23 # this is the base class, has base functionality and find node, no key-value mappings
24 class KhashmirBase(protocol.Factory):
26 def __init__(self, config, cache_dir='/tmp'):
28 self.setup(config, cache_dir)
30 def setup(self, config, cache_dir):
32 self.port = config['PORT']
33 self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
34 self.node = self._loadSelfNode('', self.port)
35 self.table = KTable(self.node, config)
36 #self.app = service.Application("krpc")
37 self.udp = krpc.hostbroker(self, config)
38 self.udp.protocol = krpc.KRPC
39 self.listenport = reactor.listenUDP(self.port, self.udp)
40 self._loadRoutingTable()
41 self.expirer = KeyExpirer(self.store, config)
42 self.refreshTable(force=1)
43 self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
45 def Node(self, id, host = None, port = None):
46 """Create a new node."""
47 n = self._Node(id, host, port)
49 n.conn = self.udp.connectionForAddr((n.host, n.port))
53 self.listenport.stopListening()
55 def _loadSelfNode(self, host, port):
56 id = self.store.getSelfNode()
59 return self._Node(id, host, port)
61 def checkpoint(self, auto=0):
62 self.store.saveSelfNode(self.node.id)
63 self.store.dumpRoutingTable(self.table.buckets)
66 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
67 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
68 self.checkpoint, (1,))
70 def _loadRoutingTable(self):
72 load routing table nodes from database
73 it's usually a good idea to call refreshTable(force=1) after loading the table
75 nodes = self.store.getRoutingTable()
77 n = self.Node(rec[0], rec[1], int(rec[2]))
78 self.table.insertNode(n, contacted=0)
82 ####### LOCAL INTERFACE - use these methods!
83 def addContact(self, host, port, callback=None):
85 ping this node and add the contact info to the table on pong!
87 n = self.Node(NULL_ID, host, port)
88 self.sendJoin(n, callback=callback)
90 ## this call is async!
91 def findNode(self, id, callback, errback=None):
92 """ returns the contact info for node, or the k closest nodes, from the global table """
93 # get K nodes out of local table/cache, or the node we want
94 nodes = self.table.findNodes(id)
97 d.addCallbacks(callback, errback)
99 d.addCallback(callback)
100 if len(nodes) == 1 and nodes[0].id == id :
103 # create our search state
104 state = FindNode(self, id, d.callback, self.config)
105 reactor.callLater(0, state.goWithNodes, nodes)
107 def insertNode(self, n, contacted=1):
109 insert a node in our local table, pinging oldest contact in bucket, if necessary
111 If all you have is a host/port, then use addContact, which calls this method after
112 receiving the PONG from the remote node. The reason for the seperation is we can't insert
113 a node into the table without it's peer-ID. That means of course the node passed into this
114 method needs to be a properly formed Node object with a valid ID.
116 old = self.table.insertNode(n, contacted=contacted)
117 if (old and old.id != self.node.id and
118 (datetime.now() - old.lastSeen) >
119 timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
120 # the bucket is full, check to see if old node is still around and if so, replace it
122 ## these are the callbacks used when we ping the oldest node in a bucket
123 def _staleNodeHandler(oldnode=old, newnode = n):
124 """ called if the pinged node never responds """
125 self.table.replaceStaleNode(old, newnode)
127 def _notStaleNodeHandler(dict, old=old):
128 """ called when we get a pong from the old node """
130 if dict['id'] == old.id:
131 self.table.justSeenNode(old.id)
133 df = old.ping(self.node.id)
134 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
136 def sendJoin(self, node, callback=None):
140 df = node.join(self.node.id)
141 ## these are the callbacks we use when we issue a PING
142 def _pongHandler(dict, node=node, self=self, callback=callback):
143 n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1])
146 callback((dict['rsp']['ip_addr'], dict['rsp']['port']))
147 def _defaultPong(err, node=node, table=self.table, callback=callback):
148 table.nodeFailed(node)
152 df.addCallbacks(_pongHandler,_defaultPong)
154 def findCloseNodes(self, callback=lambda a: None):
156 This does a findNode on the ID one away from our own.
157 This will allow us to populate our table with nodes on our network closest to our own.
158 This is called as soon as we start up with an empty table
160 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
161 self.findNode(id, callback)
163 def refreshTable(self, force=0):
165 force=1 will refresh table regardless of last bucket access time
170 for bucket in self.table.buckets:
171 if force or (datetime.now() - bucket.lastAccessed >
172 timedelta(seconds=self.config['BUCKET_STALENESS'])):
173 id = newIDInRange(bucket.min, bucket.max)
174 self.findNode(id, callback)
178 Returns (num_contacts, num_nodes)
179 num_contacts: number contacts in our routing table
180 num_nodes: number of nodes estimated in the entire dht
182 num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
183 num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
184 return (num_contacts, num_nodes)
187 """Closes the port and cancels pending later calls."""
188 self.listenport.stopListening()
190 self.next_checkpoint.cancel()
193 self.expirer.shutdown()
196 #### Remote Interface - called by remote nodes
def krpc_ping(self, id, _krpc_sender):
    """Answer a remote PING message.

    The sender (built from `id` and the host/port in `_krpc_sender`) is
    passed to insertNode with contacted=0, and the reply carries our own
    node ID under the "id" key.
    """
    sender_host = _krpc_sender[0]
    sender_port = _krpc_sender[1]
    contact = self.Node(id, sender_host, sender_port)
    self.insertNode(contact, contacted=0)
    reply = {"id": self.node.id}
    return reply
def krpc_join(self, id, _krpc_sender):
    """Answer a remote JOIN message.

    As with a ping, the sender is passed to insertNode with contacted=0.
    The reply echoes back the host and port the request arrived from
    (sendJoin's pong handler reads these as rsp['ip_addr'] / rsp['port']),
    plus our own node ID.
    """
    host, port = _krpc_sender[0], _krpc_sender[1]
    self.insertNode(self.Node(id, host, port), contacted=0)
    return dict(ip_addr=host, port=port, id=self.node.id)
def krpc_find_node(self, target, id, _krpc_sender):
    """Answer a remote FIND_NODE message.

    The requesting node is passed to insertNode with contacted=0. The reply
    holds the senderDict() of every node that self.table.findNodes(target)
    returns, under "nodes", plus our own node ID under "id".

    The node list is built as an explicit list: `map` returns a one-shot
    lazy iterator on Python 3, and a concrete list keeps the reply a plain
    list regardless of interpreter version.
    """
    requester = self.Node(id, _krpc_sender[0], _krpc_sender[1])
    self.insertNode(requester, contacted=0)
    closest = self.table.findNodes(target)
    return {"nodes": [node.senderDict() for node in closest], "id": self.node.id}
215 ## This class provides read-only access to the DHT, valueForKey
216 ## you probably want to use this mixin and provide your own write methods
217 class KhashmirRead(KhashmirBase):
221 def valueForKey(self, key, callback, searchlocal = 1):
222 """ returns the values found for key in global table
223 callback will be called with a list of values for each peer that returns unique values
224 final callback will be an empty list - probably should change to 'more coming' arg
226 nodes = self.table.findNodes(key)
230 l = self.store.retrieveValues(key)
232 reactor.callLater(0, callback, key, l)
236 # create our search state
237 state = GetValue(self, key, callback, self.config)
238 reactor.callLater(0, state.goWithNodes, nodes, l)
240 #### Remote Interface - called by remote nodes
241 def krpc_find_value(self, key, id, _krpc_sender):
242 n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
243 self.insertNode(n, contacted=0)
245 l = self.store.retrieveValues(key)
247 return {'values' : l, "id": self.node.id}
249 nodes = self.table.findNodes(key)
250 nodes = map(lambda node: node.senderDict(), nodes)
251 return {'nodes' : nodes, "id": self.node.id}
253 ### provides a generic write method, you probably don't want to deploy something that allows
254 ### arbitrary value storage
255 class KhashmirWrite(KhashmirRead):
257 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
258 def storeValueForKey(self, key, value, callback=None):
259 """ stores the value for key in the global table, returns immediately, no status
260 in this implementation, peers respond but don't indicate status to storing values
261 a key can have many values
263 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
266 def _storedValueHandler(key, value, sender):
268 response=_storedValueHandler
269 action = StoreValue(self.table, key, value, response, self.config)
270 reactor.callLater(0, action.goWithNodes, nodes)
272 # this call is asynch
273 self.findNode(key, _storeValueForKey)
275 #### Remote Interface - called by remote nodes
def krpc_store_value(self, key, value, id, _krpc_sender):
    """Answer a remote STORE_VALUE message.

    The sender is passed to insertNode with contacted=0. Then `key`/`value`
    are handed straight to self.store.storeValue; no validation of either
    happens in this method. The reply carries our own node ID.
    """
    sender_host, sender_port = _krpc_sender[0], _krpc_sender[1]
    self.insertNode(self.Node(id, sender_host, sender_port), contacted=0)
    store = self.store
    store.storeValue(key, value)
    my_id = self.node.id
    return {"id": my_id}
282 # the whole shebang, for testing
283 class Khashmir(KhashmirWrite):
286 class SimpleTests(unittest.TestCase):
289 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
290 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
291 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
292 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
293 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
294 'KE_AGE': 3600, 'SPEW': False, }
298 d = self.DHT_DEFAULTS.copy()
301 d = self.DHT_DEFAULTS.copy()
308 os.unlink(self.a.store.db)
309 os.unlink(self.b.store.db)
311 def testAddContact(self):
312 self.failUnlessEqual(len(self.a.table.buckets), 1)
313 self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
315 self.failUnlessEqual(len(self.b.table.buckets), 1)
316 self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
318 self.a.addContact('127.0.0.1', 4045)
324 self.failUnlessEqual(len(self.a.table.buckets), 1)
325 self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
326 self.failUnlessEqual(len(self.b.table.buckets), 1)
327 self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
329 def testStoreRetrieve(self):
330 self.a.addContact('127.0.0.1', 4045)
336 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
343 self.a.valueForKey(sha('foo').digest(), self._cb)
352 def _cb(self, key, val):
354 self.failUnlessEqual(self.got, 1)
355 elif 'foobar' in val:
359 class MultiTest(unittest.TestCase):
363 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
364 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
365 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
366 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
367 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
368 'KE_AGE': 3600, 'SPEW': False, }
370 def _done(self, val):
375 self.startport = 4088
376 for i in range(self.num):
377 d = self.DHT_DEFAULTS.copy()
378 d['PORT'] = self.startport + i
379 self.l.append(Khashmir(d))
384 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
385 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
386 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
393 i.findCloseNodes(self._done)
398 i.findCloseNodes(self._done)
405 os.unlink(i.store.db)
409 def testStoreRetrieve(self):
416 def _scb(key, value, result):
418 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
426 self.failUnlessEqual(self.got, 1)
432 self.l[randrange(0, self.num)].valueForKey(K, _rcb)