1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
7 from bencode import bdecode as loads
8 from bencode import bencode as dumps
12 from ktable import KTable, K
13 from knode import KNode as Node
15 from hash import newID, newIDInRange
17 from actions import FindNode, GetValue, KeyExpirer
18 from twisted.web import xmlrpc
19 from twisted.internet.defer import Deferred
20 from twisted.python import threadable
21 from twisted.internet.app import Application
22 from twisted.web import server
25 from bsddb3 import db ## find this at http://pybsddb.sf.net/
26 from bsddb3._db import DBNotFoundError
28 from xmlrpclib import Binary
32 # this is the main class!
33 class Khashmir(xmlrpc.XMLRPC):
34 __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
35 def __init__(self, host, port):
36 self.node = Node().init(newID(), host, port)
37 self.table = KTable(self.node)
38 self.app = Application("xmlrpc")
39 self.app.listenTCP(port, server.Site(self))
41 ## these databases may be more suited to on-disk rather than in-memory
42 # h((key, value)) -> (key, value, time) mappings
44 self.store.open(None, None, db.DB_BTREE)
46 # <insert time> -> h((key, value))
48 self.itime.set_flags(db.DB_DUP)
49 self.itime.open(None, None, db.DB_BTREE)
51 # key -> h((key, value))
53 self.kw.set_flags(db.DB_DUP)
54 self.kw.open(None, None, db.DB_BTREE)
56 KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
58 def render(self, request):
60 Override the built in render so we can have access to the request object!
61 note, crequest is probably only valid on the initial call (not after deferred!)
63 self.crequest = request
64 return xmlrpc.XMLRPC.render(self, request)
68 ####### LOCAL INTERFACE - use these methods!
69 def addContact(self, host, port):
71 ping this node and add the contact info to the table on pong!
73 n =Node().init(" "*20, host, port) # note, we
77 ## this call is async!
78 def findNode(self, id, callback, errback=None):
79 """ returns the contact info for node, or the k closest nodes, from the global table """
80 # get K nodes out of local table/cache, or the node we want
81 nodes = self.table.findNodes(id)
84 d.addCallbacks(callback, errback)
86 d.addCallback(callback)
87 if len(nodes) == 1 and nodes[0].id == id :
90 # create our search state
91 state = FindNode(self, id, d.callback)
92 reactor.callFromThread(state.goWithNodes, nodes)
96 def valueForKey(self, key, callback):
97 """ returns the values found for key in global table """
98 nodes = self.table.findNodes(key)
101 l = self.retrieveValues(key)
103 reactor.callFromThread(callback, l)
105 # create our search state
106 state = GetValue(self, key, callback)
107 reactor.callFromThread(state.goWithNodes, nodes, l)
111 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
112 def storeValueForKey(self, key, value, callback=None):
113 """ stores the value for key in the global table, returns immediately, no status
114 in this implementation, peers respond but don't indicate status to storing values
115 values are stored in peers on a first-come first-served basis
116 this will probably change so more than one value can be stored under a key
118 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
120 # default callback - this will get called for each successful store value
121 def _storedValueHandler(sender):
123 response=_storedValueHandler
126 def cb(t, table = table, node=node, resp=response):
127 self.table.insertNode(node)
129 if node.id != self.node.id:
130 def default(err, node=node, table=table):
131 table.nodeFailed(node)
132 df = node.storeValue(key, value, self.node.senderDict())
133 df.addCallbacks(cb, lambda: None)
134 # this call is asynch
135 self.findNode(key, _storeValueForKey)
138 def insertNode(self, n, contacted=1):
140 insert a node in our local table, pinging oldest contact in bucket, if necessary
142 If all you have is a host/port, then use addContact, which calls this method after
143 receiving the PONG from the remote node. The reason for the seperation is we can't insert
144 a node into the table without it's peer-ID. That means of course the node passed into this
145 method needs to be a properly formed Node object with a valid ID.
147 old = self.table.insertNode(n, contacted=contacted)
148 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
149 # the bucket is full, check to see if old node is still around and if so, replace it
151 ## these are the callbacks used when we ping the oldest node in a bucket
152 def _staleNodeHandler(oldnode=old, newnode = n):
153 """ called if the pinged node never responds """
154 self.table.replaceStaleNode(old, newnode)
156 def _notStaleNodeHandler(sender, old=old):
157 """ called when we get a pong from the old node """
158 sender = Node().initWithDict(sender)
159 if sender.id == old.id:
160 self.table.justSeenNode(old)
162 df = old.ping(self.node.senderDict())
163 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
166 def sendPing(self, node):
170 df = node.ping(self.node.senderDict())
171 ## these are the callbacks we use when we issue a PING
172 def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
173 if id != 20 * ' ' and id != sender['id'].data:
174 # whoah, got response from different peer than we were expecting
177 sender['host'] = host
178 sender['port'] = port
179 n = Node().initWithDict(sender)
182 def _defaultPong(err, node=node, table=self.table):
183 table.nodeFailed(node)
185 df.addCallbacks(_pongHandler,_defaultPong)
188 def findCloseNodes(self):
190 This does a findNode on the ID one away from our own.
191 This will allow us to populate our table with nodes on our network closest to our own.
192 This is called as soon as we start up with an empty table
194 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
197 self.findNode(id, callback)
199 def refreshTable(self):
206 for bucket in self.table.buckets:
207 if time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS:
208 id = newIDInRange(bucket.min, bucket.max)
209 self.findNode(id, callback)
212 def retrieveValues(self, key):
213 if self.kw.has_key(key):
217 while(tup and tup[0] == key):
219 v = loads(self.store[h1])[1]
226 ##### INCOMING MESSAGE HANDLERS
228 def xmlrpc_ping(self, sender):
230 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
233 ip = self.crequest.getClientIP()
235 n = Node().initWithDict(sender)
236 self.insertNode(n, contacted=0)
237 return self.node.senderDict()
239 def xmlrpc_find_node(self, target, sender):
240 nodes = self.table.findNodes(target.data)
241 nodes = map(lambda node: node.senderDict(), nodes)
242 ip = self.crequest.getClientIP()
244 n = Node().initWithDict(sender)
245 self.insertNode(n, contacted=0)
246 return nodes, self.node.senderDict()
248 def xmlrpc_store_value(self, key, value, sender):
250 h1 = sha(key+value.data).digest()
252 if not self.store.has_key(h1):
253 v = dumps((key, value.data, t))
254 self.store.put(h1, v)
255 self.itime.put(t, h1)
258 # update last insert time
259 tup = loads(self.store[h1])
260 self.store[h1] = dumps((tup[0], tup[1], t))
261 self.itime.put(t, h1)
263 ip = self.crequest.getClientIP()
265 n = Node().initWithDict(sender)
266 self.insertNode(n, contacted=0)
267 return self.node.senderDict()
269 def xmlrpc_find_value(self, key, sender):
270 ip = self.crequest.getClientIP()
273 n = Node().initWithDict(sender)
274 self.insertNode(n, contacted=0)
276 l = self.retrieveValues(key)
278 l = map(lambda v: Binary(v), l)
279 return {'values' : l}, self.node.senderDict()
281 nodes = self.table.findNodes(key)
282 nodes = map(lambda node: node.senderDict(), nodes)
283 return {'nodes' : nodes}, self.node.senderDict()
291 def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
292 from whrandom import randrange
298 print "Building %s peer table." % peers
300 for i in xrange(peers):
301 a = Khashmir(host, port + i)
305 thread.start_new_thread(l[0].app.run, ())
311 print "adding contacts...."
314 n = l[randrange(0, len(l))].node
315 peer.addContact(host, n.port)
316 n = l[randrange(0, len(l))].node
317 peer.addContact(host, n.port)
318 n = l[randrange(0, len(l))].node
319 peer.addContact(host, n.port)
324 print "finding close nodes...."
327 peer.findCloseNodes()
333 # peer.refreshTable()
336 def test_find_nodes(l, quiet=0):
337 import threading, sys
338 from whrandom import randrange
339 flag = threading.Event()
343 a = l[randrange(0,n)]
344 b = l[randrange(0,n)]
346 def callback(nodes, flag=flag, id = b.node.id):
347 if (len(nodes) >0) and (nodes[0].id == id):
348 print "test_find_nodes PASSED"
350 print "test_find_nodes FAILED"
352 a.findNode(b.node.id, callback)
355 def test_find_value(l, quiet=0):
356 from whrandom import randrange
358 from hash import newID
359 import time, threading, sys
361 fa = threading.Event()
362 fb = threading.Event()
363 fc = threading.Event()
366 a = l[randrange(0,n)]
367 b = l[randrange(0,n)]
368 c = l[randrange(0,n)]
369 d = l[randrange(0,n)]
374 print "inserting value..."
376 a.storeValueForKey(key, value)
382 def __init__(self, flag, value=value):
386 def callback(self, values):
388 if(len(values) == 0):
390 print "find NOT FOUND"
396 if self.val in values:
401 b.valueForKey(key, cb(fa).callback)
403 c.valueForKey(key, cb(fb).callback)
405 d.valueForKey(key, cb(fc).callback)
408 def test_one(host, port):
410 k = Khashmir(host, port)
411 thread.start_new_thread(k.app.run, ())
414 if __name__ == "__main__":
417 if len(sys.argv) > 1:
419 l = test_build_net(peers=n)
421 print "finding nodes..."
424 print "inserting and fetching values..."