1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
5 warnings.simplefilter("ignore", DeprecationWarning)
8 from random import randrange
11 import sqlite ## find this at http://pysqlite.sourceforge.net/
13 from twisted.internet.defer import Deferred
14 from twisted.internet import protocol, reactor
15 from twisted.trial import unittest
17 from ktable import KTable
18 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
19 from khash import newID, newIDInRange
20 from actions import FindNode, GetValue, KeyExpirer, StoreValue
23 class KhashmirDBExcept(Exception):
26 # this is the base class, has base functionality and find node, no key-value mappings
27 class KhashmirBase(protocol.Factory):
29 def __init__(self, config, cache_dir='/tmp'):
31 self.setup(config, cache_dir)
33 def setup(self, config, cache_dir):
35 self.port = config['PORT']
36 self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
37 self.node = self._loadSelfNode('', self.port)
38 self.table = KTable(self.node, config)
39 #self.app = service.Application("krpc")
40 self.udp = krpc.hostbroker(self)
41 self.udp.protocol = krpc.KRPC
42 self.listenport = reactor.listenUDP(self.port, self.udp)
44 self._loadRoutingTable()
45 self.expirer = KeyExpirer(self.store, config)
46 self.refreshTable(force=1)
47 self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
55 self.listenport.stopListening()
57 def _loadSelfNode(self, host, port):
58 c = self.store.cursor()
59 c.execute('select id from self where num = 0;')
64 return self._Node().init(id, host, port)
66 def _saveSelfNode(self):
67 c = self.store.cursor()
68 c.execute('delete from self where num = 0;')
69 c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
72 def checkpoint(self, auto=0):
74 self._dumpRoutingTable()
77 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
78 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
79 self.checkpoint, (1,))
81 def _findDB(self, db):
90 def _loadDB(self, db):
92 self.store = sqlite.connect(db=db)
93 #self.store.autocommit = 0
96 raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
98 def _createNewDB(self, db):
99 self.store = sqlite.connect(db=db)
101 create table kv (key binary, value binary, time timestamp, primary key (key, value));
102 create index kv_key on kv(key);
103 create index kv_timestamp on kv(time);
105 create table nodes (id binary primary key, host text, port number);
107 create table self (num number primary key, id binary);
109 c = self.store.cursor()
113 def _dumpRoutingTable(self):
115 save routing table nodes to the database
117 c = self.store.cursor()
118 c.execute("delete from nodes where id not NULL;")
119 for bucket in self.table.buckets:
120 for node in bucket.l:
121 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
124 def _loadRoutingTable(self):
126 load routing table nodes from database
127 it's usually a good idea to call refreshTable(force=1) after loading the table
129 c = self.store.cursor()
130 c.execute("select * from nodes;")
131 for rec in c.fetchall():
132 n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
133 n.conn = self.udp.connectionForAddr((n.host, n.port))
134 self.table.insertNode(n, contacted=0)
138 ####### LOCAL INTERFACE - use these methods!
139 def addContact(self, host, port, callback=None):
141 ping this node and add the contact info to the table on pong!
143 n =self.Node().init(NULL_ID, host, port)
144 n.conn = self.udp.connectionForAddr((n.host, n.port))
145 self.sendPing(n, callback=callback)
147 ## this call is async!
148 def findNode(self, id, callback, errback=None):
149 """ returns the contact info for node, or the k closest nodes, from the global table """
150 # get K nodes out of local table/cache, or the node we want
151 nodes = self.table.findNodes(id)
154 d.addCallbacks(callback, errback)
156 d.addCallback(callback)
157 if len(nodes) == 1 and nodes[0].id == id :
160 # create our search state
161 state = FindNode(self, id, d.callback, self.config)
162 reactor.callLater(0, state.goWithNodes, nodes)
164 def insertNode(self, n, contacted=1):
166 insert a node in our local table, pinging oldest contact in bucket, if necessary
168 If all you have is a host/port, then use addContact, which calls this method after
169 receiving the PONG from the remote node. The reason for the seperation is we can't insert
170 a node into the table without it's peer-ID. That means of course the node passed into this
171 method needs to be a properly formed Node object with a valid ID.
173 old = self.table.insertNode(n, contacted=contacted)
174 if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id:
175 # the bucket is full, check to see if old node is still around and if so, replace it
177 ## these are the callbacks used when we ping the oldest node in a bucket
178 def _staleNodeHandler(oldnode=old, newnode = n):
179 """ called if the pinged node never responds """
180 self.table.replaceStaleNode(old, newnode)
182 def _notStaleNodeHandler(dict, old=old):
183 """ called when we get a pong from the old node """
185 if dict['id'] == old.id:
186 self.table.justSeenNode(old.id)
188 df = old.ping(self.node.id)
189 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
191 def sendPing(self, node, callback=None):
195 df = node.ping(self.node.id)
196 ## these are the callbacks we use when we issue a PING
197 def _pongHandler(dict, node=node, table=self.table, callback=callback):
198 _krpc_sender = dict['_krpc_sender']
200 sender = {'id' : dict['id']}
201 sender['host'] = _krpc_sender[0]
202 sender['port'] = _krpc_sender[1]
203 n = self.Node().initWithDict(sender)
204 n.conn = self.udp.connectionForAddr((n.host, n.port))
208 def _defaultPong(err, node=node, table=self.table, callback=callback):
209 table.nodeFailed(node)
213 df.addCallbacks(_pongHandler,_defaultPong)
215 def findCloseNodes(self, callback=lambda a: None):
217 This does a findNode on the ID one away from our own.
218 This will allow us to populate our table with nodes on our network closest to our own.
219 This is called as soon as we start up with an empty table
221 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
222 self.findNode(id, callback)
224 def refreshTable(self, force=0):
226 force=1 will refresh table regardless of last bucket access time
231 for bucket in self.table.buckets:
232 if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
233 id = newIDInRange(bucket.min, bucket.max)
234 self.findNode(id, callback)
238 Returns (num_contacts, num_nodes)
239 num_contacts: number contacts in our routing table
240 num_nodes: number of nodes estimated in the entire dht
242 num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
243 num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
244 return (num_contacts, num_nodes)
246 def krpc_ping(self, id, _krpc_sender):
248 sender['host'] = _krpc_sender[0]
249 sender['port'] = _krpc_sender[1]
250 n = self.Node().initWithDict(sender)
251 n.conn = self.udp.connectionForAddr((n.host, n.port))
252 self.insertNode(n, contacted=0)
253 return {"id" : self.node.id}
255 def krpc_find_node(self, target, id, _krpc_sender):
256 nodes = self.table.findNodes(target)
257 nodes = map(lambda node: node.senderDict(), nodes)
259 sender['host'] = _krpc_sender[0]
260 sender['port'] = _krpc_sender[1]
261 n = self.Node().initWithDict(sender)
262 n.conn = self.udp.connectionForAddr((n.host, n.port))
263 self.insertNode(n, contacted=0)
264 return {"nodes" : nodes, "id" : self.node.id}
267 ## This class provides read-only access to the DHT, valueForKey
268 ## you probably want to use this mixin and provide your own write methods
269 class KhashmirRead(KhashmirBase):
271 def retrieveValues(self, key):
272 c = self.store.cursor()
273 c.execute("select value from kv where key = %s;", sqlite.encode(key))
281 def valueForKey(self, key, callback, searchlocal = 1):
282 """ returns the values found for key in global table
283 callback will be called with a list of values for each peer that returns unique values
284 final callback will be an empty list - probably should change to 'more coming' arg
286 nodes = self.table.findNodes(key)
290 l = self.retrieveValues(key)
292 reactor.callLater(0, callback, (l))
296 # create our search state
297 state = GetValue(self, key, callback, self.config)
298 reactor.callLater(0, state.goWithNodes, nodes, l)
300 def krpc_find_value(self, key, id, _krpc_sender):
302 sender['host'] = _krpc_sender[0]
303 sender['port'] = _krpc_sender[1]
304 n = self.Node().initWithDict(sender)
305 n.conn = self.udp.connectionForAddr((n.host, n.port))
306 self.insertNode(n, contacted=0)
308 l = self.retrieveValues(key)
310 return {'values' : l, "id": self.node.id}
312 nodes = self.table.findNodes(key)
313 nodes = map(lambda node: node.senderDict(), nodes)
314 return {'nodes' : nodes, "id": self.node.id}
316 ### provides a generic write method, you probably don't want to deploy something that allows
317 ### arbitrary value storage
318 class KhashmirWrite(KhashmirRead):
320 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
321 def storeValueForKey(self, key, value, callback=None):
322 """ stores the value for key in the global table, returns immediately, no status
323 in this implementation, peers respond but don't indicate status to storing values
324 a key can have many values
326 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
329 def _storedValueHandler(sender):
331 response=_storedValueHandler
332 action = StoreValue(self.table, key, value, response, self.config)
333 reactor.callLater(0, action.goWithNodes, nodes)
335 # this call is asynch
336 self.findNode(key, _storeValueForKey)
338 def krpc_store_value(self, key, value, id, _krpc_sender):
340 c = self.store.cursor()
342 c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
343 except sqlite.IntegrityError, reason:
344 # update last insert time
345 c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
348 sender['host'] = _krpc_sender[0]
349 sender['port'] = _krpc_sender[1]
350 n = self.Node().initWithDict(sender)
351 n.conn = self.udp.connectionForAddr((n.host, n.port))
352 self.insertNode(n, contacted=0)
353 return {"id" : self.node.id}
355 # the whole shebang, for testing
356 class Khashmir(KhashmirWrite):
359 class SimpleTests(unittest.TestCase):
360 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
361 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
362 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
363 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
364 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
369 d = self.DHT_DEFAULTS.copy()
372 d = self.DHT_DEFAULTS.copy()
377 self.a.listenport.stopListening()
378 self.b.listenport.stopListening()
380 self.a.next_checkpoint.cancel()
384 self.b.next_checkpoint.cancel()
388 self.a.expirer.next_expire.cancel()
392 self.b.expirer.next_expire.cancel()
400 def testAddContact(self):
401 self.assertEqual(len(self.a.table.buckets), 1)
402 self.assertEqual(len(self.a.table.buckets[0].l), 0)
404 self.assertEqual(len(self.b.table.buckets), 1)
405 self.assertEqual(len(self.b.table.buckets[0].l), 0)
407 self.a.addContact('127.0.0.1', 4045)
413 self.assertEqual(len(self.a.table.buckets), 1)
414 self.assertEqual(len(self.a.table.buckets[0].l), 1)
415 self.assertEqual(len(self.b.table.buckets), 1)
416 self.assertEqual(len(self.b.table.buckets[0].l), 1)
418 def testStoreRetrieve(self):
419 self.a.addContact('127.0.0.1', 4045)
425 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
432 self.a.valueForKey(sha('foo').digest(), self._cb)
443 self.assertEqual(self.got, 1)
444 elif 'foobar' in val:
448 class MultiTest(unittest.TestCase):
450 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
451 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
452 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
453 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
454 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
457 def _done(self, val):
462 self.startport = 4088
463 for i in range(self.num):
464 d = self.DHT_DEFAULTS.copy()
465 d['PORT'] = self.startport + i
466 self.l.append(Khashmir(d))
471 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
472 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
473 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
480 i.findCloseNodes(self._done)
485 i.findCloseNodes(self._done)
491 i.listenport.stopListening()
493 i.next_checkpoint.cancel()
497 i.expirer.next_expire.cancel()
505 def testStoreRetrieve(self):
514 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
522 self.assertEqual(self.got, 1)
528 self.l[randrange(0, self.num)].valueForKey(K, _rcb)