1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
10 from ktable import KTable, K
11 from knode import KNode as Node
13 from hash import newID, newIDInRange
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
23 import sqlite ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
26 KhashmirDBExcept = "KhashmirDBExcept"
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
def __init__(self, host, port, db='khashmir.db'):
    """
    Build a node for the given host/port; construction is delegated
    entirely to setup(), which receives the same three arguments.
    """
    self.setup(host, port, db)
def setup(self, host, port, db='khashmir.db'):
    """
    Bring the node up: open the store, load our identity and routing
    table, start listening for XML-RPC and schedule the first checkpoint.

    host, port -- address this node listens on (port is passed to listenTCP)
    db         -- path of the sqlite database file
    """
    # NOTE(review): the extract this was rebuilt from had a gap right here, `db`
    # was otherwise unused, and self.store is read below without being assigned.
    # _findDB(db) is the only helper taking `db` -- confirm it sets self.store.
    self._findDB(db)
    self.node = self._loadSelfNode(host, port)
    self.table = KTable(self.node)
    self._loadRoutingTable()
    self.app = Application("xmlrpc")
    self.app.listenTCP(port, server.Site(self))
    self.last = time.time()
    KeyExpirer(store=self.store)
    # a forced refreshTable(force=1) after loading is currently disabled
    # callLater forwards extra positional args as-is, so pass 1 itself; the
    # old `(1,)` handed checkpoint() a tuple as its `auto` argument.
    reactor.callLater(60, self.checkpoint, 1)
def _loadSelfNode(self, host, port):
    """
    Return the Node describing ourselves.

    The ID is read (base64-decoded) from row 0 of the `self` table; when no
    such row exists a new ID is generated with newID().

    host, port -- contact address handed to Node().init
    """
    c = self.store.cursor()
    c.execute('select id from self where num = 0;')
    row = c.fetchone()
    if row:
        node_id = row[0].decode('base64')
    else:
        # No saved identity: indexing the missing row would raise.
        # NOTE(review): assumes newID() yields an ID in the same raw form that
        # _saveSelfNode later base64-encodes -- confirm against hash.newID.
        node_id = newID()
    return Node().init(node_id, host, port)
def _saveSelfNode(self):
    """
    Persist our node ID (base64-encoded) as row 0 of the `self` table,
    replacing any row already stored there.

    The delete + insert run with autocommit switched off and are committed
    together; autocommit is switched back on even if a statement raises.
    """
    encoded = self.node.id.encode('base64')
    self.store.autocommit = 0
    try:
        c = self.store.cursor()
        c.execute('delete from self where num = 0;')
        c.execute("insert into self values (0, '%s');" % encoded)
        # make the pair of statements durable before autocommit is re-enabled
        self.store.commit()
    finally:
        self.store.autocommit = 1
# Dump the routing table to the database, then ask the reactor to call
# checkpoint again after const.CHECKPOINT_INTERVAL.
# NOTE(review): the embedded numbering jumps 63 -> 65 -> 67, so two lines are
# absent from this extract, and none of the visible lines reads `auto`. The
# re-scheduled call passes no argument, so it runs with auto=0 -- confirm against
# the full file whether periodic checkpointing is meant to continue.
63 def checkpoint(self, auto=0):
65 self._dumpRoutingTable()
67 reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
# NOTE(review): only the signature survives in this extract (embedded numbering
# jumps 69 -> 78); the body is not visible, so nothing is documented beyond the
# fact that it takes the database path `db`.
69 def _findDB(self, db):
# Open the sqlite database `db` as self.store and enable autocommit on it.
# The final line raises the module's string exception KhashmirDBExcept using the
# Python 2 three-argument raise form (type, value, traceback).
# NOTE(review): embedded lines 79 and 82-83 are missing -- presumably the
# try/except that guards the connect; `traceback` is not imported in the visible
# part of the file. Confirm before editing.
78 def _loadDB(self, db):
80 self.store = sqlite.connect(db=db)
81 self.store.autocommit = 1
84 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
# Create a fresh database at `db`: connect, enable autocommit, and (per the SQL
# text below) define three tables -- kv(key, value, time) with indexes on key and
# time, nodes(id, host, port), and self(num, id).
# NOTE(review): the SQL lines are evidently the body of a string literal whose
# delimiters (embedded lines 89, 93, 95, 97) and the statement that executes it
# (after 98) are missing from this extract.
86 def _createNewDB(self, db):
87 self.store = sqlite.connect(db=db)
88 self.store.autocommit = 1
90 create table kv (key text, value text, time timestamp, primary key (key, value));
91 create index kv_key on kv(key);
92 create index kv_timestamp on kv(time);
94 create table nodes (id text primary key, host text, port number);
96 create table self (num number primary key, id text);
98 c = self.store.cursor()
def _dumpRoutingTable(self):
    """
    Save routing table nodes to the database.

    Every row of `nodes` is deleted and one row (id, host, port) is written
    for each node in each bucket, taken from node.senderDict().  The whole
    rewrite runs with autocommit off and is committed as one unit;
    autocommit is switched back on even if a statement raises.
    """
    self.store.autocommit = 0
    try:
        c = self.store.cursor()
        c.execute("delete from nodes where id not NULL;")
        for bucket in self.table.buckets:
            for node in bucket.l:
                d = node.senderDict()
                # NOTE(review): values are spliced into the SQL text unescaped;
                # host originates from remote peers -- consider bound parameters.
                c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
        self.store.commit()
    finally:
        self.store.autocommit = 1
def _loadRoutingTable(self):
    """
    Load routing table nodes from the database.

    Each row of `nodes` becomes a Node that is inserted into the table as
    not-yet-contacted.  It's usually a good idea to call
    refreshTable(force=1) after loading the table.
    """
    cursor = self.store.cursor()
    cursor.execute("select * from nodes;")
    for row in cursor.fetchall():
        info = {'id':row[0], 'host':row[1], 'port':int(row[2])}
        self.table.insertNode(Node().initWithDict(info), contacted=0)
def render(self, request):
    """
    Override the built-in render so the handlers can reach the request object.

    The request is stashed on self.crequest before delegating to
    XMLRPC.render.  Note: crequest is probably only valid during the
    initial call, not after a deferred fires.
    """
    self.crequest = request
    return xmlrpc.XMLRPC.render(self, request)
136 ####### LOCAL INTERFACE - use these methods!
def addContact(self, host, port):
    """
    Ping this node and add the contact info to the table on pong!

    host, port -- address of the peer to contact
    """
    # Only the address is known at this point, so the node starts out with the
    # const.NULL_ID placeholder as its ID.
    n = Node().init(const.NULL_ID, host, port)
    # NOTE(review): the extract this was rebuilt from built `n` and then dropped
    # it; sendPing is the method that pings a node and special-cases NULL_ID,
    # so it is called here to match the docstring -- confirm.
    self.sendPing(n)
144 ## this call is async!
# Asynchronous lookup of `id`: starts from the nodes the local table returns,
# wires `callback` (and `errback`, when used) onto `d`, and otherwise launches a
# FindNode search whose result is delivered through d.callback.
# NOTE(review): embedded lines 149-150, 152 and 155-156 are missing -- the creation
# of `d`, the branch choosing addCallbacks vs addCallback, and the body taken when
# the exact node is already known are not visible. Confirm against the full file.
145 def findNode(self, id, callback, errback=None):
146 """ returns the contact info for node, or the k closest nodes, from the global table """
147 # get K nodes out of local table/cache, or the node we want
148 nodes = self.table.findNodes(id)
151 d.addCallbacks(callback, errback)
153 d.addCallback(callback)
154 if len(nodes) == 1 and nodes[0].id == id :
157 # create our search state
158 state = FindNode(self, id, d.callback)
159 reactor.callFromThread(state.goWithNodes, nodes)
# Asynchronous value lookup: reads locally stored values via retrieveValues(key),
# hands their base64-decoded form to `callback` through the reactor, and starts a
# GetValue search seeded with the nodes from the local table and the local values.
# NOTE(review): embedded lines 167, 169-170, 172 and 174 are missing (docstring
# terminator and at least one condition), so whether the local callback is
# conditional cannot be told from this extract.
163 def valueForKey(self, key, callback):
164 """ returns the values found for key in global table
165 callback will be called with a list of values for each peer that returns unique values
166 final callback will be an empty list - probably should change to 'more coming' arg
168 nodes = self.table.findNodes(key)
171 l = self.retrieveValues(key)
173 reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
175 # create our search state
176 state = GetValue(self, key, callback)
177 reactor.callFromThread(state.goWithNodes, nodes, l)
179 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
# Store `value` under `key` on remote peers: findNode(key, ...) locates nodes, and
# the inner _storeValueForKey sends node.storeValue to at most
# const.STORE_REDUNDANCY of them (skipping ourselves). A successful reply inserts
# the node into the table; a failure reports it via table.nodeFailed.
# NOTE(review): embedded lines 184, 186-187, 189, 191 and 195 are missing -- the
# docstring terminator, the handling of a None `callback`, and the remainder of
# `cb` are not visible in this extract.
180 def storeValueForKey(self, key, value, callback=None):
181 """ stores the value for key in the global table, returns immediately, no status
182 in this implementation, peers respond but don't indicate status to storing values
183 a key can have many values
185 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
188 def _storedValueHandler(sender):
190 response=_storedValueHandler
192 for node in nodes[:const.STORE_REDUNDANCY]:
193 def cb(t, table = table, node=node, resp=response):
194 self.table.insertNode(node)
196 if node.id != self.node.id:
197 def default(err, node=node, table=table):
198 table.nodeFailed(node)
199 df = node.storeValue(key, value, self.node.senderDict())
200 df.addCallbacks(cb, default)
201 # this call is asynch
202 self.findNode(key, _storeValueForKey)
def insertNode(self, n, contacted=1):
    """
    Insert a node in our local table, pinging the oldest contact in the
    bucket if necessary.

    If all you have is a host/port, then use addContact, which calls this
    method after receiving the PONG from the remote node.  The reason for
    the separation is that we can't insert a node into the table without
    its peer-ID, so the node passed in must be a properly formed Node
    object with a valid ID.
    """
    old = self.table.insertNode(n, contacted=contacted)

    # Nothing more to do unless the table handed back an old contact that
    # has been silent long enough and is not ourselves.
    if not old:
        return
    if not (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL:
        return
    if old.id == self.node.id:
        return

    # The bucket is full: ping the old node to see whether it is still around.
    def on_no_reply(_failure=None, replacement=n):
        """ called if the pinged node never responds """
        self.table.replaceStaleNode(old, replacement)

    def on_pong(reply, expected=old):
        """ called when we get a pong from the old node """
        _args, info = reply
        responder = Node().initWithDict(info)
        if responder.id == expected.id:
            self.table.justSeenNode(expected)

    df = old.ping(self.node.senderDict())
    df.addCallbacks(on_pong, on_no_reply)
# Ping `node` with our sender dict and attach two handlers to the returned
# deferred: _pongHandler invalidates the node when a peer with a different ID
# answers (unless the node still carries const.NULL_ID) and otherwise builds a Node
# from the reply using the host/port we pinged; _defaultPong reports the failure
# via table.nodeFailed.
# NOTE(review): embedded lines 234-236, 240, 244, 248-249 and 252 are missing --
# how `sender` is unpacked from `args`, the else branch, and what is done with `n`
# are not visible in this extract.
233 def sendPing(self, node):
237 df = node.ping(self.node.senderDict())
238 ## these are the callbacks we use when we issue a PING
239 def _pongHandler(args, node=node, table=self.table):
241 if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'):
242 # whoah, got response from different peer than we were expecting
243 self.table.invalidateNode(node)
245 sender['host'] = node.host
246 sender['port'] = node.port
247 n = Node().initWithDict(sender)
250 def _defaultPong(err, node=node, table=self.table):
251 table.nodeFailed(node)
253 df.addCallbacks(_pongHandler,_defaultPong)
def findCloseNodes(self, callback=lambda a: None):
    """
    Run a findNode on the ID one away from our own.

    This lets us populate our table with the nodes on the network closest
    to us.  It is called as soon as we start up with an empty table.
    """
    own = self.node.id
    # bump the last byte by one, wrapping 0xff back round to 0x00
    bumped = (ord(own[-1]) + 1) % 256
    self.findNode(own[:-1] + chr(bumped), callback)
# For every bucket that is stale (not accessed for const.BUCKET_STALENESS seconds)
# -- or for every bucket when force is set -- pick a new ID inside the bucket's
# range and run findNode on it.
# NOTE(review): embedded lines 265 and 267-270 are missing; the definition of the
# `callback` passed to findNode is not visible in this extract.
264 def refreshTable(self, force=0):
266 force=1 will refresh table regardless of last bucket access time
271 for bucket in self.table.buckets:
272 if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
273 id = newIDInRange(bucket.min, bucket.max)
274 self.findNode(id, callback)
# Build the query selecting every value stored in `kv` under the base64 form of
# `key`, and obtain a cursor on the store.
# NOTE(review): the embedded numbering jumps 279 -> 289, so executing the query and
# returning its rows are not visible in this extract.
277 def retrieveValues(self, key):
278 s = "select value from kv where key = '%s';" % key.encode('base64')
279 c = self.store.cursor()
289 ##### INCOMING MESSAGE HANDLERS
def xmlrpc_ping(self, sender):
    """
    Handle an incoming PING.

    takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
    returns ((), our own sender dict)
    """
    # The sender dict carries no host, so record the address the request
    # actually arrived from before building the Node.
    # NOTE(review): the extract this was rebuilt from fetched the client IP but
    # never used it; sendPing fills sender['host'] before initWithDict, so the
    # same is done here -- confirm.
    sender['host'] = self.crequest.getClientIP()
    n = Node().initWithDict(sender)
    self.insertNode(n, contacted=0)
    return (), self.node.senderDict()
def xmlrpc_find_node(self, target, sender):
    """
    Handle an incoming find_node request.

    target -- base64-encoded ID to look up in our table
    sender -- dict describing the requesting peer
    returns (sender dicts of the nodes our table returns, our own sender dict)
    """
    found = self.table.findNodes(target.decode('base64'))
    nodes = [node.senderDict() for node in found]
    # NOTE(review): the extract this was rebuilt from fetched the client IP but
    # never used it; sendPing fills sender['host'] before initWithDict, so the
    # same is done here with the address the request came from -- confirm.
    sender['host'] = self.crequest.getClientIP()
    n = Node().initWithDict(sender)
    self.insertNode(n, contacted=0)
    return nodes, self.node.senderDict()
# Handle an incoming store request: insert (key, value, current time) into `kv`;
# on an IntegrityError (the pair already exists under the table's primary key) only
# the row's time is updated. The sender is then added to the table as not-contacted
# and our own sender dict is returned.
# NOTE(review): embedded lines 315-316, 320 and 322 are missing -- the try: and the
# execute calls are not visible, nor is any use of `ip`.
# NOTE(review): key and value arrive from a remote peer and are spliced into the
# SQL text unescaped -- confirm whether they are validated elsewhere.
311 def xmlrpc_store_value(self, key, value, sender):
312 t = "%0.6f" % time.time()
313 s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
314 c = self.store.cursor()
317 except pysqlite_exceptions.IntegrityError, reason:
318 # update last insert time
319 s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
321 ip = self.crequest.getClientIP()
323 n = Node().initWithDict(sender)
324 self.insertNode(n, contacted=0)
325 return (), self.node.senderDict()
# Handle an incoming find_value request: decode the base64 key, add the sender to
# the table as not-contacted, then answer either with {'values': ...} from
# retrieveValues or with {'nodes': ...} holding sender dicts from the local table,
# each paired with our own sender dict.
# NOTE(review): embedded lines 330, 333, 335 and 337 are missing -- the condition
# choosing between the two returns, and any use of `ip`, are not visible.
327 def xmlrpc_find_value(self, key, sender):
328 ip = self.crequest.getClientIP()
329 key = key.decode('base64')
331 n = Node().initWithDict(sender)
332 self.insertNode(n, contacted=0)
334 l = self.retrieveValues(key)
336 return {'values' : l}, self.node.senderDict()
338 nodes = self.table.findNodes(key)
339 nodes = map(lambda node: node.senderDict(), nodes)
340 return {'nodes' : nodes}, self.node.senderDict()
343 from random import randrange
344 import threading, thread, sys, time
346 from hash import newID
# Test helper: creates a Khashmir on 'localhost' for each of `peers` consecutive
# ports starting at `startport`, each with its own database file (dbprefix + index),
# and runs l[0].app.run on a new thread.
# NOTE(review): embedded lines 350-351, 354 and 356-359 are missing -- how `l` is
# built and what is returned are not visible in this extract.
349 def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
352 for i in xrange(peers):
353 a = Khashmir('localhost', startport + i, db = dbprefix+`i`)
355 thread.start_new_thread(l[0].app.run, ())
# Test helper: builds `peers` Khashmir instances on `host`, starts l[0].app.run on
# a new thread, gives each peer three randomly chosen contacts via addContact, then
# has peers run findCloseNodes with a callback bound to a threading.Event.
# NOTE(review): many embedded lines are missing (362-367, 369, 372-374, 376-380,
# 382-383, 390-393, 395-396, 399, 401-402, 404-405) -- the definition of `port`,
# the loops over peers, the callback body, any waits and the return value are not
# visible in this extract.
360 def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001, dbprefix='/tmp/test'):
361 from whrandom import randrange
368 print "Building %s peer table." % peers
370 for i in xrange(peers):
371 a = Khashmir(host, port + i, db = dbprefix +`i`)
375 thread.start_new_thread(l[0].app.run, ())
381 print "adding contacts...."
384 n = l[randrange(0, len(l))].node
385 peer.addContact(host, n.port)
386 n = l[randrange(0, len(l))].node
387 peer.addContact(host, n.port)
388 n = l[randrange(0, len(l))].node
389 peer.addContact(host, n.port)
394 print "finding close nodes...."
397 flag = threading.Event()
398 def cb(nodes, f=flag):
400 peer.findCloseNodes(cb)
403 # peer.refreshTable()
# Test: picks two random peers a and b from `l`, asks a to findNode b's ID, and
# prints PASSED when the first node returned has b's ID, FAILED otherwise.
# NOTE(review): embedded lines 408-410, 413, 417, 419 and 421-422 are missing --
# the definition of `n`, the else:, and any use of `flag` are not visible.
406 def test_find_nodes(l, quiet=0):
407 flag = threading.Event()
411 a = l[randrange(0,n)]
412 b = l[randrange(0,n)]
414 def callback(nodes, flag=flag, id = b.node.id):
415 if (len(nodes) >0) and (nodes[0].id == id):
416 print "test_find_nodes PASSED"
418 print "test_find_nodes FAILED"
420 a.findNode(b.node.id, callback)
# Test: peer a stores `value` under `key`, then peers b, c and d each look the key
# up with valueForKey, using the callback method of a small helper object built
# from one of three threading.Event flags.
# NOTE(review): many embedded lines are missing (424, 428-429, 434-436, 439-443,
# 445-447, 449, 451, 453-455, 457-460, 462, 464, 466-467) -- the definitions of
# `n`, `key`, `value`, the helper class header `cb`, and all waits/prints on
# success are not visible in this extract.
423 def test_find_value(l, quiet=0):
425 fa = threading.Event()
426 fb = threading.Event()
427 fc = threading.Event()
430 a = l[randrange(0,n)]
431 b = l[randrange(0,n)]
432 c = l[randrange(0,n)]
433 d = l[randrange(0,n)]
437 if not quiet: print "inserting value..."
438 a.storeValueForKey(key, value)
444 def __init__(self, flag, value=value):
448 def callback(self, values):
450 if(len(values) == 0):
452 print "find NOT FOUND"
456 if self.val in values:
461 b.valueForKey(key, cb(fa).callback)
463 c.valueForKey(key, cb(fb).callback)
465 d.valueForKey(key, cb(fc).callback)
# Test helper: creates a single Khashmir on host/port backed by `db` and runs its
# application on a new thread.
# NOTE(review): embedded lines 469 and 472 are missing; whether `k` is returned is
# not visible in this extract.
468 def test_one(host, port, db='/tmp/test'):
470 k = Khashmir(host, port, db)
471 thread.start_new_thread(k.app.run, ())
# Script entry point: optionally takes the peer count from argv[1], builds the
# test network with test_build_net, then announces the node-finding and
# value-storing test phases.
# NOTE(review): embedded lines 475-476, 479, 481-482 and 484+ are missing -- the
# default for `n` and the calls that actually run the tests are not visible.
474 if __name__ == "__main__":
477 if len(sys.argv) > 1: n = int(sys.argv[1])
478 l = test_build_net(peers=n)
480 print "finding nodes..."
483 print "inserting and fetching values..."