1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
10 from ktable import KTable, K
11 from knode import KNode as Node
13 from hash import newID, newIDInRange
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
23 import sqlite ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
26 KhashmirDBExcept = "KhashmirDBExcept"
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
31 def __init__(self, host, port, db='khashmir.db'):
32 self.setup(host, port, db)
34 def setup(self, host, port, db='khashmir.db'):
36 self.node = self.loadSelfNode(host, port)
37 self.table = KTable(self.node)
38 self.loadRoutingTable()
39 self.app = Application("xmlrpc")
40 self.app.listenTCP(port, server.Site(self))
41 self.last = time.time()
42 KeyExpirer(store=self.store)
43 reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
44 self.refreshTable(force=1)
46 def loadSelfNode(self, host, port):
47 c = self.store.cursor()
48 c.execute('select id from self where num = 0;')
50 id = c.fetchone()[0].decode('base64')
53 return Node().init(id, host, port)
55 def saveSelfNode(self):
56 self.store.autocommit = 0
57 c = self.store.cursor()
58 c.execute('delete from self where num = 0;')
59 c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
61 self.store.autocommit = 1
65 self.dumpRoutingTable()
66 reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
79 self.store = sqlite.connect(db=db)
80 self.store.autocommit = 1
83 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
85 def createNewDB(self, db):
86 self.store = sqlite.connect(db=db)
87 self.store.autocommit = 1
89 create table kv (key text, value text, time timestamp, primary key (key, value));
90 create index kv_key on kv(key);
91 create index kv_timestamp on kv(time);
93 create table nodes (id text primary key, host text, port number);
95 create table self (num number primary key, id text);
97 c = self.store.cursor()
100 def dumpRoutingTable(self):
102 save routing table nodes to the database
104 self.store.autocommit = 0;
105 c = self.store.cursor()
106 c.execute("delete from nodes where id not NULL;")
107 for bucket in self.table.buckets:
108 for node in bucket.l:
109 d = node.senderDict()
110 c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
112 self.store.autocommit = 1;
114 def loadRoutingTable(self):
116 load routing table nodes from database
117 it's usually a good idea to call refreshTable(force=1) after loading the table
119 c = self.store.cursor()
120 c.execute("select * from nodes;")
121 for rec in c.fetchall():
122 n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
123 self.table.insertNode(n, contacted=0)
125 def render(self, request):
127 Override the built in render so we can have access to the request object!
128 note, crequest is probably only valid on the initial call (not after deferred!)
130 self.crequest = request
131 return xmlrpc.XMLRPC.render(self, request)
135 ####### LOCAL INTERFACE - use these methods!
136 def addContact(self, host, port):
138 ping this node and add the contact info to the table on pong!
140 n =Node().init(const.NULL_ID, host, port) # note, we
143 ## this call is async!
144 def findNode(self, id, callback, errback=None):
145 """ returns the contact info for node, or the k closest nodes, from the global table """
146 # get K nodes out of local table/cache, or the node we want
147 nodes = self.table.findNodes(id)
150 d.addCallbacks(callback, errback)
152 d.addCallback(callback)
153 if len(nodes) == 1 and nodes[0].id == id :
156 # create our search state
157 state = FindNode(self, id, d.callback)
158 reactor.callFromThread(state.goWithNodes, nodes)
162 def valueForKey(self, key, callback):
163 """ returns the values found for key in global table
164 callback will be called with a list of values for each peer that returns unique values
165 final callback will be an empty list - probably should change to 'more coming' arg
167 nodes = self.table.findNodes(key)
170 l = self.retrieveValues(key)
172 reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
174 # create our search state
175 state = GetValue(self, key, callback)
176 reactor.callFromThread(state.goWithNodes, nodes, l)
178 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
179 def storeValueForKey(self, key, value, callback=None):
180 """ stores the value for key in the global table, returns immediately, no status
181 in this implementation, peers respond but don't indicate status to storing values
182 a key can have many values
184 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
187 def _storedValueHandler(sender):
189 response=_storedValueHandler
191 for node in nodes[:const.STORE_REDUNDANCY]:
192 def cb(t, table = table, node=node, resp=response):
193 self.table.insertNode(node)
195 if node.id != self.node.id:
196 def default(err, node=node, table=table):
197 table.nodeFailed(node)
198 df = node.storeValue(key, value, self.node.senderDict())
199 df.addCallbacks(cb, default)
200 # this call is asynch
201 self.findNode(key, _storeValueForKey)
204 def insertNode(self, n, contacted=1):
206 insert a node in our local table, pinging oldest contact in bucket, if necessary
208 If all you have is a host/port, then use addContact, which calls this method after
209 receiving the PONG from the remote node. The reason for the seperation is we can't insert
210 a node into the table without it's peer-ID. That means of course the node passed into this
211 method needs to be a properly formed Node object with a valid ID.
213 old = self.table.insertNode(n, contacted=contacted)
214 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
215 # the bucket is full, check to see if old node is still around and if so, replace it
217 ## these are the callbacks used when we ping the oldest node in a bucket
218 def _staleNodeHandler(oldnode=old, newnode = n):
219 """ called if the pinged node never responds """
220 self.table.replaceStaleNode(old, newnode)
222 def _notStaleNodeHandler(sender, old=old):
223 """ called when we get a pong from the old node """
224 args, sender = sender
225 sender = Node().initWithDict(sender)
226 if sender.id == old.id:
227 self.table.justSeenNode(old)
229 df = old.ping(self.node.senderDict())
230 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
232 def sendPing(self, node):
236 df = node.ping(self.node.senderDict())
237 ## these are the callbacks we use when we issue a PING
238 def _pongHandler(args, node=node, table=self.table):
240 if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'):
241 # whoah, got response from different peer than we were expecting
242 self.table.invalidateNode(node)
244 sender['host'] = node.host
245 sender['port'] = node.port
246 n = Node().initWithDict(sender)
249 def _defaultPong(err, node=node, table=self.table):
250 table.nodeFailed(node)
252 df.addCallbacks(_pongHandler,_defaultPong)
254 def findCloseNodes(self, callback=lambda a: None):
256 This does a findNode on the ID one away from our own.
257 This will allow us to populate our table with nodes on our network closest to our own.
258 This is called as soon as we start up with an empty table
260 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
261 self.findNode(id, callback)
263 def refreshTable(self, force=0):
265 force=1 will refresh table regardless of last bucket access time
270 for bucket in self.table.buckets:
271 if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
272 id = newIDInRange(bucket.min, bucket.max)
273 self.findNode(id, callback)
276 def retrieveValues(self, key):
277 s = "select value from kv where key = '%s';" % key.encode('base64')
278 c = self.store.cursor()
288 ##### INCOMING MESSAGE HANDLERS
290 def xmlrpc_ping(self, sender):
292 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
295 ip = self.crequest.getClientIP()
297 n = Node().initWithDict(sender)
298 self.insertNode(n, contacted=0)
299 return (), self.node.senderDict()
301 def xmlrpc_find_node(self, target, sender):
302 nodes = self.table.findNodes(target.decode('base64'))
303 nodes = map(lambda node: node.senderDict(), nodes)
304 ip = self.crequest.getClientIP()
306 n = Node().initWithDict(sender)
307 self.insertNode(n, contacted=0)
308 return nodes, self.node.senderDict()
310 def xmlrpc_store_value(self, key, value, sender):
311 t = "%0.6f" % time.time()
312 s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
313 c = self.store.cursor()
316 except pysqlite_exceptions.IntegrityError, reason:
317 # update last insert time
318 s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
320 ip = self.crequest.getClientIP()
322 n = Node().initWithDict(sender)
323 self.insertNode(n, contacted=0)
324 return (), self.node.senderDict()
326 def xmlrpc_find_value(self, key, sender):
327 ip = self.crequest.getClientIP()
328 key = key.decode('base64')
330 n = Node().initWithDict(sender)
331 self.insertNode(n, contacted=0)
333 l = self.retrieveValues(key)
335 return {'values' : l}, self.node.senderDict()
337 nodes = self.table.findNodes(key)
338 nodes = map(lambda node: node.senderDict(), nodes)
339 return {'nodes' : nodes}, self.node.senderDict()
342 def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001):
343 from whrandom import randrange
351 print "Building %s peer table." % peers
353 for i in xrange(peers):
354 a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
358 thread.start_new_thread(l[0].app.run, ())
364 print "adding contacts...."
367 n = l[randrange(0, len(l))].node
368 peer.addContact(host, n.port)
369 n = l[randrange(0, len(l))].node
370 peer.addContact(host, n.port)
371 n = l[randrange(0, len(l))].node
372 peer.addContact(host, n.port)
378 print "finding close nodes...."
381 flag = threading.Event()
382 def cb(nodes, f=flag):
384 peer.findCloseNodes(cb)
388 # peer.refreshTable()
391 def test_find_nodes(l, quiet=0):
392 import threading, sys
393 from whrandom import randrange
394 flag = threading.Event()
398 a = l[randrange(0,n)]
399 b = l[randrange(0,n)]
401 def callback(nodes, flag=flag, id = b.node.id):
402 if (len(nodes) >0) and (nodes[0].id == id):
403 print "test_find_nodes PASSED"
405 print "test_find_nodes FAILED"
407 a.findNode(b.node.id, callback)
410 def test_find_value(l, quiet=0):
411 from whrandom import randrange
413 from hash import newID
414 import time, threading, sys
416 fa = threading.Event()
417 fb = threading.Event()
418 fc = threading.Event()
421 a = l[randrange(0,n)]
422 b = l[randrange(0,n)]
423 c = l[randrange(0,n)]
424 d = l[randrange(0,n)]
429 print "inserting value..."
431 a.storeValueForKey(key, value)
438 def __init__(self, flag, value=value):
442 def callback(self, values):
444 if(len(values) == 0):
446 print "find NOT FOUND"
451 if self.val in values:
456 b.valueForKey(key, cb(fa).callback)
458 c.valueForKey(key, cb(fb).callback)
460 d.valueForKey(key, cb(fc).callback)
463 def test_one(host, port, db='/tmp/test'):
465 k = Khashmir(host, port, db)
466 thread.start_new_thread(k.app.run, ())
469 if __name__ == "__main__":
472 if len(sys.argv) > 1:
474 l = test_build_net(peers=n)
476 print "finding nodes..."
479 print "inserting and fetching values..."