1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
7 from bencode import bdecode as loads
8 from bencode import bencode as dumps
12 from ktable import KTable, K
13 from knode import KNode as Node
15 from hash import newID, newIDInRange
17 from actions import FindNode, GetValue, KeyExpirer
18 from twisted.web import xmlrpc
19 from twisted.internet.defer import Deferred
20 from twisted.python import threadable
21 from twisted.internet.app import Application
22 from twisted.web import server
25 from bsddb3 import db ## find this at http://pybsddb.sf.net/
26 from bsddb3._db import DBNotFoundError
28 from xmlrpclib import Binary
32 # this is the main class!
33 class Khashmir(xmlrpc.XMLRPC):
34 __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
35 def __init__(self, host, port):
36 self.node = Node().init(newID(), host, port)
37 self.table = KTable(self.node)
38 self.app = Application("xmlrpc")
39 self.app.listenTCP(port, server.Site(self))
41 ## these databases may be more suited to on-disk rather than in-memory
42 # h((key, value)) -> (key, value, time) mappings
44 self.store.open(None, None, db.DB_BTREE)
46 # <insert time> -> h((key, value))
48 self.itime.set_flags(db.DB_DUP)
49 self.itime.open(None, None, db.DB_BTREE)
51 # key -> h((key, value))
53 self.kw.set_flags(db.DB_DUP)
54 self.kw.open(None, None, db.DB_BTREE)
56 KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
58 def render(self, request):
60 Override the built in render so we can have access to the request object!
61 note, crequest is probably only valid on the initial call (not after deferred!)
63 self.crequest = request
64 return xmlrpc.XMLRPC.render(self, request)
68 ####### LOCAL INTERFACE - use these methods!
69 def addContact(self, host, port):
71 ping this node and add the contact info to the table on pong!
73 n =Node().init(const.NULL_ID, host, port) # note, we
77 ## this call is async!
78 def findNode(self, id, callback, errback=None):
79 """ returns the contact info for node, or the k closest nodes, from the global table """
80 # get K nodes out of local table/cache, or the node we want
81 nodes = self.table.findNodes(id)
84 d.addCallbacks(callback, errback)
86 d.addCallback(callback)
87 if len(nodes) == 1 and nodes[0].id == id :
90 # create our search state
91 state = FindNode(self, id, d.callback)
92 reactor.callFromThread(state.goWithNodes, nodes)
96 def valueForKey(self, key, callback):
97 """ returns the values found for key in global table """
98 nodes = self.table.findNodes(key)
101 l = self.retrieveValues(key)
103 reactor.callFromThread(callback, l)
105 # create our search state
106 state = GetValue(self, key, callback)
107 reactor.callFromThread(state.goWithNodes, nodes, l)
111 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
112 def storeValueForKey(self, key, value, callback=None):
113 """ stores the value for key in the global table, returns immediately, no status
114 in this implementation, peers respond but don't indicate status to storing values
115 values are stored in peers on a first-come first-served basis
116 this will probably change so more than one value can be stored under a key
118 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
120 # default callback - this will get called for each successful store value
121 def _storedValueHandler(sender):
123 response=_storedValueHandler
125 for node in nodes[:const.STORE_REDUNDANCY]:
126 def cb(t, table = table, node=node, resp=response):
127 self.table.insertNode(node)
129 if node.id != self.node.id:
130 def default(err, node=node, table=table):
131 table.nodeFailed(node)
132 df = node.storeValue(key, value, self.node.senderDict())
133 df.addCallbacks(cb, lambda x: None)
134 # this call is asynch
135 self.findNode(key, _storeValueForKey)
138 def insertNode(self, n, contacted=1):
140 insert a node in our local table, pinging oldest contact in bucket, if necessary
142 If all you have is a host/port, then use addContact, which calls this method after
143 receiving the PONG from the remote node. The reason for the seperation is we can't insert
144 a node into the table without it's peer-ID. That means of course the node passed into this
145 method needs to be a properly formed Node object with a valid ID.
147 old = self.table.insertNode(n, contacted=contacted)
148 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
149 # the bucket is full, check to see if old node is still around and if so, replace it
151 ## these are the callbacks used when we ping the oldest node in a bucket
152 def _staleNodeHandler(oldnode=old, newnode = n):
153 """ called if the pinged node never responds """
154 self.table.replaceStaleNode(old, newnode)
156 def _notStaleNodeHandler(sender, old=old):
157 """ called when we get a pong from the old node """
158 sender = Node().initWithDict(sender)
159 if sender.id == old.id:
160 self.table.justSeenNode(old)
162 df = old.ping(self.node.senderDict())
163 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
166 def sendPing(self, node):
170 df = node.ping(self.node.senderDict())
171 ## these are the callbacks we use when we issue a PING
172 def _pongHandler(args, id=node.id, host=node.host, port=node.port, table=self.table):
174 if id != const.NULL_ID and id != sender['id'].data:
175 # whoah, got response from different peer than we were expecting
178 sender['host'] = host
179 sender['port'] = port
180 n = Node().initWithDict(sender)
183 def _defaultPong(err, node=node, table=self.table):
184 table.nodeFailed(node)
186 df.addCallbacks(_pongHandler,_defaultPong)
189 def findCloseNodes(self):
191 This does a findNode on the ID one away from our own.
192 This will allow us to populate our table with nodes on our network closest to our own.
193 This is called as soon as we start up with an empty table
195 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
198 self.findNode(id, callback)
200 def refreshTable(self):
207 for bucket in self.table.buckets:
208 if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
209 id = newIDInRange(bucket.min, bucket.max)
210 self.findNode(id, callback)
213 def retrieveValues(self, key):
214 if self.kw.has_key(key):
218 while(tup and tup[0] == key):
220 v = loads(self.store[h1])[1]
227 ##### INCOMING MESSAGE HANDLERS
229 def xmlrpc_ping(self, sender):
231 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
234 ip = self.crequest.getClientIP()
236 n = Node().initWithDict(sender)
237 self.insertNode(n, contacted=0)
238 return (), self.node.senderDict()
240 def xmlrpc_find_node(self, target, sender):
241 nodes = self.table.findNodes(target.data)
242 nodes = map(lambda node: node.senderDict(), nodes)
243 ip = self.crequest.getClientIP()
245 n = Node().initWithDict(sender)
246 self.insertNode(n, contacted=0)
247 return nodes, self.node.senderDict()
249 def xmlrpc_store_value(self, key, value, sender):
251 h1 = sha(key+value.data).digest()
253 if not self.store.has_key(h1):
254 v = dumps((key, value.data, t))
255 self.store.put(h1, v)
256 self.itime.put(t, h1)
259 # update last insert time
260 tup = loads(self.store[h1])
261 self.store[h1] = dumps((tup[0], tup[1], t))
262 self.itime.put(t, h1)
264 ip = self.crequest.getClientIP()
266 n = Node().initWithDict(sender)
267 self.insertNode(n, contacted=0)
268 return (), self.node.senderDict()
270 def xmlrpc_find_value(self, key, sender):
271 ip = self.crequest.getClientIP()
274 n = Node().initWithDict(sender)
275 self.insertNode(n, contacted=0)
277 l = self.retrieveValues(key)
279 l = map(lambda v: Binary(v), l)
280 return {'values' : l}, self.node.senderDict()
282 nodes = self.table.findNodes(key)
283 nodes = map(lambda node: node.senderDict(), nodes)
284 return {'nodes' : nodes}, self.node.senderDict()
292 def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
293 from whrandom import randrange
299 print "Building %s peer table." % peers
301 for i in xrange(peers):
302 a = Khashmir(host, port + i)
306 thread.start_new_thread(l[0].app.run, ())
312 print "adding contacts...."
315 n = l[randrange(0, len(l))].node
316 peer.addContact(host, n.port)
317 n = l[randrange(0, len(l))].node
318 peer.addContact(host, n.port)
319 n = l[randrange(0, len(l))].node
320 peer.addContact(host, n.port)
325 print "finding close nodes...."
328 peer.findCloseNodes()
334 # peer.refreshTable()
337 def test_find_nodes(l, quiet=0):
338 import threading, sys
339 from whrandom import randrange
340 flag = threading.Event()
344 a = l[randrange(0,n)]
345 b = l[randrange(0,n)]
347 def callback(nodes, flag=flag, id = b.node.id):
348 if (len(nodes) >0) and (nodes[0].id == id):
349 print "test_find_nodes PASSED"
351 print "test_find_nodes FAILED"
353 a.findNode(b.node.id, callback)
356 def test_find_value(l, quiet=0):
357 from whrandom import randrange
359 from hash import newID
360 import time, threading, sys
362 fa = threading.Event()
363 fb = threading.Event()
364 fc = threading.Event()
367 a = l[randrange(0,n)]
368 b = l[randrange(0,n)]
369 c = l[randrange(0,n)]
370 d = l[randrange(0,n)]
375 print "inserting value..."
377 a.storeValueForKey(key, value)
383 def __init__(self, flag, value=value):
387 def callback(self, values):
389 if(len(values) == 0):
391 print "find NOT FOUND"
397 if self.val in values:
402 b.valueForKey(key, cb(fa).callback)
404 c.valueForKey(key, cb(fb).callback)
406 d.valueForKey(key, cb(fc).callback)
409 def test_one(host, port):
411 k = Khashmir(host, port)
412 thread.start_new_thread(k.app.run, ())
415 if __name__ == "__main__":
418 if len(sys.argv) > 1:
420 l = test_build_net(peers=n)
422 print "finding nodes..."
425 print "inserting and fetching values..."