1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
# Silence DeprecationWarnings for the rest of the process; this runs before
# the remaining imports so warnings raised while importing them are hidden too.
warnings.simplefilter(action="ignore", category=DeprecationWarning)
8 from random import randrange
12 from twisted.internet.defer import Deferred
13 from twisted.internet import protocol, reactor
14 from twisted.trial import unittest
17 from ktable import KTable
18 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
19 from khash import newID, newIDInRange
20 from actions import FindNode, GetValue, KeyExpirer, StoreValue
# this is the base class, has base functionality and find node, no key-value mappings
class KhashmirBase(protocol.Factory):
    """Base DHT node: persistent identity, routing table, ping and find_node.

    No key/value storage here.  Subclasses choose the node class used for
    contacts by overriding ``_Node``.
    """
    _Node = KNodeBase

    def __init__(self, config, cache_dir='/tmp'):
        self.config = None
        self.setup(config, cache_dir)

    def setup(self, config, cache_dir):
        """Open the store, load identity and routing table, and start listening.

        ``config['PORT']`` is the UDP port to listen on and also names the
        database file ``khashmir.<PORT>.db`` inside ``cache_dir``.  The whole
        ``config`` is handed on to KTable, KeyExpirer and the search actions.
        """
        self.config = config
        self.port = config['PORT']
        self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
        self.node = self._loadSelfNode('', self.port)
        self.table = KTable(self.node, config)
        self.udp = krpc.hostbroker(self)
        self.udp.protocol = krpc.KRPC
        self.listenport = reactor.listenUDP(self.port, self.udp)
        self.last = time()
        self._loadRoutingTable()
        self.expirer = KeyExpirer(self.store, config)
        self.refreshTable(force=1)
        # first checkpoint after a minute; checkpoint(auto) keeps rescheduling itself
        self.next_checkpoint = reactor.callLater(60, self.checkpoint, 1)

    def Node(self):
        """Return a new, uninitialised node object bound to our routing table."""
        n = self._Node()
        n.table = self.table
        return n

    def __del__(self):
        # setup() may have failed before the port was opened
        listenport = getattr(self, 'listenport', None)
        if listenport is not None:
            listenport.stopListening()

    def _loadSelfNode(self, host, port):
        """Return our own node, reusing the id saved in the store or generating a new one."""
        id = self.store.getSelfNode()
        if not id:
            id = newID()
        # self._Node() rather than self.Node(): setup() calls this before the table exists
        return self._Node().init(id, host, port)

    def checkpoint(self, auto=0):
        """Persist our id and routing table and refresh stale buckets.

        If ``auto`` is true, schedule the next checkpoint after a random delay
        between 90% and 110% of ``config['CHECKPOINT_INTERVAL']`` seconds.
        """
        self.store.saveSelfNode(self.node.id)
        self.store.dumpRoutingTable(self.table.buckets)
        self.refreshTable()
        if auto:
            interval = self.config['CHECKPOINT_INTERVAL']
            low = int(interval * .9)
            high = int(interval * 1.1)
            # randrange() raises on an empty range, which tiny intervals produce
            delay = randrange(low, high) if high > low else low
            self.next_checkpoint = reactor.callLater(delay, self.checkpoint, 1)

    def _loadRoutingTable(self):
        """Load routing table nodes from the database.

        It's usually a good idea to call refreshTable(force=1) after loading the table.
        """
        for rec in self.store.getRoutingTable():
            # rec is indexed as (id, host, port); the port is coerced with int()
            n = self.Node().initWithDict({'id': rec[0], 'host': rec[1], 'port': int(rec[2])})
            n.conn = self.udp.connectionForAddr((n.host, n.port))
            self.table.insertNode(n, contacted=0)

    def _nodeForSender(self, id, _krpc_sender):
        """Build a connected node for peer ``id`` seen at address ``_krpc_sender`` (host, port)."""
        n = self.Node().initWithDict({'id': id, 'host': _krpc_sender[0], 'port': _krpc_sender[1]})
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        return n

    ####### LOCAL INTERFACE - use these methods!
    def addContact(self, host, port, callback=None):
        """Ping host:port and add the contact info to the table on pong.

        ``callback`` is passed on to sendPing().
        """
        n = self.Node().init(NULL_ID, host, port)
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        self.sendPing(n, callback=callback)

    ## this call is async!
    def findNode(self, id, callback, errback=None):
        """Look up ``id`` in the global table; results are delivered to ``callback``.

        If the local table already holds exactly the node with this id, the
        callback gets that one-element list; otherwise a FindNode search is
        started and its result goes to the callback.  ``errback``, if given, is
        attached to the same Deferred.
        """
        # get K nodes out of local table/cache, or the node we want
        nodes = self.table.findNodes(id)
        d = Deferred()
        if errback:
            d.addCallbacks(callback, errback)
        else:
            d.addCallback(callback)
        if len(nodes) == 1 and nodes[0].id == id:
            d.callback(nodes)
        else:
            # create our search state
            state = FindNode(self, id, d.callback, self.config)
            reactor.callLater(0, state.goWithNodes, nodes)

    def insertNode(self, n, contacted=1):
        """Insert a node in our local table, pinging the oldest contact in its bucket if necessary.

        If all you have is a host/port, then use addContact, which calls this
        method after receiving the PONG from the remote node.  The reason for
        the separation is we can't insert a node into the table without its
        peer-ID.  That means of course the node passed into this method needs
        to be a properly formed Node object with a valid ID.
        """
        old = self.table.insertNode(n, contacted=contacted)
        if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id:
            # the bucket is full: ping the old node and replace it only if it fails to answer

            def _staleNodeHandler(err, oldnode=old, newnode=n):
                """Errback: the pinged node never responded, so swap in the new node."""
                # the Deferred passes the Failure first; it used to clobber `oldnode`
                self.table.replaceStaleNode(oldnode, newnode)

            def _notStaleNodeHandler(response, old=old):
                """Callback: got a pong; mark the old node as seen if it is the one we pinged."""
                rsp = response['rsp']
                if rsp['id'] == old.id:
                    self.table.justSeenNode(old.id)

            df = old.ping(self.node.id)
            df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)

    def sendPing(self, node, callback=None):
        """Ping ``node``.

        On a pong, the responder (id from the reply, address from
        ``_krpc_sender``) is inserted into the table; on failure the table is
        told the node failed.  ``callback``, if given, is called with no
        arguments in both cases.
        """
        df = node.ping(self.node.id)
        table = self.table

        def _pongHandler(response):
            rsp = response['rsp']
            table.insertNode(self._nodeForSender(rsp['id'], response['_krpc_sender']))
            if callback:
                callback()

        def _defaultPong(err):
            table.nodeFailed(node)
            if callback:
                callback()

        df.addCallbacks(_pongHandler, _defaultPong)

    def findCloseNodes(self, callback=lambda a: None):
        """Do a findNode on the ID one away from our own.

        This populates our table with the nodes on the network closest to us;
        it is meant to be called as soon as we start up with an empty table.
        """
        # NOTE(review): chr/ord arithmetic assumes the id is a byte string (Python 2 str)
        id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
        self.findNode(id, callback)

    def refreshTable(self, force=0):
        """Run a findNode for a random id inside every stale bucket.

        A bucket is stale when it was last accessed at least
        ``config['BUCKET_STALENESS']`` seconds ago; force=1 will refresh every
        bucket regardless of its last access time.
        """
        def callback(nodes):
            pass

        for bucket in self.table.buckets:
            if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
                id = newIDInRange(bucket.min, bucket.max)
                self.findNode(id, callback)

    def stats(self):
        """Return (num_contacts, num_nodes).

        num_contacts: number of contacts in our routing table
        num_nodes: estimated number of nodes in the entire DHT,
                   K * 2**(number_of_buckets - 1)
        """
        num_contacts = sum(len(bucket.l) for bucket in self.table.buckets)
        num_nodes = self.config['K'] * (2 ** (len(self.table.buckets) - 1))
        return (num_contacts, num_nodes)

    def shutdown(self):
        """Closes the port and cancels pending later calls."""
        self.listenport.stopListening()
        # the checkpoint may already have run or been cancelled; only cancel a live call
        if self.next_checkpoint.active():
            self.next_checkpoint.cancel()
        self.expirer.shutdown()

    #### Remote Interface - called by remote nodes
    def krpc_ping(self, id, _krpc_sender):
        """Remote ping: record the sender as a contact and answer with our id."""
        self.insertNode(self._nodeForSender(id, _krpc_sender), contacted=0)
        return {"id": self.node.id}

    def krpc_find_node(self, target, id, _krpc_sender):
        """Remote find_node: return our closest known contacts to ``target``.

        The closest nodes are collected before the requester is inserted into
        the table.  A real list is built so it can be serialised.
        """
        nodes = [node.senderDict() for node in self.table.findNodes(target)]
        self.insertNode(self._nodeForSender(id, _krpc_sender), contacted=0)
        return {"nodes": nodes, "id": self.node.id}
## This class provides read-only access to the DHT, valueForKey
## you probably want to use this mixin and provide your own write methods
class KhashmirRead(KhashmirBase):
    """Read-only DHT access: adds valueForKey() and the find_value RPC."""
    _Node = KNodeRead

    ## also async
    def valueForKey(self, key, callback, searchlocal=1):
        """Look up the values stored for ``key`` in the global table.

        ``callback(key, values)`` is called with a list of values for each peer
        that returns unique values; the final callback gets an empty list
        (probably should change to a 'more coming' arg).  When ``searchlocal``
        is true, values in our own store are reported first (only if there are
        any) and are passed to the search as already known.
        """
        nodes = self.table.findNodes(key)

        # get locals
        if searchlocal:
            l = self.store.retrieveValues(key)
            if l:
                reactor.callLater(0, callback, key, l)
        else:
            l = []

        # create our search state
        state = GetValue(self, key, callback, self.config)
        reactor.callLater(0, state.goWithNodes, nodes, l)

    #### Remote Interface - called by remote nodes
    def krpc_find_value(self, key, id, _krpc_sender):
        """Remote find_value: record the sender as a contact, then reply.

        Replies with our stored values for ``key`` when we have any, otherwise
        with our closest known contacts to ``key`` (built as a real list so it
        can be serialised).
        """
        sender = {'id': id, 'host': _krpc_sender[0], 'port': _krpc_sender[1]}
        n = self.Node().initWithDict(sender)
        n.conn = self.udp.connectionForAddr((n.host, n.port))
        self.insertNode(n, contacted=0)

        l = self.store.retrieveValues(key)
        if l:
            return {'values': l, "id": self.node.id}
        nodes = [node.senderDict() for node in self.table.findNodes(key)]
        return {'nodes': nodes, "id": self.node.id}
### provides a generic write method, you probably don't want to deploy something that allows
### arbitrary value storage
class KhashmirWrite(KhashmirRead):
    """Adds storeValueForKey() and the store_value RPC on top of KhashmirRead."""
    _Node = KNodeWrite

    def storeValueForKey(self, key, value, callback=None):
        """Store ``value`` under ``key`` in the global table; returns immediately.

        A key can have many values.  Peers respond, but in this implementation
        they do not report whether the value was actually stored, so
        ``callback`` only tells which nodes answered (no guarantee they didn't
        drop it on the floor).  Without a callback the responses are ignored.
        """
        def _ignoreStored(key, value, sender):
            pass

        def _storeToClosest(nodes):
            # runs once findNode() has produced the nodes closest to key
            response = callback or _ignoreStored
            action = StoreValue(self.table, key, value, response, self.config)
            reactor.callLater(0, action.goWithNodes, nodes)

        # this call is asynch
        self.findNode(key, _storeToClosest)

    #### Remote Interface - called by remote nodes
    def krpc_store_value(self, key, value, id, _krpc_sender):
        """Remote store_value: keep the value locally, then record the sender as a contact."""
        self.store.storeValue(key, value)
        contact = self.Node().initWithDict({'id': id,
                                            'host': _krpc_sender[0],
                                            'port': _krpc_sender[1]})
        contact.conn = self.udp.connectionForAddr((contact.host, contact.port))
        self.insertNode(contact, contacted=0)
        return {"id": self.node.id}
class Khashmir(KhashmirWrite):
    """The whole shebang (base + read + write); this is the class the tests instantiate."""
    _Node = KNodeWrite
class SimpleTests(unittest.TestCase):
    """Two local nodes: contact exchange and a store/retrieve round trip."""

    timeout = 10
    DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
                    'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
                    'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                    'MIN_PING_INTERVAL': 900, 'BUCKET_STALENESS': 3600,
                    'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
                    'KE_AGE': 3600, }

    def _spin(self, count):
        # turn the reactor a fixed number of times so queued UDP traffic is processed
        for _ in range(count):
            reactor.iterate()

    def _startNode(self, port):
        # a Khashmir node using the defaults above on the given port
        config = self.DHT_DEFAULTS.copy()
        config['PORT'] = port
        return Khashmir(config)

    def setUp(self):
        krpc.KRPC.noisy = 0
        self.a = self._startNode(4044)
        self.b = self._startNode(4045)

    def tearDown(self):
        peers = (self.a, self.b)
        for peer in peers:
            peer.shutdown()
        for peer in peers:
            os.unlink(peer.store.db)

    def testAddContact(self):
        for peer in (self.a, self.b):
            self.assertEqual(len(peer.table.buckets), 1)
            self.assertEqual(len(peer.table.buckets[0].l), 0)

        self.a.addContact('127.0.0.1', 4045)
        self._spin(4)

        for peer in (self.a, self.b):
            self.assertEqual(len(peer.table.buckets), 1)
            self.assertEqual(len(peer.table.buckets[0].l), 1)

    def testStoreRetrieve(self):
        self.a.addContact('127.0.0.1', 4045)
        self._spin(4)
        self.got = 0
        key = sha('foo').digest()
        self.a.storeValueForKey(key, 'foobar')
        self._spin(6)
        self.a.valueForKey(key, self._cb)
        self._spin(7)

    def _cb(self, key, val):
        # an empty list ends the search; by then 'foobar' must have been seen
        if not val:
            self.assertEqual(self.got, 1)
        elif 'foobar' in val:
            self.got = 1
371 class MultiTest(unittest.TestCase):
375 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
376 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
377 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
378 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
379 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
382 def _done(self, val):
387 self.startport = 4088
388 for i in range(self.num):
389 d = self.DHT_DEFAULTS.copy()
390 d['PORT'] = self.startport + i
391 self.l.append(Khashmir(d))
396 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
397 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
398 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
405 i.findCloseNodes(self._done)
410 i.findCloseNodes(self._done)
417 os.unlink(i.store.db)
421 def testStoreRetrieve(self):
428 def _scb(key, value, result):
430 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
438 self.assertEqual(self.got, 1)
444 self.l[randrange(0, self.num)].valueForKey(K, _rcb)