1 ## Copyright 2002 Andrew Loewenstern, All Rights Reserved
3 from const import reactor
6 from ktable import KTable, K
7 from knode import KNode as Node
9 from hash import newID, intify
11 from twisted.web import xmlrpc
12 from twisted.internet.defer import Deferred
13 from twisted.python import threadable
16 from bsddb3 import db ## find this at http://pybsddb.sf.net/
17 from bsddb3._db import DBNotFoundError
19 # don't ping unless it's been at least this many seconds since we've heard from a peer
20 MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
22 # cap on the number of concurrent FIND_NODE/VALUE requests in flight (see N, used by the search classes below)
27 # this is the main class!
# Khashmir: one Kademlia-style DHT peer, exposed over XML-RPC via Twisted.
# NOTE(review): this listing carries original line numbers as a text prefix and
# has gaps in the numbering — several statements are missing from view.
28 class Khashmir(xmlrpc.XMLRPC):
29 __slots__ = ['listener', 'node', 'table', 'store', 'app']
# __init__: build our own Node identity (fresh random ID), the routing table,
# an XML-RPC web server on `port`, and a local BerkeleyDB btree value store.
30 def __init__(self, host, port):
31 self.node = Node(newID(), host, port)
32 self.table = KTable(self.node)
# local imports keep Twisted's app/server machinery out of module scope
33 from twisted.internet.app import Application
34 from twisted.web import server
35 self.app = Application("xmlrpc")
36 self.app.listenTCP(port, server.Site(self))
# NOTE(review): creation of self.store (presumably `self.store = db.DB()` on
# the elided original line 37) is not visible here — confirm in the full file.
# Opened with no filename: an anonymous, in-memory btree database.
38 self.store.open(None, None, db.DB_BTREE)
# render: Twisted resource hook.  Stash the request object so the xmlrpc_*
# handlers below can read the caller's IP via self.crequest, then delegate
# to the stock XML-RPC dispatcher.
41 def render(self, request):
43 Override the built in render so we can have access to the request object!
44 note, crequest is probably only valid on the initial call (not after deferred!)
# (the docstring delimiters sit on elided original lines 42/45)
46 self.crequest = request
47 return xmlrpc.XMLRPC.render(self, request)
51 ####### LOCAL INTERFACE - use these methods!
# addContact: ping host:port; the contact is only inserted into the routing
# table once the PONG arrives with its real ID (see insertNode/sendPing).
# NOTE(review): the actual ping/callback wiring is on elided lines 57-59.
52 def addContact(self, host, port):
54 ping this node and add the contact info to the table on pong!
# placeholder 20-byte blank ID until the peer tells us its real one
56 n =Node(" "*20, host, port) # note, we
60 ## this call is async!
# findNode: asynchronous lookup; `callback` receives either [the exact node]
# or up to K closest nodes known to the network.
61 def findNode(self, id, callback, errback=None):
62 """ returns the contact info for node, or the k closest nodes, from the global table """
63 # get K nodes out of local table/cache, or the node we want
64 nodes = self.table.findNodes(id)
# NOTE(review): `d` is a Deferred created on the elided original line 65
66 d.addCallbacks(callback, errback)
# exact match already in our table — elided lines 68-69 presumably fire the
# callback immediately and return without a network search
67 if len(nodes) == 1 and nodes[0].id == id :
70 # create our search state
71 state = FindNode(self, id, d.callback)
# hop onto the reactor thread; the search state machine is not thread-safe
72 reactor.callFromThread(state.goWithNodes, nodes)
# valueForKey: asynchronous DHT fetch; `callback` is invoked with the list of
# values found for `key` (possibly repeatedly — see GetValue below).
76 def valueForKey(self, key, callback):
77 """ returns the values found for key in global table """
# seed the search with the closest nodes we already know about
78 nodes = self.table.findNodes(key)
79 # create our search state
80 state = GetValue(self, key, callback)
# run the search from the reactor thread
81 reactor.callFromThread(state.goWithNodes, nodes)
84 ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
# storeValueForKey: fire-and-forget store.  Finds the nodes closest to `key`
# and asks each (except ourselves) to store the value.
85 def storeValueForKey(self, key, value):
86 """ stores the value for key in the global table, returns immediately, no status
87 in this implementation, peers respond but don't indicate status to storing values
88 values are stored in peers on a first-come first-served basis
89 this will probably change so more than one value can be stored under a key
# _storeValueForKey: findNode callback; default args freeze key/value/handlers
# at definition time.  Errors are swallowed by the lambda (best-effort store).
91 def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
# NOTE(review): the `for node in nodes:` header is on elided line 92
93 if node.id != self.node.id:
94 df = node.storeValue(key, value, self.node.senderDict())
95 df.addCallbacks(response, default)
# kick off the closest-node search; the store happens in the callback above
97 self.findNode(key, _storeValueForKey)
# insertNode: add a fully-identified Node to the routing table.  If the target
# bucket is full, KTable.insertNode returns the oldest entry; we ping it and
# replace it only if it fails to respond (standard Kademlia eviction policy).
100 def insertNode(self, n):
102 insert a node in our local table, pinging oldest contact in bucket, if necessary
104 If all you have is a host/port, then use addContact, which calls this method after
105 receiving the PONG from the remote node. The reason for the seperation is we can't insert
106 a node into the table without it's peer-ID. That means of course the node passed into this
107 method needs to be a properly formed Node object with a valid ID.
109 old = self.table.insertNode(n)
# only ping the incumbent if we haven't heard from it recently (rate-limit)
110 if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
111 # the bucket is full, check to see if old node is still around and if so, replace it
113 ## these are the callbacks used when we ping the oldest node in a bucket
# errback: incumbent never answered — evict it in favor of the newcomer.
# NOTE(review): when used as an errback this will be called with a Failure
# argument, but the signature only has keyword defaults — likely mis-binds
# the failure into `oldnode`; confirm against the full file.
114 def _staleNodeHandler(oldnode=old, newnode = n):
115 """ called if the pinged node never responds """
116 self.table.replaceStaleNode(old, newnode)
# callback: incumbent answered — refresh its position in the table
118 def _notStaleNodeHandler(sender, old=old):
119 """ called when we get a ping from the remote node """
120 if sender['id'] == old.id:
121 self.table.insertNode(old)
# NOTE(review): `df` is created on an elided line (the ping of `old`), and
# `self._staleNodeHandler` looks wrong — the handler above is a *local*
# function, not an attribute; this would raise AttributeError. Verify.
124 df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
# sendPing: issue a PING RPC to `node`; on PONG, record/refresh the peer in
# the routing table (insertion statements sit on elided lines 140-141).
127 def sendPing(self, node):
131 df = node.ping(self.node.senderDict())
132 ## these are the callbacks we use when we issue a PING
# _pongHandler: defaults capture the node's identity at call time so we can
# detect an ID mismatch when the response arrives.
133 def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
# blank 20-byte ID means we pinged a peer whose ID we didn't know yet
134 if id != 20 * ' ' and id != sender['id']:
135 # whoah, got response from different peer than we were expecting
138 #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
139 n = Node(sender['id'], host, port)
# _defaultPong: errback; currently a no-op (body on elided lines 144-145)
142 def _defaultPong(err):
143 # this should probably increment a failed message counter and dump the node if it gets over a threshold
146 df.addCallbacks(_pongHandler,_defaultPong)
# findCloseNodes: bootstrap helper — search for the ID adjacent to our own so
# the lookup populates our table with our nearest neighbours.
149 def findCloseNodes(self):
151 This does a findNode on the ID one away from our own.
152 This will allow us to populate our table with nodes on our network closest to our own.
153 This is called as soon as we start up with an empty table
# increment the last byte of our ID, wrapping 255 -> 0 (Python 2 str ID)
155 id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
# NOTE(review): `callback` is defined on elided lines 156-157
158 self.findNode(id, callback)
# refreshTable: Kademlia bucket refresh — for every bucket untouched for an
# hour, look up a random ID inside the bucket's range to repopulate it.
160 def refreshTable(self):
167 for bucket in self.table.buckets:
# 60 * 60: one hour without any lookup touching this bucket
168 if time.time() - bucket.lastAccessed >= 60 * 60:
# NOTE(review): randRange and `callback` come from elided lines — verify
169 id = randRange(bucket.min, bucket.max)
170 self.findNode(id, callback)
174 ##### INCOMING MESSAGE HANDLERS
# xmlrpc_ping: PING RPC handler — answer with our own sender dict.  The
# caller's IP comes from the transport, not from the sender dict it supplied.
176 def xmlrpc_ping(self, sender):
178 takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
181 ip = self.crequest.getClientIP()
182 n = Node(sender['id'], ip, sender['port'])
# NOTE(review): the insertion of `n` into the table is on elided line 183
184 return self.node.senderDict()
# xmlrpc_find_node: FIND_NODE RPC handler — return (closest-node dicts, our
# sender dict), and record the caller as a live contact.
186 def xmlrpc_find_node(self, target, sender):
187 nodes = self.table.findNodes(target)
# serialize Node objects into XML-RPC-safe dicts
188 nodes = map(lambda node: node.senderDict(), nodes)
189 ip = self.crequest.getClientIP()
190 n = Node(sender['id'], ip, sender['port'])
# NOTE(review): insertion of `n` is on elided line 191
192 return nodes, self.node.senderDict()
# xmlrpc_store_value: STORE RPC handler.  First-come first-served: the value
# is written only if no value is already stored under `key`.
194 def xmlrpc_store_value(self, key, value, sender):
195 if not self.store.has_key(key):
196 self.store.put(key, value)
197 ip = self.crequest.getClientIP()
198 n = Node(sender['id'], ip, sender['port'])
# NOTE(review): insertion of `n` is on elided line 199; no status is returned
# to the caller beyond our sender dict (see storeValueForKey's docstring).
200 return self.node.senderDict()
def xmlrpc_find_value(self, key, sender):
    """FIND_VALUE RPC handler.

    Returns ({'values': <stored value>}, our sender dict) when we hold the
    key locally, otherwise ({'nodes': [closest node dicts]}, our sender
    dict), mirroring xmlrpc_find_node.  The caller is recorded as a live
    contact using its transport-level IP.
    """
    ip = self.crequest.getClientIP()
    n = Node(sender['id'], ip, sender['port'])
    # record the caller in the routing table (same pattern as the other
    # xmlrpc_* handlers; this statement sat on an elided line of the listing)
    self.insertNode(n)
    if self.store.has_key(key):
        return {'values' : self.store[key]}, self.node.senderDict()
    else:
        # BUG FIX: the original read `self.table.findNodes(msg['key'])`, but
        # `msg` is undefined in this scope (NameError on every cache miss);
        # the lookup key is the `key` argument.
        nodes = self.table.findNodes(key)
        nodes = map(lambda node: node.senderDict(), nodes)
        return {'nodes' : nodes}, self.node.senderDict()
214 ### message response callbacks
215 # called when we get a response to store value
# _storedValueHandler: success callback for node.storeValue in
# storeValueForKey.  NOTE(review): the body is on elided lines (217+) and is
# not visible in this listing.
216 def _storedValueHandler(self, sender):
# ActionBase: shared state machine for the long-running searches below
# (FindNode, GetValue).  NOTE(review): the `class ActionBase...:` header and
# most of __init__ are on elided lines of this listing.
224 """ base class for some long running asynchronous processes like finding nodes or values """
225 def __init__(self, table, target, callback):
# numeric form of the target ID, used for XOR-distance comparisons
228 self.int = intify(target)
232 self.callback = callback
# comparator closure: orders nodes by XOR distance to the target ID
# (Python 2 cmp-style; the return statement is on an elided line)
236 def sort(a, b, int=self.int):
237 """ this function is for sorting nodes relative to the ID we are looking for """
238 x, y = int ^ a.int, int ^ b.int
# goWithNodes: entry point that starts the search; body elided from view
246 def goWithNodes(self, t):
251 FIND_NODE_TIMEOUT = 15
# FindNode: iterative Kademlia node lookup.  Maintains found/queried/answered
# dicts and an `outstanding` in-flight counter, querying up to N peers at a
# time until the K closest known nodes have all been queried.
253 class FindNode(ActionBase):
254 """ find node action merits it's own class as it is a long running stateful process """
# handleGotNodes: callback for each FIND_NODE response.
# NOTE(review): unpacking of `sender`/nodes from `args` is on elided lines.
255 def handleGotNodes(self, args):
257 if self.finished or self.answered.has_key(sender['id']):
258 # a day late and a dollar short
260 self.outstanding = self.outstanding - 1
261 self.answered[sender['id']] = 1
# merge newly-learned peers into our candidate set and routing table
263 if not self.found.has_key(node['id']):
264 n = Node(node['id'], node['host'], node['port'])
266 self.table.insertNode(n)
# schedule (def header on elided lines ~269-270): issue the next round of
# queries to the closest unqueried candidates.
271 send messages to new peers, if necessary
275 l = self.found.values()
# exact target found — deliver it and stop
279 if node.id == self.target:
281 return self.callback([node])
# skip peers we've already asked, and never query ourselves
282 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
283 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
284 df = node.findNode(self.target, self.table.node.senderDict())
285 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
286 self.outstanding = self.outstanding + 1
287 self.queried[node.id] = 1
# throttle: at most N requests in flight
288 if self.outstanding >= N:
290 assert(self.outstanding) >=0
# nothing left in flight: the K closest we know are the final answer
291 if self.outstanding == 0:
294 reactor.callFromThread(self.callback, l[:K])
# defaultGotNodes: errback for a timed-out/failed query; just decrement the
# in-flight counter (further handling on elided lines)
296 def defaultGotNodes(self, t):
299 self.outstanding = self.outstanding - 1
# goWithNodes: seed the search with nodes from the local table and fire the
# first round of FIND_NODE queries.
303 def goWithNodes(self, nodes):
305 this starts the process, our argument is a transaction with t.extras being our list of nodes
306 it's a transaction since we got called from the dispatcher
# don't query ourselves (elided line presumably `continue`s)
309 if node.id == self.table.node.id:
311 self.found[node.id] = node
312 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
313 df = node.findNode(self.target, self.table.node.senderDict())
314 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
315 self.outstanding = self.outstanding + 1
316 self.queried[node.id] = 1
# empty seed list: finish immediately (completion call on elided lines)
317 if self.outstanding == 0:
321 GET_VALUE_TIMEOUT = 15
# GetValue: iterative FIND_VALUE search.  Same machinery as FindNode, but a
# response may carry either 'nodes' (keep searching) or 'values' (done).
322 class GetValue(FindNode):
323 """ get value task """
# handleGotNodes: per-response callback.
# NOTE(review): unpacking of `sender`/`l` from `args` is on elided lines.
324 def handleGotNodes(self, args):
327 if self.finished or self.answered.has_key(sender['id']):
328 # a day late and a dollar short
330 self.outstanding = self.outstanding - 1
331 self.answered[sender['id']] = 1
333 # if we have any closer than what we already got, query them
334 if l.has_key('nodes'):
335 for node in l['nodes']:
336 if not self.found.has_key(node['id']):
337 n = Node(node['id'], node['host'], node['port'])
339 self.table.insertNode(n)
# a peer returned the value(s): deliver and finish
340 elif l.has_key('values'):
343 return self.callback(l['values'])
# next round: query closest unqueried candidates (scheduling header elided)
350 l = self.found.values()
354 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
355 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
# NOTE(review): passing `node` as the first argument to its own getValue
# looks suspicious — verify the knode.getValue signature in the full file
356 df = node.getValue(node, self.target)
357 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
358 self.outstanding = self.outstanding + 1
359 self.queried[node.id] = 1
# throttle: at most N requests in flight
360 if self.outstanding >= N:
362 assert(self.outstanding) >=0
363 if self.outstanding == 0:
364 ## all done, didn't find it!!
366 reactor.callFromThread(self.callback,[])
# goWithNodes: seed the search.  Note the first round issues findNode (not
# getValue) queries, matching the parent class's bootstrap behaviour.
369 def goWithNodes(self, nodes):
371 if node.id == self.table.node.id:
373 self.found[node.id] = node
374 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
375 df = node.findNode(self.target, self.table.node.senderDict())
376 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
377 self.outstanding = self.outstanding + 1
378 self.queried[node.id] = 1
# nothing to query: report "not found" with an empty value list
379 if self.outstanding == 0:
380 reactor.callFromThread(self.callback, [])
# test_build_net: smoke test — spin up `peers` Khashmir instances on
# consecutive localhost ports, wire each to three random existing peers, then
# have everyone bootstrap via findCloseNodes.  (Setup of `peers`, `port`, `l`
# and the Twisted app thread is partly on elided lines.)
386 def test_build_net(quiet=0):
387 from whrandom import randrange
394 print "Building %s peer table." % peers
396 for i in xrange(peers):
397 a = Khashmir('localhost', port + i)
404 events = events + peer.dispatcher.runOnce()
# run the first peer's Twisted application loop in a background thread
408 thread.start_new_thread(l[0].app.run, ())
# introduce each peer to three randomly chosen peers already in `l`
413 n = l[randrange(0, len(l))].node
414 peer.addContact(n.host, n.port)
415 n = l[randrange(0, len(l))].node
416 peer.addContact(n.host, n.port)
417 n = l[randrange(0, len(l))].node
418 peer.addContact(n.host, n.port)
423 peer.findCloseNodes()
# test_find_nodes: pick two random peers a, b from the running network `l`
# and check that a's findNode for b's ID returns b first.  The threading
# Event is used to block until the async callback fires (wait on elided line).
429 def test_find_nodes(l, quiet=0):
430 import threading, sys
431 from whrandom import randrange
432 flag = threading.Event()
# NOTE(review): `n = len(l)` is on an elided line (433-435)
436 a = l[randrange(0,n)]
437 b = l[randrange(0,n)]
439 def callback(nodes, l=l, flag=flag):
440 if (len(nodes) >0) and (nodes[0].id == b.node.id):
441 print "test_find_nodes PASSED"
# (the `else:` and flag.set() calls are on elided lines)
443 print "test_find_nodes FAILED"
445 a.findNode(b.node.id, callback)
# test_find_value: store a random key/value pair from peer a, then fetch it
# from three other random peers b, c, d, each gated by its own Event.
448 def test_find_value(l, quiet=0):
449 from whrandom import randrange
# NOTE(review): `from sha import sha` sits on an elided line (~450)
451 import time, threading, sys
453 fa = threading.Event()
454 fb = threading.Event()
455 fc = threading.Event()
# NOTE(review): `n = len(l)` is on an elided line
458 a = l[randrange(0,n)]
459 b = l[randrange(0,n)]
460 c = l[randrange(0,n)]
461 d = l[randrange(0,n)]
# random 20-byte key/value via SHA-1 of a random int's backtick-repr (py2)
463 key = sha(`randrange(0,100000)`).digest()
464 value = sha(`randrange(0,100000)`).digest()
466 print "inserting value...",
468 a.storeValueForKey(key, value)
# mc: factory producing a per-Event fetch callback that validates the value
472 def mc(flag, value=value):
473 def callback(values, f=flag, val=value):
475 if(len(values) == 0):
478 if values[0]['value'] != val:
# (failure prints, flag.set() and `return callback` are on elided lines)
485 b.valueForKey(key, mc(fa))
486 c.valueForKey(key, mc(fb))
487 d.valueForKey(key, mc(fc))
# Script entry point: build the test network, then run the two integration
# tests above (the actual calls are on elided lines between the prints).
493 if __name__ == "__main__":
496 print "finding nodes..."
500 print "inserting and fetching values..."