1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
5 from random import randrange
8 import sqlite ## find this at http://pysqlite.sourceforge.net/
10 from twisted.internet.defer import Deferred
11 from twisted.internet import protocol, reactor
12 from twisted.trial import unittest
14 from ktable import KTable
15 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
16 from khash import newID, newIDInRange
17 from actions import FindNode, GetValue, KeyExpirer, StoreValue
20 class KhashmirDBExcept(Exception):
23 # this is the base class, has base functionality and find node, no key-value mappings
24 class KhashmirBase(protocol.Factory):
26 def __init__(self, config, cache_dir='/tmp'):
28 self.setup(config, cache_dir)
30 def setup(self, config, cache_dir):
32 self.port = config['PORT']
33 self._findDB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db'))
34 self.node = self._loadSelfNode('', self.port)
35 self.table = KTable(self.node, config)
36 #self.app = service.Application("krpc")
37 self.udp = krpc.hostbroker(self)
38 self.udp.protocol = krpc.KRPC
39 self.listenport = reactor.listenUDP(self.port, self.udp)
41 self._loadRoutingTable()
42 self.expirer = KeyExpirer(self.store, config)
43 self.refreshTable(force=1)
44 self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,))
52 self.listenport.stopListening()
54 def _loadSelfNode(self, host, port):
55 c = self.store.cursor()
56 c.execute('select id from self where num = 0;')
61 return self._Node().init(id, host, port)
63 def _saveSelfNode(self):
64 c = self.store.cursor()
65 c.execute('delete from self where num = 0;')
66 c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id))
69 def checkpoint(self, auto=0):
71 self._dumpRoutingTable()
74 self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9),
75 int(self.config['CHECKPOINT_INTERVAL'] * 1.1)),
76 self.checkpoint, (1,))
78 def _findDB(self, db):
87 def _loadDB(self, db):
89 self.store = sqlite.connect(db=db)
90 #self.store.autocommit = 0
93 raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc()
95 def _createNewDB(self, db):
96 self.store = sqlite.connect(db=db)
98 create table kv (key binary, value binary, time timestamp, primary key (key, value));
99 create index kv_key on kv(key);
100 create index kv_timestamp on kv(time);
102 create table nodes (id binary primary key, host text, port number);
104 create table self (num number primary key, id binary);
106 c = self.store.cursor()
110 def _dumpRoutingTable(self):
112 save routing table nodes to the database
114 c = self.store.cursor()
115 c.execute("delete from nodes where id not NULL;")
116 for bucket in self.table.buckets:
117 for node in bucket.l:
118 c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port))
121 def _loadRoutingTable(self):
123 load routing table nodes from database
124 it's usually a good idea to call refreshTable(force=1) after loading the table
126 c = self.store.cursor()
127 c.execute("select * from nodes;")
128 for rec in c.fetchall():
129 n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
130 n.conn = self.udp.connectionForAddr((n.host, n.port))
131 self.table.insertNode(n, contacted=0)
135 ####### LOCAL INTERFACE - use these methods!
136 def addContact(self, host, port, callback=None):
138 ping this node and add the contact info to the table on pong!
140 n =self.Node().init(NULL_ID, host, port)
141 n.conn = self.udp.connectionForAddr((n.host, n.port))
142 self.sendPing(n, callback=callback)
144 ## this call is async!
145 def findNode(self, id, callback, errback=None):
146 """ returns the contact info for node, or the k closest nodes, from the global table """
147 # get K nodes out of local table/cache, or the node we want
148 nodes = self.table.findNodes(id)
151 d.addCallbacks(callback, errback)
153 d.addCallback(callback)
154 if len(nodes) == 1 and nodes[0].id == id :
157 # create our search state
158 state = FindNode(self, id, d.callback, self.config)
159 reactor.callLater(0, state.goWithNodes, nodes)
161 def insertNode(self, n, contacted=1):
163 insert a node in our local table, pinging oldest contact in bucket, if necessary
165 If all you have is a host/port, then use addContact, which calls this method after
166 receiving the PONG from the remote node. The reason for the separation is we can't insert
167 a node into the table without its peer-ID. That means of course the node passed into this
168 method needs to be a properly formed Node object with a valid ID.
170 old = self.table.insertNode(n, contacted=contacted)
171 if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id:
172 # the bucket is full, check to see if old node is still around and if not, replace it
174 ## these are the callbacks used when we ping the oldest node in a bucket
175 def _staleNodeHandler(oldnode=old, newnode = n):
176 """ called if the pinged node never responds """
177 self.table.replaceStaleNode(old, newnode)
179 def _notStaleNodeHandler(dict, old=old):
180 """ called when we get a pong from the old node """
182 if dict['id'] == old.id:
183 self.table.justSeenNode(old.id)
185 df = old.ping(self.node.id)
186 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
188 def sendPing(self, node, callback=None):
192 df = node.ping(self.node.id)
193 ## these are the callbacks we use when we issue a PING
194 def _pongHandler(dict, node=node, table=self.table, callback=callback):
195 _krpc_sender = dict['_krpc_sender']
197 sender = {'id' : dict['id']}
198 sender['host'] = _krpc_sender[0]
199 sender['port'] = _krpc_sender[1]
200 n = self.Node().initWithDict(sender)
201 n.conn = self.udp.connectionForAddr((n.host, n.port))
205 def _defaultPong(err, node=node, table=self.table, callback=callback):
206 table.nodeFailed(node)
210 df.addCallbacks(_pongHandler,_defaultPong)
212 def findCloseNodes(self, callback=lambda a: None):
214 This does a findNode on the ID one away from our own.
215 This will allow us to populate our table with nodes on our network closest to our own.
216 This is called as soon as we start up with an empty table
218 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
219 self.findNode(id, callback)
221 def refreshTable(self, force=0):
223 force=1 will refresh table regardless of last bucket access time
228 for bucket in self.table.buckets:
229 if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']):
230 id = newIDInRange(bucket.min, bucket.max)
231 self.findNode(id, callback)
235 Returns (num_contacts, num_nodes)
236 num_contacts: number contacts in our routing table
237 num_nodes: number of nodes estimated in the entire dht
239 num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
240 num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
241 return (num_contacts, num_nodes)
243 def krpc_ping(self, id, _krpc_sender):
245 sender['host'] = _krpc_sender[0]
246 sender['port'] = _krpc_sender[1]
247 n = self.Node().initWithDict(sender)
248 n.conn = self.udp.connectionForAddr((n.host, n.port))
249 self.insertNode(n, contacted=0)
250 return {"id" : self.node.id}
252 def krpc_find_node(self, target, id, _krpc_sender):
253 nodes = self.table.findNodes(target)
254 nodes = map(lambda node: node.senderDict(), nodes)
256 sender['host'] = _krpc_sender[0]
257 sender['port'] = _krpc_sender[1]
258 n = self.Node().initWithDict(sender)
259 n.conn = self.udp.connectionForAddr((n.host, n.port))
260 self.insertNode(n, contacted=0)
261 return {"nodes" : nodes, "id" : self.node.id}
264 ## This class provides read-only access to the DHT, valueForKey
265 ## you probably want to use this mixin and provide your own write methods
266 class KhashmirRead(KhashmirBase):
268 def retrieveValues(self, key):
269 c = self.store.cursor()
270 c.execute("select value from kv where key = %s;", sqlite.encode(key))
278 def valueForKey(self, key, callback, searchlocal = 1):
279 """ returns the values found for key in global table
280 callback will be called with a list of values for each peer that returns unique values
281 final callback will be an empty list - probably should change to 'more coming' arg
283 nodes = self.table.findNodes(key)
287 l = self.retrieveValues(key)
289 reactor.callLater(0, callback, (l))
293 # create our search state
294 state = GetValue(self, key, callback, self.config)
295 reactor.callLater(0, state.goWithNodes, nodes, l)
297 def krpc_find_value(self, key, id, _krpc_sender):
299 sender['host'] = _krpc_sender[0]
300 sender['port'] = _krpc_sender[1]
301 n = self.Node().initWithDict(sender)
302 n.conn = self.udp.connectionForAddr((n.host, n.port))
303 self.insertNode(n, contacted=0)
305 l = self.retrieveValues(key)
307 return {'values' : l, "id": self.node.id}
309 nodes = self.table.findNodes(key)
310 nodes = map(lambda node: node.senderDict(), nodes)
311 return {'nodes' : nodes, "id": self.node.id}
313 ### provides a generic write method, you probably don't want to deploy something that allows
314 ### arbitrary value storage
315 class KhashmirWrite(KhashmirRead):
317 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
318 def storeValueForKey(self, key, value, callback=None):
319 """ stores the value for key in the global table, returns immediately, no status
320 in this implementation, peers respond but don't indicate whether the value was stored
321 a key can have many values
323 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
326 def _storedValueHandler(sender):
328 response=_storedValueHandler
329 action = StoreValue(self.table, key, value, response, self.config)
330 reactor.callLater(0, action.goWithNodes, nodes)
332 # this call is async
333 self.findNode(key, _storeValueForKey)
335 def krpc_store_value(self, key, value, id, _krpc_sender):
337 c = self.store.cursor()
339 c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t))
340 except sqlite.IntegrityError, reason:
341 # update last insert time
342 c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value)))
345 sender['host'] = _krpc_sender[0]
346 sender['port'] = _krpc_sender[1]
347 n = self.Node().initWithDict(sender)
348 n.conn = self.udp.connectionForAddr((n.host, n.port))
349 self.insertNode(n, contacted=0)
350 return {"id" : self.node.id}
352 # the whole shebang, for testing
353 class Khashmir(KhashmirWrite):
356 class SimpleTests(unittest.TestCase):
357 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
358 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
359 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
360 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
361 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
365 d = self.DHT_DEFAULTS.copy()
368 d = self.DHT_DEFAULTS.copy()
373 self.a.listenport.stopListening()
374 self.b.listenport.stopListening()
376 self.a.next_checkpoint.cancel()
380 self.b.next_checkpoint.cancel()
384 self.a.expirer.next_expire.cancel()
388 self.b.expirer.next_expire.cancel()
396 def testAddContact(self):
397 self.assertEqual(len(self.a.table.buckets), 1)
398 self.assertEqual(len(self.a.table.buckets[0].l), 0)
400 self.assertEqual(len(self.b.table.buckets), 1)
401 self.assertEqual(len(self.b.table.buckets[0].l), 0)
403 self.a.addContact('127.0.0.1', 4045)
409 self.assertEqual(len(self.a.table.buckets), 1)
410 self.assertEqual(len(self.a.table.buckets[0].l), 1)
411 self.assertEqual(len(self.b.table.buckets), 1)
412 self.assertEqual(len(self.b.table.buckets[0].l), 1)
414 def testStoreRetrieve(self):
415 self.a.addContact('127.0.0.1', 4045)
421 self.a.storeValueForKey(sha('foo').digest(), 'foobar')
428 self.a.valueForKey(sha('foo').digest(), self._cb)
439 self.assertEqual(self.got, 1)
440 elif 'foobar' in val:
444 class MultiTest(unittest.TestCase):
446 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160,
447 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4,
448 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
449 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
450 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
453 def _done(self, val):
458 self.startport = 4088
459 for i in range(self.num):
460 d = self.DHT_DEFAULTS.copy()
461 d['PORT'] = self.startport + i
462 self.l.append(Khashmir(d))
467 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
468 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
469 i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port)
476 i.findCloseNodes(self._done)
481 i.findCloseNodes(self._done)
487 i.listenport.stopListening()
489 i.next_checkpoint.cancel()
493 i.expirer.next_expire.cancel()
501 def testStoreRetrieve(self):
510 self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
518 self.assertEqual(self.got, 1)
524 self.l[randrange(0, self.num)].valueForKey(K, _rcb)