1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
5 warnings.simplefilter("ignore", DeprecationWarning)
7 from datetime import datetime, timedelta
8 from random import randrange
12 from twisted.internet.defer import Deferred
13 from twisted.internet import protocol, reactor
14 from twisted.trial import unittest
17 from ktable import KTable
18 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
19 from khash import newID, newIDInRange
20 from actions import FindNode, GetValue, KeyExpirer, StoreValue
23 # this is the base class, has base functionality and find node, no key-value mappings
24 class KhashmirBase(protocol.Factory):
26 def __init__(self, config, cache_dir='/tmp'):
28 self.setup(config, cache_dir)
30 def setup(self, config, cache_dir):
32 self.port = config['PORT']
33 self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
34 self.node = self._loadSelfNode('', self.port)
35 self.table = KTable(self.node, config)
36 #self.app = service.Application("krpc")
37 self.udp = krpc.hostbroker(self)
38 self.udp.protocol = krpc.KRPC
39 self.listenport = reactor.listenUDP(self.port, self.udp)
40 self._loadRoutingTable()
41 self.expirer = KeyExpirer(self.store, config)
42 self.refreshTable(force=1)
43 self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
51 self.listenport.stopListening()
53 def _loadSelfNode(self, host, port):
54 id = self.store.getSelfNode()
57 return self._Node().init(id, host, port)
59 def checkpoint(self, auto=0):
60 self.store.saveSelfNode(self.node.id)
61 self.store.dumpRoutingTable(self.table.buckets)
64 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
65 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
66 self.checkpoint, (1,))
68 def _loadRoutingTable(self):
70 load routing table nodes from database
71 it's usually a good idea to call refreshTable(force=1) after loading the table
73 nodes = self.store.getRoutingTable()
75 n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
76 n.conn = self.udp.connectionForAddr((n.host, n.port))
77 self.table.insertNode(n, contacted=0)
81 ####### LOCAL INTERFACE - use these methods!
def addContact(self, host, port, callback=None):
    """
    Ping the node at host/port so it can be added to the table on pong.

    A placeholder node is built with NULL_ID (the real ID is not known
    until the remote node answers), bound to a connection for its
    address, and handed to sendPing together with the optional callback.
    """
    contact = self.Node().init(NULL_ID, host, port)
    contact.conn = self.udp.connectionForAddr((contact.host, contact.port))
    self.sendPing(contact, callback=callback)
90 ## this call is async!
91 def findNode(self, id, callback, errback=None):
92 """ returns the contact info for node, or the k closest nodes, from the global table """
93 # get K nodes out of local table/cache, or the node we want
94 nodes = self.table.findNodes(id)
97 d.addCallbacks(callback, errback)
99 d.addCallback(callback)
100 if len(nodes) == 1 and nodes[0].id == id :
103 # create our search state
104 state = FindNode(self, id, d.callback, self.config)
105 reactor.callLater(0, state.goWithNodes, nodes)
107 def insertNode(self, n, contacted=1):
109 insert a node in our local table, pinging oldest contact in bucket, if necessary
111 If all you have is a host/port, then use addContact, which calls this method after
112 receiving the PONG from the remote node. The reason for the separation is we can't insert
113 a node into the table without its peer-ID. That means of course the node passed into this
114 method needs to be a properly formed Node object with a valid ID.
116 old = self.table.insertNode(n, contacted=contacted)
117 if (old and old.id != self.node.id and
118 (datetime.now() - old.lastSeen) >
119 timedelta(seconds=self.config['MIN_PING_INTERVAL'])):
120 # the bucket is full: ping the old node and replace it only if it never responds
122 ## these are the callbacks used when we ping the oldest node in a bucket
123 def _staleNodeHandler(oldnode=old, newnode = n):
124 """ called if the pinged node never responds """
125 self.table.replaceStaleNode(old, newnode)
127 def _notStaleNodeHandler(dict, old=old):
128 """ called when we get a pong from the old node """
130 if dict['id'] == old.id:
131 self.table.justSeenNode(old.id)
133 df = old.ping(self.node.id)
134 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
136 def sendPing(self, node, callback=None):
140 df = node.ping(self.node.id)
141 ## these are the callbacks we use when we issue a PING
142 def _pongHandler(dict, node=node, table=self.table, callback=callback):
143 _krpc_sender = dict['_krpc_sender']
145 sender = {'id' : dict['id']}
146 sender['host'] = _krpc_sender[0]
147 sender['port'] = _krpc_sender[1]
148 n = self.Node().initWithDict(sender)
149 n.conn = self.udp.connectionForAddr((n.host, n.port))
153 def _defaultPong(err, node=node, table=self.table, callback=callback):
154 table.nodeFailed(node)
158 df.addCallbacks(_pongHandler,_defaultPong)
def findCloseNodes(self, callback=lambda a: None):
    """
    Run a findNode on the ID one away from our own.

    The target is our own node ID with its last character incremented
    by one (wrapping modulo 256). Per the original notes, this is meant
    to populate the table with the nodes closest to us and is used at
    startup when the table is empty. callback is passed to findNode
    unchanged.
    """
    own_id = self.node.id
    bumped_last = chr((ord(own_id[-1]) + 1) % 256)
    self.findNode(own_id[:-1] + bumped_last, callback)
169 def refreshTable(self, force=0):
171 force=1 will refresh table regardless of last bucket access time
176 for bucket in self.table.buckets:
177 if force or (datetime.now() - bucket.lastAccessed >
178 timedelta(seconds=self.config['BUCKET_STALENESS'])):
179 id = newIDInRange(bucket.min, bucket.max)
180 self.findNode(id, callback)
184 Returns (num_contacts, num_nodes)
185 num_contacts: number contacts in our routing table
186 num_nodes: number of nodes estimated in the entire dht
188 num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
189 num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
190 return (num_contacts, num_nodes)
193 """Closes the port and cancels pending later calls."""
194 self.listenport.stopListening()
196 self.next_checkpoint.cancel()
199 self.expirer.shutdown()
202 def krpc_ping(self, id, _krpc_sender):
204 sender['host'] = _krpc_sender[0]
205 sender['port'] = _krpc_sender[1]
206 n = self.Node().initWithDict(sender)
207 n.conn = self.udp.connectionForAddr((n.host, n.port))
208 self.insertNode(n, contacted=0)
209 return {"id" : self.node.id}
211 def krpc_find_node(self, target, id, _krpc_sender):
212 nodes = self.table.findNodes(target)
213 nodes = map(lambda node: node.senderDict(), nodes)
215 sender['host'] = _krpc_sender[0]
216 sender['port'] = _krpc_sender[1]
217 n = self.Node().initWithDict(sender)
218 n.conn = self.udp.connectionForAddr((n.host, n.port))
219 self.insertNode(n, contacted=0)
220 return {"nodes" : nodes, "id" : self.node.id}
223 ## This class provides read-only access to the DHT, valueForKey
224 ## you probably want to use this mixin and provide your own write methods
225 class KhashmirRead(KhashmirBase):
229 def valueForKey(self, key, callback, searchlocal = 1):
230 """ returns the values found for key in global table
231 callback will be called with a list of values for each peer that returns unique values
232 final callback will be an empty list - probably should change to 'more coming' arg
234 nodes = self.table.findNodes(key)
238 l = self.store.retrieveValues(key)
240 reactor.callLater(0, callback, key, l)
244 # create our search state
245 state = GetValue(self, key, callback, self.config)
246 reactor.callLater(0, state.goWithNodes, nodes, l)
248 def krpc_find_value(self, key, id, _krpc_sender):
250 sender['host'] = _krpc_sender[0]
251 sender['port'] = _krpc_sender[1]
252 n = self.Node().initWithDict(sender)
253 n.conn = self.udp.connectionForAddr((n.host, n.port))
254 self.insertNode(n, contacted=0)
256 l = self.store.retrieveValues(key)
258 return {'values' : l, "id": self.node.id}
260 nodes = self.table.findNodes(key)
261 nodes = map(lambda node: node.senderDict(), nodes)
262 return {'nodes' : nodes, "id": self.node.id}
264 ### provides a generic write method, you probably don't want to deploy something that allows
265 ### arbitrary value storage
266 class KhashmirWrite(KhashmirRead):
268 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
269 def storeValueForKey(self, key, value, callback=None):
270 """ stores the value for key in the global table, returns immediately, no status
271 in this implementation, peers respond but don't indicate status to storing values
272 a key can have many values
274 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
277 def _storedValueHandler(key, value, sender):
279 response=_storedValueHandler
280 action = StoreValue(self.table, key, value, response, self.config)
281 reactor.callLater(0, action.goWithNodes, nodes)
283 # this call is asynchronous
284 self.findNode(key, _storeValueForKey)
286 def krpc_store_value(self, key, value, id, _krpc_sender):
287 self.store.storeValue(key, value)
289 sender['host'] = _krpc_sender[0]
290 sender['port'] = _krpc_sender[1]
291 n = self.Node().initWithDict(sender)
292 n.conn = self.udp.connectionForAddr((n.host, n.port))
293 self.insertNode(n, contacted=0)
294 return {"id" : self.node.id}
296 # the whole shebang, for testing
297 class Khashmir(KhashmirWrite):
300 class SimpleTests(unittest.TestCase):
303 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
304 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
305 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
306 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
307 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
312 d = self.DHT_DEFAULTS.copy()
315 d = self.DHT_DEFAULTS.copy()
322 os.unlink(self.a.store.db)
323 os.unlink(self.b.store.db)
325 def testAddContact(self):
326 self.assertEqual(len(self.a.table.buckets), 1)
327 self.assertEqual(len(self.a.table.buckets[0].l), 0)
329 self.assertEqual(len(self.b.table.buckets), 1)
330 self.assertEqual(len(self.b.table.buckets[0].l), 0)
332 self.a.addContact('127.0.0.1', 4045)
338 self.assertEqual(len(self.a.table.buckets), 1)
339 self.assertEqual(len(self.a.table.buckets[0].l), 1)
340 self.assertEqual(len(self.b.table.buckets), 1)
341 self.assertEqual(len(self.b.table.buckets[0].l), 1)
343 def testStoreRetrieve(self):
344 self.a.addContact('127.0.0.1', 4045)
350 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
357 self.a.valueForKey(sha('foo').digest(), self._cb)
366 def _cb(self, key, val):
368 self.assertEqual(self.got, 1)
369 elif 'foobar' in val:
373 class MultiTest(unittest.TestCase):
377 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
378 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
379 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
380 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
381 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
384 def _done(self, val):
389 self.startport = 4088
390 for i in range(self.num):
391 d = self.DHT_DEFAULTS.copy()
392 d['PORT'] = self.startport + i
393 self.l.append(Khashmir(d))
398 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
399 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
400 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
407 i.findCloseNodes(self._done)
412 i.findCloseNodes(self._done)
419 os.unlink(i.store.db)
423 def testStoreRetrieve(self):
430 def _scb(key, value, result):
432 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
440 self.assertEqual(self.got, 1)
446 self.l[randrange(0, self.num)].valueForKey(K, _rcb)