1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
4 from const import reactor
11 from ktable import KTable, K
12 from knode import KNode as Node
14 from hash import newID, newIDInRange
16 from actions import FindNode, GetValue, KeyExpirer, StoreValue
20 from twisted.internet.defer import Deferred
21 from twisted.internet import protocol
22 from twisted.python import threadable
23 from twisted.internet.app import Application
24 from twisted.web import server
28 import sqlite ## find this at http://pysqlite.sourceforge.net/
29 import pysqlite_exceptions
31 class KhashmirDBExcept(Exception):
34 # this is the main class!
35 class Khashmir(protocol.Factory):
36 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
38 def __init__(self, host, port, db='khashmir.db'):
39 self.setup(host, port, db)
41 def setup(self, host, port, db='khashmir.db'):
44 self.node = self._loadSelfNode(host, port)
45 self.table = KTable(self.node)
46 self.app = Application("krpc")
47 self.airhook = airhook.listenAirhookStream(port, self)
48 self.last = time.time()
49 self._loadRoutingTable()
50 KeyExpirer(store=self.store)
51 #self.refreshTable(force=1)
52 reactor.callLater(60, self.checkpoint, (1,))
54 def _loadSelfNode(self, host, port):
55 c = self.store.cursor()
56 c.execute('select id from self where num = 0;')
58 id = c.fetchone()[0].decode('hex')
61 return Node().init(id, host, port)
63 def _saveSelfNode(self):
64 self.store.autocommit = 0
65 c = self.store.cursor()
66 c.execute('delete from self where num = 0;')
67 c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
69 self.store.autocommit = 1
71 def checkpoint(self, auto=0):
73 self._dumpRoutingTable()
75 reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
77 def _findDB(self, db):
86 def _loadDB(self, db):
88 self.store = sqlite.connect(db=db)
89 self.store.autocommit = 1
92 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
94 def _createNewDB(self, db):
95 self.store = sqlite.connect(db=db)
96 self.store.autocommit = 1
98 create table kv (key text, value text, time timestamp, primary key (key, value));
99 create index kv_key on kv(key);
100 create index kv_timestamp on kv(time);
102 create table nodes (id text primary key, host text, port number);
104 create table self (num number primary key, id text);
106 c = self.store.cursor()
109 def _dumpRoutingTable(self):
111 save routing table nodes to the database
113 self.store.autocommit = 0;
114 c = self.store.cursor()
115 c.execute("delete from nodes where id not NULL;")
116 for bucket in self.table.buckets:
117 for node in bucket.l:
118 d = node.senderDict()
119 c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
121 self.store.autocommit = 1;
123 def _loadRoutingTable(self):
125 load routing table nodes from database
126 it's usually a good idea to call refreshTable(force=1) after loading the table
128 c = self.store.cursor()
129 c.execute("select * from nodes;")
130 for rec in c.fetchall():
131 n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
132 n.conn = self.airhook.connectionForAddr((n.host, n.port))
133 self.table.insertNode(n, contacted=0)
137 ####### LOCAL INTERFACE - use these methods!
138 def addContact(self, host, port, callback=None):
140 ping this node and add the contact info to the table on pong!
142 n =Node().init(const.NULL_ID, host, port)
143 n.conn = self.airhook.connectionForAddr((n.host, n.port))
144 self.sendPing(n, callback=callback)
146 ## this call is async!
147 def findNode(self, id, callback, errback=None):
148 """ returns the contact info for node, or the k closest nodes, from the global table """
149 # get K nodes out of local table/cache, or the node we want
150 nodes = self.table.findNodes(id)
153 d.addCallbacks(callback, errback)
155 d.addCallback(callback)
156 if len(nodes) == 1 and nodes[0].id == id :
159 # create our search state
160 state = FindNode(self, id, d.callback)
161 reactor.callFromThread(state.goWithNodes, nodes)
165 def valueForKey(self, key, callback, searchlocal = 1):
166 """ returns the values found for key in global table
167 callback will be called with a list of values for each peer that returns unique values
168 final callback will be an empty list - probably should change to 'more coming' arg
170 nodes = self.table.findNodes(key)
174 l = self.retrieveValues(key)
176 reactor.callLater(0, callback, (l))
180 # create our search state
181 state = GetValue(self, key, callback)
182 reactor.callFromThread(state.goWithNodes, nodes, l)
184 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
185 def storeValueForKey(self, key, value, callback=None):
186 """ stores the value for key in the global table, returns immediately, no status
187 in this implementation, peers respond but don't indicate status to storing values
188 a key can have many values
190 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
193 def _storedValueHandler(sender):
195 response=_storedValueHandler
196 action = StoreValue(self.table, key, value, response)
197 reactor.callFromThread(action.goWithNodes, nodes)
199 # this call is asynch
200 self.findNode(key, _storeValueForKey)
203 def insertNode(self, n, contacted=1):
205 insert a node in our local table, pinging oldest contact in bucket, if necessary
207 If all you have is a host/port, then use addContact, which calls this method after
208 receiving the PONG from the remote node. The reason for the seperation is we can't insert
209 a node into the table without it's peer-ID. That means of course the node passed into this
210 method needs to be a properly formed Node object with a valid ID.
212 old = self.table.insertNode(n, contacted=contacted)
213 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
214 # the bucket is full, check to see if old node is still around and if so, replace it
216 ## these are the callbacks used when we ping the oldest node in a bucket
217 def _staleNodeHandler(oldnode=old, newnode = n):
218 """ called if the pinged node never responds """
219 self.table.replaceStaleNode(old, newnode)
221 def _notStaleNodeHandler(dict, old=old):
222 """ called when we get a pong from the old node """
223 _krpc_sender = dict['_krpc_sender']
225 sender = dict['sender']
226 if sender['id'] == old.id:
227 self.table.justSeenNode(old.id)
229 df = old.ping(self.node.senderDict())
230 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
232 def sendPing(self, node, callback=None):
236 df = node.ping(self.node.senderDict())
237 ## these are the callbacks we use when we issue a PING
238 def _pongHandler(dict, node=node, table=self.table, callback=callback):
239 _krpc_sender = dict['_krpc_sender']
241 sender = dict['sender']
242 if node.id != const.NULL_ID and node.id != sender['id']:
243 # whoah, got response from different peer than we were expecting
244 self.table.invalidateNode(node)
246 sender['host'] = node.host
247 sender['port'] = node.port
248 n = Node().initWithDict(sender)
249 n.conn = self.airhook.connectionForAddr((n.host, n.port))
253 def _defaultPong(err, node=node, table=self.table, callback=callback):
254 table.nodeFailed(node)
258 df.addCallbacks(_pongHandler,_defaultPong)
260 def findCloseNodes(self, callback=lambda a: None):
262 This does a findNode on the ID one away from our own.
263 This will allow us to populate our table with nodes on our network closest to our own.
264 This is called as soon as we start up with an empty table
266 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
267 self.findNode(id, callback)
269 def refreshTable(self, force=0):
271 force=1 will refresh table regardless of last bucket access time
276 for bucket in self.table.buckets:
277 if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
278 id = newIDInRange(bucket.min, bucket.max)
279 self.findNode(id, callback)
282 def retrieveValues(self, key):
283 s = "select value from kv where key = '%s';" % key.encode('hex')
284 c = self.store.cursor()
289 l.append(t['value'].decode('base64'))
294 ##### INCOMING MESSAGE HANDLERS
296 def krpc_ping(self, sender, _krpc_sender):
298 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
301 sender['host'] = _krpc_sender[0]
302 sender['port'] = _krpc_sender[1]
303 n = Node().initWithDict(sender)
304 n.conn = self.airhook.connectionForAddr((n.host, n.port))
305 self.insertNode(n, contacted=0)
306 return {"sender" : self.node.senderDict()}
308 def krpc_find_node(self, target, sender, _krpc_sender):
309 nodes = self.table.findNodes(target)
310 nodes = map(lambda node: node.senderDict(), nodes)
311 sender['host'] = _krpc_sender[0]
312 sender['port'] = _krpc_sender[1]
313 n = Node().initWithDict(sender)
314 n.conn = self.airhook.connectionForAddr((n.host, n.port))
315 self.insertNode(n, contacted=0)
316 return {"nodes" : nodes, "sender" : self.node.senderDict()}
318 def krpc_store_value(self, key, value, sender, _krpc_sender):
319 t = "%0.6f" % time.time()
320 s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
321 c = self.store.cursor()
324 except pysqlite_exceptions.IntegrityError, reason:
325 # update last insert time
326 s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
328 sender['host'] = _krpc_sender[0]
329 sender['port'] = _krpc_sender[1]
330 n = Node().initWithDict(sender)
331 n.conn = self.airhook.connectionForAddr((n.host, n.port))
332 self.insertNode(n, contacted=0)
333 return {"sender" : self.node.senderDict()}
335 def krpc_find_value(self, key, sender, _krpc_sender):
336 sender['host'] = _krpc_sender[0]
337 sender['port'] = _krpc_sender[1]
338 n = Node().initWithDict(sender)
339 n.conn = self.airhook.connectionForAddr((n.host, n.port))
340 self.insertNode(n, contacted=0)
342 l = self.retrieveValues(key)
344 return {'values' : l, "sender": self.node.senderDict()}
346 nodes = self.table.findNodes(key)
347 nodes = map(lambda node: node.senderDict(), nodes)
348 return {'nodes' : nodes, "sender": self.node.senderDict()}
351 from random import randrange
352 import threading, thread, sys, time
354 from hash import newID
357 def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'):
360 for i in xrange(peers):
361 a = Khashmir(host, startport + i, db = dbprefix+`i`)
363 thread.start_new_thread(l[0].app.run, ())
365 peer.app.run(installSignalHandlers=0)
368 def test_build_net(quiet=0, peers=24, host='127.0.0.1', pause=0, startport=2001, dbprefix='/tmp/test'):
369 from whrandom import randrange
376 print "Building %s peer table." % peers
378 for i in xrange(peers):
379 a = Khashmir(host, port + i, db = dbprefix +`i`)
383 thread.start_new_thread(l[0].app.run, ())
386 peer.app.run(installSignalHandlers=0)
389 def spewer(frame, s, ignored):
390 from twisted.python import reflect
391 if frame.f_locals.has_key('self'):
392 se = frame.f_locals['self']
393 print 'method %s of %s at %s' % (
394 frame.f_code.co_name, reflect.qual(se.__class__), id(se)
396 #sys.settrace(spewer)
398 print "adding contacts...."
405 p = l[randrange(0, len(l))]
408 flag = threading.Event()
409 peer.addContact(host, n.port, makecb(flag))
411 p = l[randrange(0, len(l))]
414 flag = threading.Event()
415 peer.addContact(host, n.port, makecb(flag))
417 p = l[randrange(0, len(l))]
420 flag = threading.Event()
421 peer.addContact(host, n.port, makecb(flag))
424 print "finding close nodes...."
427 flag = threading.Event()
428 def cb(nodes, f=flag):
430 peer.findCloseNodes(cb)
433 # peer.refreshTable()
436 def test_find_nodes(l, quiet=0):
437 flag = threading.Event()
441 a = l[randrange(0,n)]
442 b = l[randrange(0,n)]
444 def callback(nodes, flag=flag, id = b.node.id):
445 if (len(nodes) >0) and (nodes[0].id == id):
446 print "test_find_nodes PASSED"
448 print "test_find_nodes FAILED"
450 a.findNode(b.node.id, callback)
453 def test_find_value(l, quiet=0):
454 ff = threading.Event()
455 fa = threading.Event()
456 fb = threading.Event()
457 fc = threading.Event()
460 a = l[randrange(0,n)]
461 b = l[randrange(0,n)]
462 c = l[randrange(0,n)]
463 d = l[randrange(0,n)]
467 if not quiet: print "inserting value..."
470 a.storeValueForKey(key, value, acb)
477 def __init__(self, flag, value=value, port=None):
482 def callback(self, values):
483 if(len(values) == 0):
485 print "find %s NOT FOUND" % self.port
487 print "find %s FOUND" % self.port
490 if self.val in values:
493 b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
495 c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
497 d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)
500 def test_one(host, port, db='/tmp/test'):
502 k = Khashmir(host, port, db)
503 thread.start_new_thread(reactor.run, ())
506 if __name__ == "__main__":
509 if len(sys.argv) > 1: n = int(sys.argv[1])
510 l = test_build_net(peers=n)
512 print "finding nodes..."
515 print "inserting and fetching values..."