1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
10 from ktable import KTable, K
11 from knode import KNode as Node
13 from hash import newID, newIDInRange
15 from actions import FindNode, GetValue, KeyExpirer, StoreValue
19 from twisted.internet.defer import Deferred
20 from twisted.internet import protocol
21 from twisted.python import threadable
22 from twisted.internet.app import Application
23 from twisted.web import server
27 import sqlite ## find this at http://pysqlite.sourceforge.net/
28 import pysqlite_exceptions
30 class KhashmirDBExcept(Exception):
33 # this is the main class!
34 class Khashmir(protocol.Factory):
35 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol')
def __init__(self, host, port, db='khashmir.db'):
    """Construct the node by delegating to ``setup``.

    host -- forwarded to ``setup`` unchanged
    port -- forwarded to ``setup`` unchanged
    db   -- database name forwarded to ``setup`` (default 'khashmir.db')
    """
    # Initialisation is delegated to setup() with the same three arguments.
    self.setup(host, port, db)
40 def setup(self, host, port, db='khashmir.db'):
43 self.node = self._loadSelfNode(host, port)
44 self.table = KTable(self.node)
45 self.app = Application("krpc")
46 self.airhook = airhook.listenAirhookStream(port, self)
47 self.last = time.time()
48 self._loadRoutingTable()
49 KeyExpirer(store=self.store)
50 #self.refreshTable(force=1)
51 reactor.callLater(60, self.checkpoint, (1,))
53 def _loadSelfNode(self, host, port):
54 c = self.store.cursor()
55 c.execute('select id from self where num = 0;')
57 id = c.fetchone()[0].decode('hex')
60 return Node().init(id, host, port)
62 def _saveSelfNode(self):
63 self.store.autocommit = 0
64 c = self.store.cursor()
65 c.execute('delete from self where num = 0;')
66 c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
68 self.store.autocommit = 1
70 def checkpoint(self, auto=0):
72 self._dumpRoutingTable()
74 reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
76 def _findDB(self, db):
85 def _loadDB(self, db):
87 self.store = sqlite.connect(db=db)
88 self.store.autocommit = 1
91 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
93 def _createNewDB(self, db):
94 self.store = sqlite.connect(db=db)
95 self.store.autocommit = 1
97 create table kv (key text, value text, time timestamp, primary key (key, value));
98 create index kv_key on kv(key);
99 create index kv_timestamp on kv(time);
101 create table nodes (id text primary key, host text, port number);
103 create table self (num number primary key, id text);
105 c = self.store.cursor()
108 def _dumpRoutingTable(self):
110 save routing table nodes to the database
112 self.store.autocommit = 0;
113 c = self.store.cursor()
114 c.execute("delete from nodes where id not NULL;")
115 for bucket in self.table.buckets:
116 for node in bucket.l:
117 d = node.senderDict()
118 c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
120 self.store.autocommit = 1;
def _loadRoutingTable(self):
    """Load routing table nodes from the database.

    Every row of the ``nodes`` table becomes a Node: column 0 is the
    hex-encoded ID (decoded here), column 1 the host, and column 2 the
    port (converted to int).  Each node gets a connection obtained from
    ``self.airhook`` and is inserted into ``self.table`` with contacted=0.

    It's usually a good idea to call refreshTable(force=1) after loading
    the table.
    """
    cursor = self.store.cursor()
    cursor.execute("select * from nodes;")
    for row in cursor.fetchall():
        # Rows are read positionally, so this relies on the column order
        # of the nodes table: id, host, port.
        fields = {
            'id': row[0].decode('hex'),
            'host': row[1],
            'port': int(row[2]),
        }
        peer = Node().initWithDict(fields)
        address = (peer.host, peer.port)
        peer.conn = self.airhook.connectionForAddr(address)
        self.table.insertNode(peer, contacted=0)
136 ####### LOCAL INTERFACE - use these methods!
def addContact(self, host, port, callback=None):
    """Ping the peer at host/port so it can be added to the table on pong.

    Only the address is known at this point, so the Node is created with
    const.NULL_ID as its ID.  The node is given a connection from
    ``self.airhook`` and handed to ``sendPing``.

    host     -- address of the peer to contact
    port     -- port of the peer to contact
    callback -- forwarded to ``sendPing`` as its ``callback`` argument
    """
    contact = Node().init(const.NULL_ID, host, port)
    address = (contact.host, contact.port)
    contact.conn = self.airhook.connectionForAddr(address)
    self.sendPing(contact, callback=callback)
145 ## this call is async!
146 def findNode(self, id, callback, errback=None):
147 """ returns the contact info for node, or the k closest nodes, from the global table """
148 # get K nodes out of local table/cache, or the node we want
149 nodes = self.table.findNodes(id)
152 d.addCallbacks(callback, errback)
154 d.addCallback(callback)
155 if len(nodes) == 1 and nodes[0].id == id :
158 # create our search state
159 state = FindNode(self, id, d.callback)
160 reactor.callFromThread(state.goWithNodes, nodes)
164 def valueForKey(self, key, callback, searchlocal = 1):
165 """ returns the values found for key in global table
166 callback will be called with a list of values for each peer that returns unique values
167 final callback will be an empty list - probably should change to 'more coming' arg
169 nodes = self.table.findNodes(key)
173 l = self.retrieveValues(key)
175 reactor.callLater(0, callback, (l))
179 # create our search state
180 state = GetValue(self, key, callback)
181 reactor.callFromThread(state.goWithNodes, nodes, l)
183 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
184 def storeValueForKey(self, key, value, callback=None):
185 """ stores the value for key in the global table, returns immediately, no status
186 in this implementation, peers respond but don't indicate status to storing values
187 a key can have many values
189 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
192 def _storedValueHandler(sender):
194 response=_storedValueHandler
195 action = StoreValue(self.table, key, value, response)
196 reactor.callFromThread(action.goWithNodes, nodes)
198 # this call is asynch
199 self.findNode(key, _storeValueForKey)
202 def insertNode(self, n, contacted=1):
204 insert a node in our local table, pinging oldest contact in bucket, if necessary
206 If all you have is a host/port, then use addContact, which calls this method after
207 receiving the PONG from the remote node. The reason for the separation is we can't insert
208 a node into the table without its peer-ID. That means of course the node passed into this
209 method needs to be a properly formed Node object with a valid ID.
211 old = self.table.insertNode(n, contacted=contacted)
212 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
213 # the bucket is full, check to see if old node is still around and if so, replace it
215 ## these are the callbacks used when we ping the oldest node in a bucket
216 def _staleNodeHandler(oldnode=old, newnode = n):
217 """ called if the pinged node never responds """
218 self.table.replaceStaleNode(old, newnode)
220 def _notStaleNodeHandler(dict, old=old):
221 """ called when we get a pong from the old node """
222 _krpc_sender = dict['_krpc_sender']
224 sender = dict['sender']
225 if sender['id'] == old.id:
226 self.table.justSeenNode(old.id)
228 df = old.ping(self.node.senderDict())
229 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
231 def sendPing(self, node, callback=None):
235 df = node.ping(self.node.senderDict())
236 ## these are the callbacks we use when we issue a PING
237 def _pongHandler(dict, node=node, table=self.table, callback=callback):
238 _krpc_sender = dict['_krpc_sender']
240 sender = dict['sender']
241 if node.id != const.NULL_ID and node.id != sender['id']:
242 # whoah, got response from different peer than we were expecting
243 self.table.invalidateNode(node)
245 sender['host'] = node.host
246 sender['port'] = node.port
247 n = Node().initWithDict(sender)
248 n.conn = self.airhook.connectionForAddr((n.host, n.port))
252 def _defaultPong(err, node=node, table=self.table, callback=callback):
253 table.nodeFailed(node)
257 df.addCallbacks(_pongHandler,_defaultPong)
def findCloseNodes(self, callback=lambda a: None):
    """Run a findNode on the ID one away from our own.

    The target is our own ID with the ordinal of its last character
    increased by one (wrapping modulo 256).  Per the original note, this
    lets us populate our table with the nodes on the network closest to
    our own, and is called as soon as we start up with an empty table.

    callback -- passed straight to ``findNode``; the default ignores its
                argument
    """
    own_id = self.node.id
    # Bump only the final character; everything before it is kept as-is.
    bumped = (ord(own_id[-1]) + 1) % 256
    target = own_id[:-1] + chr(bumped)
    self.findNode(target, callback)
268 def refreshTable(self, force=0):
270 force=1 will refresh table regardless of last bucket access time
275 for bucket in self.table.buckets:
276 if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
277 id = newIDInRange(bucket.min, bucket.max)
278 self.findNode(id, callback)
281 def retrieveValues(self, key):
282 s = "select value from kv where key = '%s';" % key.encode('hex')
283 c = self.store.cursor()
288 l.append(t['value'].decode('base64'))
293 ##### INCOMING MESSAGE HANDLERS
def krpc_ping(self, sender, _krpc_sender):
    """Handle an incoming ping and reply with our own sender dict.

    sender       -- dict describing the pinging peer; its 'host' and
                    'port' entries are overwritten with the first two
                    items of ``_krpc_sender``
    _krpc_sender -- indexable whose items 0 and 1 are used as the peer's
                    host and port

    The peer is turned into a Node, given a connection from
    ``self.airhook`` and inserted into our table with contacted=0.

    Returns {"sender": <our sender dict>}.
    """
    # Use the address the message came from, not whatever the peer claimed.
    sender['host'] = _krpc_sender[0]
    sender['port'] = _krpc_sender[1]

    peer = Node().initWithDict(sender)
    address = (peer.host, peer.port)
    peer.conn = self.airhook.connectionForAddr(address)
    self.insertNode(peer, contacted=0)

    reply = {"sender": self.node.senderDict()}
    return reply
def krpc_find_node(self, target, sender, _krpc_sender):
    """Handle an incoming find_node request.

    target       -- ID passed to ``self.table.findNodes``
    sender       -- dict describing the requesting peer; its 'host' and
                    'port' entries are overwritten with the first two
                    items of ``_krpc_sender``
    _krpc_sender -- indexable whose items 0 and 1 are used as the peer's
                    host and port

    The table lookup runs before the requesting peer is inserted into the
    table (with contacted=0).

    Returns {"nodes": [sender dict of each node found],
             "sender": <our sender dict>}.
    """
    found = self.table.findNodes(target)
    found_dicts = [entry.senderDict() for entry in found]

    # Use the address the message came from, not whatever the peer claimed.
    sender['host'] = _krpc_sender[0]
    sender['port'] = _krpc_sender[1]

    peer = Node().initWithDict(sender)
    address = (peer.host, peer.port)
    peer.conn = self.airhook.connectionForAddr(address)
    self.insertNode(peer, contacted=0)

    return {"nodes": found_dicts, "sender": self.node.senderDict()}
317 def krpc_store_value(self, key, value, sender, _krpc_sender):
318 t = "%0.6f" % time.time()
319 s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
320 c = self.store.cursor()
323 except pysqlite_exceptions.IntegrityError, reason:
324 # update last insert time
325 s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
327 sender['host'] = _krpc_sender[0]
328 sender['port'] = _krpc_sender[1]
329 n = Node().initWithDict(sender)
330 n.conn = self.airhook.connectionForAddr((n.host, n.port))
331 self.insertNode(n, contacted=0)
332 return {"sender" : self.node.senderDict()}
334 def krpc_find_value(self, key, sender, _krpc_sender):
335 sender['host'] = _krpc_sender[0]
336 sender['port'] = _krpc_sender[1]
337 n = Node().initWithDict(sender)
338 n.conn = self.airhook.connectionForAddr((n.host, n.port))
339 self.insertNode(n, contacted=0)
341 l = self.retrieveValues(key)
343 return {'values' : l, "sender": self.node.senderDict()}
345 nodes = self.table.findNodes(key)
346 nodes = map(lambda node: node.senderDict(), nodes)
347 return {'nodes' : nodes, "sender": self.node.senderDict()}
350 from random import randrange
351 import threading, thread, sys, time
353 from hash import newID
356 def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'):
359 for i in xrange(peers):
360 a = Khashmir(host, startport + i, db = dbprefix+`i`)
362 thread.start_new_thread(l[0].app.run, ())
367 def test_build_net(quiet=0, peers=24, host='127.0.0.1', pause=0, startport=2001, dbprefix='/tmp/test'):
368 from whrandom import randrange
375 print "Building %s peer table." % peers
377 for i in xrange(peers):
378 a = Khashmir(host, port + i, db = dbprefix +`i`)
382 thread.start_new_thread(l[0].app.run, ())
388 def spewer(frame, s, ignored):
389 from twisted.python import reflect
390 if frame.f_locals.has_key('self'):
391 se = frame.f_locals['self']
392 print 'method %s of %s at %s' % (
393 frame.f_code.co_name, reflect.qual(se.__class__), id(se)
395 #sys.settrace(spewer)
397 print "adding contacts...."
404 p = l[randrange(0, len(l))]
407 flag = threading.Event()
408 peer.addContact(host, n.port, makecb(flag))
410 p = l[randrange(0, len(l))]
413 flag = threading.Event()
414 peer.addContact(host, n.port, makecb(flag))
416 p = l[randrange(0, len(l))]
419 flag = threading.Event()
420 peer.addContact(host, n.port, makecb(flag))
423 print "finding close nodes...."
426 flag = threading.Event()
427 def cb(nodes, f=flag):
429 peer.findCloseNodes(cb)
432 # peer.refreshTable()
435 def test_find_nodes(l, quiet=0):
436 flag = threading.Event()
440 a = l[randrange(0,n)]
441 b = l[randrange(0,n)]
443 def callback(nodes, flag=flag, id = b.node.id):
444 if (len(nodes) >0) and (nodes[0].id == id):
445 print "test_find_nodes PASSED"
447 print "test_find_nodes FAILED"
449 a.findNode(b.node.id, callback)
452 def test_find_value(l, quiet=0):
453 ff = threading.Event()
454 fa = threading.Event()
455 fb = threading.Event()
456 fc = threading.Event()
459 a = l[randrange(0,n)]
460 b = l[randrange(0,n)]
461 c = l[randrange(0,n)]
462 d = l[randrange(0,n)]
466 if not quiet: print "inserting value..."
469 a.storeValueForKey(key, value, acb)
476 def __init__(self, flag, value=value, port=None):
481 def callback(self, values):
482 if(len(values) == 0):
484 print "find %s NOT FOUND" % self.port
486 print "find %s FOUND" % self.port
489 if self.val in values:
492 b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0)
494 c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0)
496 d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0)
499 def test_one(host, port, db='/tmp/test'):
501 k = Khashmir(host, port, db)
502 thread.start_new_thread(reactor.run, ())
505 if __name__ == "__main__":
508 if len(sys.argv) > 1: n = int(sys.argv[1])
509 l = test_build_net(peers=n)
511 print "finding nodes..."
514 print "inserting and fetching values..."