1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
10 from ktable import KTable, K
11 from knode import KNode as Node
13 from hash import newID, newIDInRange
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
23 import sqlite ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
26 KhashmirDBExcept = "KhashmirDBExcept"
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
31 def __init__(self, host, port, db='khashmir.db'):
32 self.node = Node().init(newID(), host, port)
33 self.table = KTable(self.node)
34 self.app = Application("xmlrpc")
35 self.app.listenTCP(port, server.Site(self))
37 self.last = time.time()
38 KeyExpirer(store=self.store)
51 self.store = sqlite.connect(db=db)
54 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
56 def createNewDB(self, db):
57 self.store = sqlite.connect(db=db)
59 create table kv (key text, value text, time timestamp, primary key (key, value));
60 create index kv_key on kv(key);
61 create index kv_timestamp on kv(time);
63 create table nodes (id text primary key, host text, port number);
65 c = self.store.cursor()
69 def render(self, request):
71 Override the built in render so we can have access to the request object!
72 note, crequest is probably only valid on the initial call (not after deferred!)
74 self.crequest = request
75 return xmlrpc.XMLRPC.render(self, request)
79 ####### LOCAL INTERFACE - use these methods!
80 def addContact(self, host, port):
82 ping this node and add the contact info to the table on pong!
84 n =Node().init(const.NULL_ID, host, port) # note, we
88 ## this call is async!
89 def findNode(self, id, callback, errback=None):
90 """ returns the contact info for node, or the k closest nodes, from the global table """
91 # get K nodes out of local table/cache, or the node we want
92 nodes = self.table.findNodes(id)
95 d.addCallbacks(callback, errback)
97 d.addCallback(callback)
98 if len(nodes) == 1 and nodes[0].id == id :
101 # create our search state
102 state = FindNode(self, id, d.callback)
103 reactor.callFromThread(state.goWithNodes, nodes)
107 def valueForKey(self, key, callback):
108 """ returns the values found for key in global table
109 callback will be called with a list of values for each peer that returns unique values
110 final callback will be an empty list - probably should change to 'more coming' arg
112 nodes = self.table.findNodes(key)
115 l = self.retrieveValues(key)
117 reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
119 # create our search state
120 state = GetValue(self, key, callback)
121 reactor.callFromThread(state.goWithNodes, nodes, l)
125 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
126 def storeValueForKey(self, key, value, callback=None):
127 """ stores the value for key in the global table, returns immediately, no status
128 in this implementation, peers respond but don't indicate status to storing values
129 a key can have many values
131 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
134 def _storedValueHandler(sender):
136 response=_storedValueHandler
138 for node in nodes[:const.STORE_REDUNDANCY]:
139 def cb(t, table = table, node=node, resp=response):
140 self.table.insertNode(node)
142 if node.id != self.node.id:
143 def default(err, node=node, table=table):
144 table.nodeFailed(node)
145 df = node.storeValue(key, value, self.node.senderDict())
146 df.addCallbacks(cb, lambda x: None)
147 # this call is asynch
148 self.findNode(key, _storeValueForKey)
151 def insertNode(self, n, contacted=1):
153 insert a node in our local table, pinging oldest contact in bucket, if necessary
155 If all you have is a host/port, then use addContact, which calls this method after
156 receiving the PONG from the remote node. The reason for the seperation is we can't insert
157 a node into the table without it's peer-ID. That means of course the node passed into this
158 method needs to be a properly formed Node object with a valid ID.
160 old = self.table.insertNode(n, contacted=contacted)
161 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
162 # the bucket is full, check to see if old node is still around and if so, replace it
164 ## these are the callbacks used when we ping the oldest node in a bucket
165 def _staleNodeHandler(oldnode=old, newnode = n):
166 """ called if the pinged node never responds """
167 self.table.replaceStaleNode(old, newnode)
169 def _notStaleNodeHandler(sender, old=old):
170 """ called when we get a pong from the old node """
171 sender = Node().initWithDict(sender)
172 if sender.id == old.id:
173 self.table.justSeenNode(old)
175 df = old.ping(self.node.senderDict())
176 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
179 def sendPing(self, node):
183 df = node.ping(self.node.senderDict())
184 ## these are the callbacks we use when we issue a PING
185 def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
187 if id != const.NULL_ID and id != sender['id'].decode('base64'):
188 # whoah, got response from different peer than we were expecting
191 sender['host'] = host
192 sender['port'] = port
193 n = Node().initWithDict(sender)
196 def _defaultPong(err, node=node, table=self.table):
197 table.nodeFailed(node)
199 df.addCallbacks(_pongHandler,_defaultPong)
202 def findCloseNodes(self, callback=lambda a: None):
204 This does a findNode on the ID one away from our own.
205 This will allow us to populate our table with nodes on our network closest to our own.
206 This is called as soon as we start up with an empty table
208 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
209 self.findNode(id, callback)
211 def refreshTable(self):
218 for bucket in self.table.buckets:
219 if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
220 id = newIDInRange(bucket.min, bucket.max)
221 self.findNode(id, callback)
224 def retrieveValues(self, key):
225 s = "select value from kv where key = '%s';" % key.encode('base64')
226 c = self.store.cursor()
236 ##### INCOMING MESSAGE HANDLERS
238 def xmlrpc_ping(self, sender):
240 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
243 ip = self.crequest.getClientIP()
245 n = Node().initWithDict(sender)
246 self.insertNode(n, contacted=0)
247 return (), self.node.senderDict()
249 def xmlrpc_find_node(self, target, sender):
250 nodes = self.table.findNodes(target.decode('base64'))
251 nodes = map(lambda node: node.senderDict(), nodes)
252 ip = self.crequest.getClientIP()
254 n = Node().initWithDict(sender)
255 self.insertNode(n, contacted=0)
256 return nodes, self.node.senderDict()
258 def xmlrpc_store_value(self, key, value, sender):
259 t = "%0.6f" % time.time()
260 s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
261 c = self.store.cursor()
264 except pysqlite_exceptions.IntegrityError, reason:
265 # update last insert time
266 s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
269 ip = self.crequest.getClientIP()
271 n = Node().initWithDict(sender)
272 self.insertNode(n, contacted=0)
273 return (), self.node.senderDict()
275 def xmlrpc_find_value(self, key, sender):
276 ip = self.crequest.getClientIP()
277 key = key.decode('base64')
279 n = Node().initWithDict(sender)
280 self.insertNode(n, contacted=0)
282 l = self.retrieveValues(key)
284 return {'values' : l}, self.node.senderDict()
286 nodes = self.table.findNodes(key)
287 nodes = map(lambda node: node.senderDict(), nodes)
288 return {'nodes' : nodes}, self.node.senderDict()
296 def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
297 from whrandom import randrange
304 print "Building %s peer table." % peers
306 for i in xrange(peers):
307 a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
311 thread.start_new_thread(l[0].app.run, ())
317 print "adding contacts...."
320 n = l[randrange(0, len(l))].node
321 peer.addContact(host, n.port)
322 n = l[randrange(0, len(l))].node
323 peer.addContact(host, n.port)
324 n = l[randrange(0, len(l))].node
325 peer.addContact(host, n.port)
330 print "finding close nodes...."
333 flag = threading.Event()
334 def cb(nodes, f=flag):
336 peer.findCloseNodes(cb)
340 # peer.refreshTable()
343 def test_find_nodes(l, quiet=0):
344 import threading, sys
345 from whrandom import randrange
346 flag = threading.Event()
350 a = l[randrange(0,n)]
351 b = l[randrange(0,n)]
353 def callback(nodes, flag=flag, id = b.node.id):
354 if (len(nodes) >0) and (nodes[0].id == id):
355 print "test_find_nodes PASSED"
357 print "test_find_nodes FAILED"
359 a.findNode(b.node.id, callback)
362 def test_find_value(l, quiet=0):
363 from whrandom import randrange
365 from hash import newID
366 import time, threading, sys
368 fa = threading.Event()
369 fb = threading.Event()
370 fc = threading.Event()
373 a = l[randrange(0,n)]
374 b = l[randrange(0,n)]
375 c = l[randrange(0,n)]
376 d = l[randrange(0,n)]
381 print "inserting value..."
383 a.storeValueForKey(key, value)
389 def __init__(self, flag, value=value):
393 def callback(self, values):
395 if(len(values) == 0):
397 print "find NOT FOUND"
403 if self.val in values:
408 b.valueForKey(key, cb(fa).callback)
410 c.valueForKey(key, cb(fb).callback)
412 d.valueForKey(key, cb(fc).callback)
415 def test_one(host, port, db='/tmp/test'):
417 k = Khashmir(host, port, db)
418 thread.start_new_thread(k.app.run, ())
421 if __name__ == "__main__":
424 if len(sys.argv) > 1:
426 l = test_build_net(peers=n)
428 print "finding nodes..."
431 print "inserting and fetching values..."