1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
7 from pickle import loads, dumps
10 from ktable import KTable, K
11 from knode import KNode as Node
13 from hash import newID
15 from actions import FindNode, GetValue, KeyExpirer
16 from twisted.web import xmlrpc
17 from twisted.internet.defer import Deferred
18 from twisted.python import threadable
19 from twisted.internet.app import Application
20 from twisted.web import server
23 from bsddb3 import db ## find this at http://pybsddb.sf.net/
24 from bsddb3._db import DBNotFoundError
26 from xmlrpclib import Binary
30 # this is the main class!
31 class Khashmir(xmlrpc.XMLRPC):
32 __slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
33 def __init__(self, host, port):
34 self.node = Node().init(newID(), host, port)
35 self.table = KTable(self.node)
36 self.app = Application("xmlrpc")
37 self.app.listenTCP(port, server.Site(self))
39 ## these databases may be more suited to on-disk rather than in-memory
40 # h((key, value)) -> (key, value, time) mappings
42 self.store.open(None, None, db.DB_BTREE)
44 # <insert time> -> h((key, value))
46 self.itime.set_flags(db.DB_DUP)
47 self.itime.open(None, None, db.DB_BTREE)
49 # key -> h((key, value))
51 self.kw.set_flags(db.DB_DUP)
52 self.kw.open(None, None, db.DB_BTREE)
54 KeyExpirer(store=self.store, itime=self.itime, kw=self.kw)
56 def render(self, request):
58 Override the built in render so we can have access to the request object!
59 note, crequest is probably only valid on the initial call (not after deferred!)
61 self.crequest = request
62 return xmlrpc.XMLRPC.render(self, request)
66 ####### LOCAL INTERFACE - use these methods!
67 def addContact(self, host, port):
69 ping this node and add the contact info to the table on pong!
71 n =Node().init(" "*20, host, port) # note, we
75 ## this call is async!
76 def findNode(self, id, callback, errback=None):
77 """ returns the contact info for node, or the k closest nodes, from the global table """
78 # get K nodes out of local table/cache, or the node we want
79 nodes = self.table.findNodes(id)
81 d.addCallbacks(callback, errback)
82 if len(nodes) == 1 and nodes[0].id == id :
85 # create our search state
86 state = FindNode(self, id, d.callback)
87 reactor.callFromThread(state.goWithNodes, nodes)
91 def valueForKey(self, key, callback):
92 """ returns the values found for key in global table """
93 nodes = self.table.findNodes(key)
96 l = self.retrieveValues(key)
98 reactor.callFromThread(callback, l)
100 # create our search state
101 state = GetValue(self, key, callback)
102 reactor.callFromThread(state.goWithNodes, nodes, l)
106 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
107 def storeValueForKey(self, key, value, callback=None):
108 """ stores the value for key in the global table, returns immediately, no status
109 in this implementation, peers respond but don't indicate status to storing values
110 values are stored in peers on a first-come first-served basis
111 this will probably change so more than one value can be stored under a key
113 def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
115 # default callback - this will get called for each successful store value
116 def _storedValueHandler(sender):
118 response=_storedValueHandler
121 def cb(t, table = table, node=node, resp=response):
122 self.table.insertNode(node)
124 if node.id != self.node.id:
125 def default(err, node=node, table=table):
126 table.nodeFailed(node)
127 df = node.storeValue(key, value, self.node.senderDict())
129 # this call is asynch
130 self.findNode(key, _storeValueForKey)
133 def insertNode(self, n, contacted=1):
135 insert a node in our local table, pinging oldest contact in bucket, if necessary
137 If all you have is a host/port, then use addContact, which calls this method after
138 receiving the PONG from the remote node. The reason for the seperation is we can't insert
139 a node into the table without it's peer-ID. That means of course the node passed into this
140 method needs to be a properly formed Node object with a valid ID.
142 old = self.table.insertNode(n, contacted=contacted)
143 if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
144 # the bucket is full, check to see if old node is still around and if so, replace it
146 ## these are the callbacks used when we ping the oldest node in a bucket
147 def _staleNodeHandler(oldnode=old, newnode = n):
148 """ called if the pinged node never responds """
149 self.table.replaceStaleNode(old, newnode)
151 def _notStaleNodeHandler(sender, old=old):
152 """ called when we get a pong from the old node """
153 sender, conn = sender
154 sender['host'] = conn['host']
155 sender = Node().initWithDict(sender)
156 if sender.id == old.id:
157 self.table.justSeenNode(old)
159 df = old.ping(self.node.senderDict())
160 df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
163 def sendPing(self, node):
167 df = node.ping(self.node.senderDict())
168 ## these are the callbacks we use when we issue a PING
169 def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
171 if id != 20 * ' ' and id != sender['id'].data:
172 # whoah, got response from different peer than we were expecting
175 sender['host'] = host
176 sender['port'] = port
177 n = Node().initWithDict(sender)
180 def _defaultPong(err, node=node, table=self.table):
181 table.nodeFailed(node)
183 df.addCallbacks(_pongHandler,_defaultPong)
186 def findCloseNodes(self):
188 This does a findNode on the ID one away from our own.
189 This will allow us to populate our table with nodes on our network closest to our own.
190 This is called as soon as we start up with an empty table
192 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
195 self.findNode(id, callback)
197 def refreshTable(self):
204 for bucket in self.table.buckets:
205 if time.time() - bucket.lastAccessed >= 60 * 60:
206 id = randRange(bucket.min, bucket.max)
207 self.findNode(id, callback)
210 def retrieveValues(self, key):
211 if self.kw.has_key(key):
215 while(tup and tup[0] == key):
217 v = loads(self.store[h1])[1]
224 ##### INCOMING MESSAGE HANDLERS
226 def xmlrpc_ping(self, sender):
228 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
231 ip = self.crequest.getClientIP()
233 n = Node().initWithDict(sender)
234 self.insertNode(n, contacted=0)
235 return self.node.senderDict()
237 def xmlrpc_find_node(self, target, sender):
238 nodes = self.table.findNodes(target.data)
239 nodes = map(lambda node: node.senderDict(), nodes)
240 ip = self.crequest.getClientIP()
242 n = Node().initWithDict(sender)
243 self.insertNode(n, contacted=0)
244 return nodes, self.node.senderDict()
246 def xmlrpc_store_value(self, key, value, sender):
248 h1 = sha(key+value.data).digest()
250 if not self.store.has_key(h1):
251 v = dumps((key, value.data, t))
252 self.store.put(h1, v)
253 self.itime.put(t, h1)
256 # update last insert time
257 tup = loads(self.store[h1])
258 self.store[h1] = dumps((tup[0], tup[1], t))
259 self.itime.put(t, h1)
261 ip = self.crequest.getClientIP()
263 n = Node().initWithDict(sender)
264 self.insertNode(n, contacted=0)
265 return self.node.senderDict()
267 def xmlrpc_find_value(self, key, sender):
268 ip = self.crequest.getClientIP()
271 n = Node().initWithDict(sender)
272 self.insertNode(n, contacted=0)
274 l = self.retrieveValues(key)
276 l = map(lambda v: Binary(v), l)
277 return {'values' : l}, self.node.senderDict()
279 nodes = self.table.findNodes(key)
280 nodes = map(lambda node: node.senderDict(), nodes)
281 return {'nodes' : nodes}, self.node.senderDict()
289 def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
290 from whrandom import randrange
296 print "Building %s peer table." % peers
298 for i in xrange(peers):
299 a = Khashmir(host, port + i)
303 thread.start_new_thread(l[0].app.run, ())
309 print "adding contacts...."
312 n = l[randrange(0, len(l))].node
313 peer.addContact(host, n.port)
314 n = l[randrange(0, len(l))].node
315 peer.addContact(host, n.port)
316 n = l[randrange(0, len(l))].node
317 peer.addContact(host, n.port)
322 print "finding close nodes...."
325 peer.findCloseNodes()
331 # peer.refreshTable()
334 def test_find_nodes(l, quiet=0):
335 import threading, sys
336 from whrandom import randrange
337 flag = threading.Event()
341 a = l[randrange(0,n)]
342 b = l[randrange(0,n)]
344 def callback(nodes, flag=flag, id = b.node.id):
345 if (len(nodes) >0) and (nodes[0].id == id):
346 print "test_find_nodes PASSED"
348 print "test_find_nodes FAILED"
350 a.findNode(b.node.id, callback)
353 def test_find_value(l, quiet=0):
354 from whrandom import randrange
356 from hash import newID
357 import time, threading, sys
359 fa = threading.Event()
360 fb = threading.Event()
361 fc = threading.Event()
364 a = l[randrange(0,n)]
365 b = l[randrange(0,n)]
366 c = l[randrange(0,n)]
367 d = l[randrange(0,n)]
372 print "inserting value..."
374 a.storeValueForKey(key, value)
380 def __init__(self, flag, value=value):
384 def callback(self, values):
386 if(len(values) == 0):
388 print "find NOT FOUND"
394 if self.val in values:
399 b.valueForKey(key, cb(fa).callback)
401 c.valueForKey(key, cb(fb).callback)
403 d.valueForKey(key, cb(fc).callback)
406 def test_one(host, port):
408 k = Khashmir(host, port)
409 thread.start_new_thread(k.app.run, ())
412 if __name__ == "__main__":
415 if len(sys.argv) > 1:
417 l = test_build_net(peers=n)
419 print "finding nodes..."
422 print "inserting and fetching values..."