1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
10 from ktable import KTable, K
11 from knode import KNode as Node
13 from hash import newID, newIDInRange
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
23 import sqlite ## find this at http://pysqlite.sourceforge.net/
24 import pysqlite_exceptions
26 KhashmirDBExcept = "KhashmirDBExcept"
28 # this is the main class!
29 class Khashmir(xmlrpc.XMLRPC):
30 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
31 def __init__(self, host, port, db='khashmir.db'):
32 self.setup(host, port, db)
33 def setup(self, host, port, db='khashmir.db'):
34 self.node = Node().init(newID(), host, port)
35 self.table = KTable(self.node)
36 self.app = Application("xmlrpc")
37 self.app.listenTCP(port, server.Site(self))
39 self.last = time.time()
40 KeyExpirer(store=self.store)
53 self.store = sqlite.connect(db=db)
54 self.store.autocommit = 1
57 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
59 def createNewDB(self, db):
60 self.store = sqlite.connect(db=db)
61 self.store.autocommit = 1
63 create table kv (key text, value text, time timestamp, primary key (key, value));
64 create index kv_key on kv(key);
65 create index kv_timestamp on kv(time);
67 create table nodes (id text primary key, host text, port number);
69 c = self.store.cursor()
72 def render(self, request):
74 Override the built in render so we can have access to the request object!
75 note, crequest is probably only valid on the initial call (not after deferred!)
77 self.crequest = request
78 return xmlrpc.XMLRPC.render(self, request)
82 ####### LOCAL INTERFACE - use these methods!
83 def addContact(self, host, port):
85 ping this node and add the contact info to the table on pong!
87 n =Node().init(const.NULL_ID, host, port) # note, we
90 ## this call is async!
91 def findNode(self, id, callback, errback=None):
92 """ returns the contact info for node, or the k closest nodes, from the global table """
93 # get K nodes out of local table/cache, or the node we want
94 nodes = self.table.findNodes(id)
97 d.addCallbacks(callback, errback)
99 d.addCallback(callback)
100 if len(nodes) == 1 and nodes[0].id == id :
103 # create our search state
104 state = FindNode(self, id, d.callback)
105 reactor.callFromThread(state.goWithNodes, nodes)
109 def valueForKey(self, key, callback):
110 """ returns the values found for key in global table
111 callback will be called with a list of values for each peer that returns unique values
112 final callback will be an empty list - probably should change to 'more coming' arg
114 nodes = self.table.findNodes(key)
117 l = self.retrieveValues(key)
119 reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
121 # create our search state
122 state = GetValue(self, key, callback)
123 reactor.callFromThread(state.goWithNodes, nodes, l)
125 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
126 def storeValueForKey(self, key, value, callback=None):
127 """ stores the value for key in the global table, returns immediately, no status
128 in this implementation, peers respond but don't indicate status to storing values
129 a key can have many values
131 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
134 def _storedValueHandler(sender):
136 response=_storedValueHandler
138 for node in nodes[:const.STORE_REDUNDANCY]:
139 def cb(t, table = table, node=node, resp=response):
140 self.table.insertNode(node)
142 if node.id != self.node.id:
143 def default(err, node=node, table=table):
144 table.nodeFailed(node)
145 df = node.storeValue(key, value, self.node.senderDict())
146 df.addCallbacks(cb, default)
147 # this call is asynch
148 self.findNode(key, _storeValueForKey)
151 def insertNode(self, n, contacted=1):
153 insert a node in our local table, pinging oldest contact in bucket, if necessary
155 If all you have is a host/port, then use addContact, which calls this method after
156 receiving the PONG from the remote node. The reason for the seperation is we can't insert
157 a node into the table without it's peer-ID. That means of course the node passed into this
158 method needs to be a properly formed Node object with a valid ID.
160 old = self.table.insertNode(n, contacted=contacted)
161 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
162 # the bucket is full, check to see if old node is still around and if so, replace it
164 ## these are the callbacks used when we ping the oldest node in a bucket
165 def _staleNodeHandler(oldnode=old, newnode = n):
166 """ called if the pinged node never responds """
167 self.table.replaceStaleNode(old, newnode)
169 def _notStaleNodeHandler(sender, old=old):
170 """ called when we get a pong from the old node """
171 args, sender = sender
172 sender = Node().initWithDict(sender)
173 if sender.id == old.id:
174 self.table.justSeenNode(old)
176 df = old.ping(self.node.senderDict())
177 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
179 def sendPing(self, node):
183 df = node.ping(self.node.senderDict())
184 ## these are the callbacks we use when we issue a PING
185 def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
187 if id != const.NULL_ID and id != sender['id'].decode('base64'):
188 # whoah, got response from different peer than we were expecting
191 sender['host'] = host
192 sender['port'] = port
193 n = Node().initWithDict(sender)
196 def _defaultPong(err, node=node, table=self.table):
197 table.nodeFailed(node)
199 df.addCallbacks(_pongHandler,_defaultPong)
201 def findCloseNodes(self, callback=lambda a: None):
203 This does a findNode on the ID one away from our own.
204 This will allow us to populate our table with nodes on our network closest to our own.
205 This is called as soon as we start up with an empty table
207 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
208 self.findNode(id, callback)
210 def refreshTable(self):
217 for bucket in self.table.buckets:
218 if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
219 id = newIDInRange(bucket.min, bucket.max)
220 self.findNode(id, callback)
223 def retrieveValues(self, key):
224 s = "select value from kv where key = '%s';" % key.encode('base64')
225 c = self.store.cursor()
235 ##### INCOMING MESSAGE HANDLERS
237 def xmlrpc_ping(self, sender):
239 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
242 ip = self.crequest.getClientIP()
244 n = Node().initWithDict(sender)
245 self.insertNode(n, contacted=0)
246 return (), self.node.senderDict()
248 def xmlrpc_find_node(self, target, sender):
249 nodes = self.table.findNodes(target.decode('base64'))
250 nodes = map(lambda node: node.senderDict(), nodes)
251 ip = self.crequest.getClientIP()
253 n = Node().initWithDict(sender)
254 self.insertNode(n, contacted=0)
255 return nodes, self.node.senderDict()
257 def xmlrpc_store_value(self, key, value, sender):
258 t = "%0.6f" % time.time()
259 s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
260 c = self.store.cursor()
263 except pysqlite_exceptions.IntegrityError, reason:
264 # update last insert time
265 s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
267 ip = self.crequest.getClientIP()
269 n = Node().initWithDict(sender)
270 self.insertNode(n, contacted=0)
271 return (), self.node.senderDict()
273 def xmlrpc_find_value(self, key, sender):
274 ip = self.crequest.getClientIP()
275 key = key.decode('base64')
277 n = Node().initWithDict(sender)
278 self.insertNode(n, contacted=0)
280 l = self.retrieveValues(key)
282 return {'values' : l}, self.node.senderDict()
284 nodes = self.table.findNodes(key)
285 nodes = map(lambda node: node.senderDict(), nodes)
286 return {'nodes' : nodes}, self.node.senderDict()
289 def test_build_net(quiet=0, peers=24, host='localhost', pause=0):
290 from whrandom import randrange
298 print "Building %s peer table." % peers
300 for i in xrange(peers):
301 a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
305 thread.start_new_thread(l[0].app.run, ())
311 print "adding contacts...."
314 n = l[randrange(0, len(l))].node
315 peer.addContact(host, n.port)
316 n = l[randrange(0, len(l))].node
317 peer.addContact(host, n.port)
318 n = l[randrange(0, len(l))].node
319 peer.addContact(host, n.port)
325 print "finding close nodes...."
328 flag = threading.Event()
329 def cb(nodes, f=flag):
331 peer.findCloseNodes(cb)
335 # peer.refreshTable()
338 def test_find_nodes(l, quiet=0):
339 import threading, sys
340 from whrandom import randrange
341 flag = threading.Event()
345 a = l[randrange(0,n)]
346 b = l[randrange(0,n)]
348 def callback(nodes, flag=flag, id = b.node.id):
349 if (len(nodes) >0) and (nodes[0].id == id):
350 print "test_find_nodes PASSED"
352 print "test_find_nodes FAILED"
354 a.findNode(b.node.id, callback)
357 def test_find_value(l, quiet=0):
358 from whrandom import randrange
360 from hash import newID
361 import time, threading, sys
363 fa = threading.Event()
364 fb = threading.Event()
365 fc = threading.Event()
368 a = l[randrange(0,n)]
369 b = l[randrange(0,n)]
370 c = l[randrange(0,n)]
371 d = l[randrange(0,n)]
376 print "inserting value..."
378 a.storeValueForKey(key, value)
385 def __init__(self, flag, value=value):
389 def callback(self, values):
391 if(len(values) == 0):
393 print "find NOT FOUND"
398 if self.val in values:
403 b.valueForKey(key, cb(fa).callback)
405 c.valueForKey(key, cb(fb).callback)
407 d.valueForKey(key, cb(fc).callback)
410 def test_one(host, port, db='/tmp/test'):
412 k = Khashmir(host, port, db)
413 thread.start_new_thread(k.app.run, ())
416 if __name__ == "__main__":
419 if len(sys.argv) > 1:
421 l = test_build_net(peers=n)
423 print "finding nodes..."
426 print "inserting and fetching values..."