1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
7 from bencode import bdecode as loads
8 from bencode import bencode as dumps
12 from ktable import KTable, K
13 from knode import KNode as Node
15 from hash import newID, newIDInRange
17 from actions import FindNode, GetValue, KeyExpirer
18 from twisted.web import xmlrpc
19 from twisted.internet.defer import Deferred
20 from twisted.python import threadable
21 from twisted.internet.app import Application
22 from twisted.web import server
25 import sqlite ## find this at http://pysqlite.sourceforge.net/
class KhashmirDBExcept(Exception):
    """Raised when the khashmir SQLite database cannot be opened.

    This used to be a plain string object.  String exceptions are
    rejected at ``raise`` time by Python 2.6 and later, so the name is
    now bound to a real exception class.  Existing
    ``raise KhashmirDBExcept, "message", tb`` statements and
    ``except KhashmirDBExcept:`` clauses keep working unchanged.
    """
30 # this is the main class!
# Khashmir subclasses twisted.web's xmlrpc.XMLRPC.  The methods named
# xmlrpc_* are the incoming message handlers; the other public methods
# (addContact, findNode, valueForKey, storeValueForKey, ...) form the local
# interface.
31 class Khashmir(xmlrpc.XMLRPC):
# NOTE(review): render() also assigns self.crequest, which is not listed
# below -- confirm that __slots__ is not actually enforced for this class.
32 __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
# Set up the local node, its routing table and the XML-RPC listener.
#   host, port -- handed to Node().init() together with a fresh newID();
#                 port is also the TCP port the XML-RPC Site listens on
#   db         -- not used in the visible lines of this method; presumably
#                 the SQLite database path (cf. sqlite.connect(db=db)
#                 below) -- TODO confirm
# NOTE(review): self.store is passed to KeyExpirer before any assignment
# that is visible in this method, and `time` is not imported in the visible
# import lines -- confirm both are provided elsewhere.
33 def __init__(self, host, port, db='khashmir.db'):
34 self.node = Node().init(newID(), host, port)
35 self.table = KTable(self.node)
36 self.app = Application("xmlrpc")
37 self.app.listenTCP(port, server.Site(self))
39 self.last = time.time()
40 KeyExpirer(store=self.store)
# NOTE(review): the two statements below have no visible enclosing `def` or
# `try`; presumably they belong to a separate DB-opening helper -- confirm.
# Also verify `traceback.exc_traceback`: `traceback` is not imported in the
# visible lines, and the traceback object is normally obtained from
# sys.exc_info()[2].
53 self.store = sqlite.connect(db=db)
56 raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
# Open the SQLite database `db` and keep the connection in self.store.
# The SQL text visible below declares:
#   - table `kv`    (hkv primary key, key, value, time)
#   - index `kv_key` on kv(key)
#   - table `nodes` (id primary key, host, port)
# and a cursor `c` is then opened on the connection.
# NOTE(review): the statement that wraps the SQL text in a string and the
# call that executes it on `c` are not visible here -- confirm.
58 def createNewDB(self, db):
59 self.store = sqlite.connect(db=db)
61 create table kv (hkv text primary key, key text, value text, time timestamp);
62 create index kv_key on kv(key);
64 create table nodes (id text primary key, host text, port number);
66 c = self.store.cursor()
# Store the current request on the instance as self.crequest, then delegate
# to xmlrpc.XMLRPC.render and return its result unchanged.
# The xmlrpc_* handlers read self.crequest to obtain the caller's address
# via getClientIP().
70 def render(self, request):
72 Override the built in render so we can have access to the request object!
73 note, crequest is probably only valid on the initial call (not after deferred!)
75 self.crequest = request
76 return xmlrpc.XMLRPC.render(self, request)
80 ####### LOCAL INTERFACE - use these methods!
# Build a Node for host/port using the placeholder id const.NULL_ID.
# NOTE(review): the visible lines end after constructing `n` (the trailing
# comment is cut off mid-sentence); the ping described below is presumably
# issued through sendPing(n) -- confirm.  `const` itself is not imported in
# the visible import lines (only `from const import reactor`).
81 def addContact(self, host, port):
83 ping this node and add the contact info to the table on pong!
85 n =Node().init(const.NULL_ID, host, port) # note, we
# findNode(id, callback, errback=None)
#   id       -- id of the node being looked for
#   callback -- attached to `d`; receives whatever is passed to d.callback
#   errback  -- optional; attached together with callback via addCallbacks
# The local table is asked for candidates first (self.table.findNodes).
# A FindNode search state is created with d.callback as its completion
# hook and started on the reactor thread with those candidates.
# NOTE(review): the creation of `d`, the condition choosing between
# addCallbacks/addCallback, and what happens when the table returns exactly
# the requested node are not visible in these lines -- confirm.
89 ## this call is async!
90 def findNode(self, id, callback, errback=None):
91 """ returns the contact info for node, or the k closest nodes, from the global table """
92 # get K nodes out of local table/cache, or the node we want
93 nodes = self.table.findNodes(id)
96 d.addCallbacks(callback, errback)
98 d.addCallback(callback)
99 if len(nodes) == 1 and nodes[0].id == id :
102 # create our search state
103 state = FindNode(self, id, d.callback)
104 reactor.callFromThread(state.goWithNodes, nodes)
# valueForKey(key, callback)
#   key      -- key to look up
#   callback -- scheduled on the reactor thread with the locally stored
#               values (each base64-decoded), and also handed to the
#               GetValue search state
# Local values come from self.retrieveValues(key); the GetValue search is
# started with the nodes returned by self.table.findNodes(key) plus the
# local value list `l`.
# NOTE(review): any condition guarding the local-values callback is not
# visible in these lines -- confirm.
108 def valueForKey(self, key, callback):
109 """ returns the values found for key in global table
110 callback will be called with a list of values for each peer that returns unique values
111 final callback will be an empty list - probably should change to 'more coming' arg
113 nodes = self.table.findNodes(key)
116 l = self.retrieveValues(key)
118 reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
120 # create our search state
121 state = GetValue(self, key, callback)
122 reactor.callFromThread(state.goWithNodes, nodes, l)
# storeValueForKey(key, value, callback=None)
#   key, value -- pair sent to each selected node via node.storeValue(...)
#   callback   -- optional; bound as `response` inside the nested
#                 _storeValueForKey
# Flow: self.findNode(key, _storeValueForKey) locates nodes; the nested
# function then walks the first const.STORE_REDUNDANCY of them and calls
# storeValue on each, attaching `cb` as the success callback.  The
# `node.id != self.node.id` test apparently excludes our own node.
# NOTE(review): the errback given to addCallbacks is `lambda x: None`; the
# nested `default` handler (table.nodeFailed) is defined but never attached
# in the visible lines -- confirm intent.  The bodies of
# _storedValueHandler and most of `cb` are not visible here.
126 ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
127 def storeValueForKey(self, key, value, callback=None):
128 """ stores the value for key in the global table, returns immediately, no status
129 in this implementation, peers respond but don't indicate status to storing values
130 a key can have many values
132 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
135 def _storedValueHandler(sender):
137 response=_storedValueHandler
139 for node in nodes[:const.STORE_REDUNDANCY]:
140 def cb(t, table = table, node=node, resp=response):
141 self.table.insertNode(node)
143 if node.id != self.node.id:
144 def default(err, node=node, table=table):
145 table.nodeFailed(node)
146 df = node.storeValue(key, value, self.node.senderDict())
147 df.addCallbacks(cb, lambda x: None)
148 # this call is asynch
149 self.findNode(key, _storeValueForKey)
# insertNode(n, contacted=1)
#   n         -- Node to insert
#   contacted -- forwarded unchanged to self.table.insertNode
# self.table.insertNode may hand back an `old` node.  When it does, and
# that node was last seen more than const.MIN_PING_INTERVAL seconds ago and
# is not ourselves, it is pinged:
#   - a reply whose sender id equals old.id marks it seen (justSeenNode)
#   - a failure replaces it with the new node (replaceStaleNode)
# NOTE(review): _staleNodeHandler is registered as an errback, which
# Twisted invokes with a failure argument; that argument lands in the
# `oldnode` default parameter.  The body reads the enclosing `old` instead,
# so the result looks correct, but the signature is misleading -- confirm.
152 def insertNode(self, n, contacted=1):
154 insert a node in our local table, pinging oldest contact in bucket, if necessary
156 If all you have is a host/port, then use addContact, which calls this method after
157 receiving the PONG from the remote node. The reason for the seperation is we can't insert
158 a node into the table without it's peer-ID. That means of course the node passed into this
159 method needs to be a properly formed Node object with a valid ID.
161 old = self.table.insertNode(n, contacted=contacted)
162 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
163 # the bucket is full, check to see if old node is still around and if so, replace it
165 ## these are the callbacks used when we ping the oldest node in a bucket
166 def _staleNodeHandler(oldnode=old, newnode = n):
167 """ called if the pinged node never responds """
168 self.table.replaceStaleNode(old, newnode)
170 def _notStaleNodeHandler(sender, old=old):
171 """ called when we get a pong from the old node """
172 sender = Node().initWithDict(sender)
173 if sender.id == old.id:
174 self.table.justSeenNode(old)
176 df = old.ping(self.node.senderDict())
177 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
# sendPing(node)
# Ping `node` with our own senderDict and register two handlers on the
# returned deferred:
#   _pongHandler -- compares the replying sender's id with the expected id
#                   (skipped when the expected id is const.NULL_ID),
#                   overwrites the sender's host/port with the ones we
#                   dialled, and builds a Node from that dict
#   _defaultPong -- reports the node to table.nodeFailed on error
# NOTE(review): how `sender` is extracted from `args`, what is done on an
# id mismatch, and what happens to `n` afterwards are not visible in these
# lines -- confirm.
180 def sendPing(self, node):
184 df = node.ping(self.node.senderDict())
185 ## these are the callbacks we use when we issue a PING
186 def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
188 if id != const.NULL_ID and id != sender['id'].decode('base64'):
189 # whoah, got response from different peer than we were expecting
192 sender['host'] = host
193 sender['port'] = port
194 n = Node().initWithDict(sender)
197 def _defaultPong(err, node=node, table=self.table):
198 table.nodeFailed(node)
200 df.addCallbacks(_pongHandler,_defaultPong)
# findCloseNodes()
# Derive a target id from our own id by incrementing its last byte modulo
# 256, then run findNode on that target.
# NOTE(review): `callback` is not defined in the visible lines -- confirm
# where it comes from.
203 def findCloseNodes(self):
205 This does a findNode on the ID one away from our own.
206 This will allow us to populate our table with nodes on our network closest to our own.
207 This is called as soon as we start up with an empty table
209 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
212 self.findNode(id, callback)
# refreshTable()
# For every bucket in self.table.buckets whose lastAccessed time is at
# least const.BUCKET_STALENESS seconds old, pick an id inside the bucket's
# [min, max] range with newIDInRange and run findNode on it.
# NOTE(review): `callback` is not defined in the visible lines -- confirm
# where it comes from.
214 def refreshTable(self):
221 for bucket in self.table.buckets:
222 if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
223 id = newIDInRange(bucket.min, bucket.max)
224 self.findNode(id, callback)
# retrieveValues(key)
# Build a SELECT for the `value` column of table kv where the key column
# equals the base64 encoding of `key`, and open a cursor on self.store.
# NOTE(review): executing the statement and returning rows are not visible
# in these lines -- confirm.  The SQL is assembled by string interpolation;
# base64 text contains no quote characters, but parameter binding would be
# the more robust form.
227 def retrieveValues(self, key):
228 s = "select value from kv where key = '%s';" % key.encode('base64')
229 c = self.store.cursor()
239 ##### INCOMING MESSAGE HANDLERS
# xmlrpc_ping(sender) -- incoming PING handler.
# Builds a Node from the sender dict, inserts it with contacted=0, and
# returns the pair (empty tuple, our own senderDict).
# NOTE(review): `ip` is read from the current request but not used in the
# visible lines; presumably it is written into `sender` -- confirm.
241 def xmlrpc_ping(self, sender):
243 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
246 ip = self.crequest.getClientIP()
248 n = Node().initWithDict(sender)
249 self.insertNode(n, contacted=0)
250 return (), self.node.senderDict()
# xmlrpc_find_node(target, sender) -- incoming FIND_NODE handler.
#   target -- base64-encoded id; decoded before the table lookup
#   sender -- dict describing the caller; turned into a Node and inserted
#             with contacted=0
# Returns the pair (list of senderDict() for the nodes found, our own
# senderDict).
# NOTE(review): `ip` is read from the current request but not used in the
# visible lines; presumably it is written into `sender` -- confirm.
252 def xmlrpc_find_node(self, target, sender):
253 nodes = self.table.findNodes(target.decode('base64'))
254 nodes = map(lambda node: node.senderDict(), nodes)
255 ip = self.crequest.getClientIP()
257 n = Node().initWithDict(sender)
258 self.insertNode(n, contacted=0)
259 return nodes, self.node.senderDict()
# xmlrpc_store_value(key, value, sender) -- incoming STORE handler.
# h1 is the base64-encoded SHA digest of key+value and serves as the hkv
# column value.  An INSERT into kv is prepared, and an UPDATE of the row's
# time is prepared as the alternative (see the comment below).  The sender
# is then inserted as a node with contacted=0 and the pair
# (empty tuple, our own senderDict) is returned.
# NOTE(review): `sha` and `t` are not defined in the visible lines, and the
# execute calls / error handling that choose between INSERT and UPDATE are
# not visible -- confirm.
# NOTE(review): key and value arrive from the remote caller and are
# interpolated straight into the SQL text; unless they are guaranteed to be
# base64 at this point, use parameter binding or escaping.
261 def xmlrpc_store_value(self, key, value, sender):
262 h1 = sha(key+value).digest().encode('base64')
264 s = "insert into kv values ('%s', '%s', '%s', '%s')" % (h1, key, value, t)
265 c = self.store.cursor()
269 # update last insert time
270 s = "update kv set time = '%s' where hkv = '%s'" % (t, h1)
273 ip = self.crequest.getClientIP()
275 n = Node().initWithDict(sender)
276 self.insertNode(n, contacted=0)
277 return (), self.node.senderDict()
# xmlrpc_find_value(key, sender) -- incoming FIND_VALUE handler.
#   key    -- base64-encoded; decoded before use
#   sender -- dict describing the caller; inserted as a node (contacted=0)
# Two result shapes are visible, each paired with our own senderDict:
#   {'values': l}     -- values from self.retrieveValues(key)
#   {'nodes': nodes}  -- senderDict() of nodes from self.table.findNodes
# NOTE(review): the condition selecting between the two returns is not
# visible in these lines (presumably "any local values found") -- confirm.
# `ip` is read but not used in the visible lines.
279 def xmlrpc_find_value(self, key, sender):
280 ip = self.crequest.getClientIP()
281 key = key.decode('base64')
283 n = Node().initWithDict(sender)
284 self.insertNode(n, contacted=0)
286 l = self.retrieveValues(key)
288 return {'values' : l}, self.node.senderDict()
290 nodes = self.table.findNodes(key)
291 nodes = map(lambda node: node.senderDict(), nodes)
292 return {'nodes' : nodes}, self.node.senderDict()
# test_build_net(quiet=0, peers=24, host='localhost', pause=1)
# Build a local test network: one Khashmir instance per peer on
# consecutive ports (port + i), each with its own database file
# '/tmp/test<i>'.  Peers are then given three randomly chosen contacts
# each and asked to findCloseNodes().
# NOTE(review): `port`, `l`, `peer` and `thread` are not defined in the
# visible lines, and `quiet` / `pause` are not used in them -- confirm.
# `whrandom` is a long-deprecated module; random.randrange is the usual
# replacement.
300 def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
301 from whrandom import randrange
307 print "Building %s peer table." % peers
309 for i in xrange(peers):
310 a = Khashmir(host, port + i, db = '/tmp/test'+`i`)
314 thread.start_new_thread(l[0].app.run, ())
320 print "adding contacts...."
323 n = l[randrange(0, len(l))].node
324 peer.addContact(host, n.port)
325 n = l[randrange(0, len(l))].node
326 peer.addContact(host, n.port)
327 n = l[randrange(0, len(l))].node
328 peer.addContact(host, n.port)
333 print "finding close nodes...."
336 peer.findCloseNodes()
342 # peer.refreshTable()
# test_find_nodes(l, quiet=0)
#   l -- list of Khashmir peers (indexed with randrange)
# Pick two random peers a and b, then ask a to find b's node id.  The
# callback prints PASSED when the first returned node has b's id, FAILED
# otherwise.
# NOTE(review): `n` is not defined in the visible lines (presumably
# len(l)), and the use of `flag` to wait for the callback is not visible --
# confirm.
345 def test_find_nodes(l, quiet=0):
346 import threading, sys
347 from whrandom import randrange
348 flag = threading.Event()
352 a = l[randrange(0,n)]
353 b = l[randrange(0,n)]
355 def callback(nodes, flag=flag, id = b.node.id):
356 if (len(nodes) >0) and (nodes[0].id == id):
357 print "test_find_nodes PASSED"
359 print "test_find_nodes FAILED"
361 a.findNode(b.node.id, callback)
# test_find_value(l, quiet=0)
#   l -- list of Khashmir peers (indexed with randrange)
# Pick four random peers a..d.  Peer a stores (key, value); peers b, c and
# d each look the key up, using a separate helper object (built from one of
# the three Event flags) whose `callback` method receives the value lists.
# NOTE(review): `n`, `key`, `value` and the class `cb` (whose __init__ and
# callback methods appear below) are not defined in the visible lines --
# confirm.
364 def test_find_value(l, quiet=0):
365 from whrandom import randrange
367 from hash import newID
368 import time, threading, sys
370 fa = threading.Event()
371 fb = threading.Event()
372 fc = threading.Event()
375 a = l[randrange(0,n)]
376 b = l[randrange(0,n)]
377 c = l[randrange(0,n)]
378 d = l[randrange(0,n)]
383 print "inserting value..."
385 a.storeValueForKey(key, value)
391 def __init__(self, flag, value=value):
395 def callback(self, values):
397 if(len(values) == 0):
399 print "find NOT FOUND"
405 if self.val in values:
410 b.valueForKey(key, cb(fa).callback)
412 c.valueForKey(key, cb(fb).callback)
414 d.valueForKey(key, cb(fc).callback)
# test_one(host, port, db='/tmp/test')
# Create a single Khashmir peer and run its application loop (k.app.run)
# in a new thread.
# NOTE(review): `thread` is not imported in the visible lines, and no
# return of `k` is visible -- confirm.
417 def test_one(host, port, db='/tmp/test'):
419 k = Khashmir(host, port, db)
420 thread.start_new_thread(k.app.run, ())
# Script entry point: builds a test network with test_build_net(peers=n)
# and prints a banner before the find-node and store/fetch phases.
# NOTE(review): the assignment of `n` (presumably taken from sys.argv when
# an argument is given), the import of `sys`, and the calls that run the
# two test phases are not visible in these lines -- confirm.
423 if __name__ == "__main__":
426 if len(sys.argv) > 1:
428 l = test_build_net(peers=n)
430 print "finding nodes..."
433 print "inserting and fetching values..."