1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
10 from ktable import KTable, K
11 from knode import KNode as Node
13 from hash import newID, newIDInRange
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
23 import sqlite ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
26 KhashmirDBExcept = "KhashmirDBExcept"
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
31 def __init__(self, host, port, db='khashmir.db'):
32 self.node = Node().init(newID(), host, port)
33 self.table = KTable(self.node)
34 self.app = Application("xmlrpc")
35 self.app.listenTCP(port, server.Site(self))
37 self.last = time.time()
38 KeyExpirer(store=self.store)
51 self.store = sqlite.connect(db=db)
52 self.store.autocommit = 1
55 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
57 def createNewDB(self, db):
58 self.store = sqlite.connect(db=db)
59 self.store.autocommit = 1
61 create table kv (key text, value text, time timestamp, primary key (key, value));
62 create index kv_key on kv(key);
63 create index kv_timestamp on kv(time);
65 create table nodes (id text primary key, host text, port number);
67 c = self.store.cursor()
70 def render(self, request):
72 Override the built in render so we can have access to the request object!
73 note, crequest is probably only valid on the initial call (not after deferred!)
75 self.crequest = request
76 return xmlrpc.XMLRPC.render(self, request)
80 ####### LOCAL INTERFACE - use these methods!
81 def addContact(self, host, port):
83 ping this node and add the contact info to the table on pong!
85 n =Node().init(const.NULL_ID, host, port) # note, we
89 ## this call is async!
90 def findNode(self, id, callback, errback=None):
91 """ returns the contact info for node, or the k closest nodes, from the global table """
92 # get K nodes out of local table/cache, or the node we want
93 nodes = self.table.findNodes(id)
96 d.addCallbacks(callback, errback)
98 d.addCallback(callback)
99 if len(nodes) == 1 and nodes[0].id == id :
102 # create our search state
103 state = FindNode(self, id, d.callback)
104 reactor.callFromThread(state.goWithNodes, nodes)
108 def valueForKey(self, key, callback):
109 """ returns the values found for key in global table
110 callback will be called with a list of values for each peer that returns unique values
111 final callback will be an empty list - probably should change to 'more coming' arg
113 nodes = self.table.findNodes(key)
116 l = self.retrieveValues(key)
118 reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
120 # create our search state
121 state = GetValue(self, key, callback)
122 reactor.callFromThread(state.goWithNodes, nodes, l)
126 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
127 def storeValueForKey(self, key, value, callback=None):
128 """ stores the value for key in the global table, returns immediately, no status
129 in this implementation, peers respond but don't indicate status to storing values
130 a key can have many values
132 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
135 def _storedValueHandler(sender):
137 response=_storedValueHandler
139 for node in nodes[:const.STORE_REDUNDANCY]:
140 def cb(t, table = table, node=node, resp=response):
141 self.table.insertNode(node)
143 if node.id != self.node.id:
144 def default(err, node=node, table=table):
145 table.nodeFailed(node)
146 df = node.storeValue(key, value, self.node.senderDict())
147 df.addCallbacks(cb, lambda x: None)
148 # this call is asynch
149 self.findNode(key, _storeValueForKey)
152 def insertNode(self, n, contacted=1):
154 insert a node in our local table, pinging oldest contact in bucket, if necessary
156 If all you have is a host/port, then use addContact, which calls this method after
157 receiving the PONG from the remote node. The reason for the seperation is we can't insert
158 a node into the table without it's peer-ID. That means of course the node passed into this
159 method needs to be a properly formed Node object with a valid ID.
161 old = self.table.insertNode(n, contacted=contacted)
162 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
163 # the bucket is full, check to see if old node is still around and if so, replace it
165 ## these are the callbacks used when we ping the oldest node in a bucket
166 def _staleNodeHandler(oldnode=old, newnode = n):
167 """ called if the pinged node never responds """
168 self.table.replaceStaleNode(old, newnode)
170 def _notStaleNodeHandler(sender, old=old):
171 """ called when we get a pong from the old node """
172 sender = Node().initWithDict(sender)
173 if sender.id == old.id:
174 self.table.justSeenNode(old)
176 df = old.ping(self.node.senderDict())
177 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
180 def sendPing(self, node):
184 df = node.ping(self.node.senderDict())
185 ## these are the callbacks we use when we issue a PING
186 def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
188 if id != const.NULL_ID and id != sender['id'].decode('base64'):
189 # whoah, got response from different peer than we were expecting
192 sender['host'] = host
193 sender['port'] = port
194 n = Node().initWithDict(sender)
197 def _defaultPong(err, node=node, table=self.table):
198 table.nodeFailed(node)
200 df.addCallbacks(_pongHandler,_defaultPong)
203 def findCloseNodes(self, callback=lambda a: None):
205 This does a findNode on the ID one away from our own.
206 This will allow us to populate our table with nodes on our network closest to our own.
207 This is called as soon as we start up with an empty table
209 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
210 self.findNode(id, callback)
212 def refreshTable(self):
219 for bucket in self.table.buckets:
220 if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
221 id = newIDInRange(bucket.min, bucket.max)
222 self.findNode(id, callback)
225 def retrieveValues(self, key):
226 s = "select value from kv where key = '%s';" % key.encode('base64')
227 c = self.store.cursor()
237 ##### INCOMING MESSAGE HANDLERS
239 def xmlrpc_ping(self, sender):
241 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
244 ip = self.crequest.getClientIP()
246 n = Node().initWithDict(sender)
247 self.insertNode(n, contacted=0)
248 return (), self.node.senderDict()
250 def xmlrpc_find_node(self, target, sender):
251 nodes = self.table.findNodes(target.decode('base64'))
252 nodes = map(lambda node: node.senderDict(), nodes)
253 ip = self.crequest.getClientIP()
255 n = Node().initWithDict(sender)
256 self.insertNode(n, contacted=0)
257 return nodes, self.node.senderDict()
259 def xmlrpc_store_value(self, key, value, sender):
260 t = "%0.6f" % time.time()
261 s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
262 c = self.store.cursor()
265 except pysqlite_exceptions.IntegrityError, reason:
266 # update last insert time
267 s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
269 ip = self.crequest.getClientIP()
271 n = Node().initWithDict(sender)
272 self.insertNode(n, contacted=0)
273 return (), self.node.senderDict()
275 def xmlrpc_find_value(self, key, sender):
276 ip = self.crequest.getClientIP()
277 key = key.decode('base64')
279 n = Node().initWithDict(sender)
280 self.insertNode(n, contacted=0)
282 l = self.retrieveValues(key)
284 return {'values' : l}, self.node.senderDict()
286 nodes = self.table.findNodes(key)
287 nodes = map(lambda node: node.senderDict(), nodes)
288 return {'nodes' : nodes}, self.node.senderDict()
296 def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
297 from whrandom import randrange
304 print "Building %s peer table." % peers
306 for i in xrange(peers):
307 a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
311 thread.start_new_thread(l[0].app.run, ())
317 print "adding contacts...."
320 n = l[randrange(0, len(l))].node
321 peer.addContact(host, n.port)
322 n = l[randrange(0, len(l))].node
323 peer.addContact(host, n.port)
324 n = l[randrange(0, len(l))].node
325 peer.addContact(host, n.port)
330 print "finding close nodes...."
333 flag = threading.Event()
334 def cb(nodes, f=flag):
336 peer.findCloseNodes(cb)
340 # peer.refreshTable()
343 def test_find_nodes(l, quiet=0):
344 import threading, sys
345 from whrandom import randrange
346 flag = threading.Event()
350 a = l[randrange(0,n)]
351 b = l[randrange(0,n)]
353 def callback(nodes, flag=flag, id = b.node.id):
354 if (len(nodes) >0) and (nodes[0].id == id):
355 print "test_find_nodes PASSED"
357 print "test_find_nodes FAILED"
359 a.findNode(b.node.id, callback)
362 def test_find_value(l, quiet=0):
363 from whrandom import randrange
365 from hash import newID
366 import time, threading, sys
368 fa = threading.Event()
369 fb = threading.Event()
370 fc = threading.Event()
373 a = l[randrange(0,n)]
374 b = l[randrange(0,n)]
375 c = l[randrange(0,n)]
376 d = l[randrange(0,n)]
381 print "inserting value..."
383 a.storeValueForKey(key, value)
389 def __init__(self, flag, value=value):
393 def callback(self, values):
395 if(len(values) == 0):
397 print "find NOT FOUND"
403 if self.val in values:
408 b.valueForKey(key, cb(fa).callback)
410 c.valueForKey(key, cb(fb).callback)
412 d.valueForKey(key, cb(fc).callback)
415 def test_one(host, port, db='/tmp/test'):
417 k = Khashmir(host, port, db)
418 thread.start_new_thread(k.app.run, ())
421 if __name__ == "__main__":
424 if len(sys.argv) > 1:
426 l = test_build_net(peers=n)
428 print "finding nodes..."
431 print "inserting and fetching values..."