## Copyright 2002 Andrew Loewenstern, All Rights Reserved
-from listener import Listener
+from const import reactor
+import time
+
from ktable import KTable, K
-from node import Node
-from dispatcher import Dispatcher
-from hash import newID, intify
-import messages
-import transactions
+from knode import KNode as Node
-import time
+from hash import newID
+
+from actions import FindNode, GetValue
+from twisted.web import xmlrpc
+from twisted.internet.defer import Deferred
+from twisted.python import threadable
+from twisted.internet.app import Application
+from twisted.web import server
+threadable.init()
from bsddb3 import db ## find this at http://pybsddb.sf.net/
from bsddb3._db import DBNotFoundError
# don't ping unless it's been at least this many seconds since we've heard from a peer
MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
-# concurrent FIND_NODE/VALUE requests!
-N = 3
# this is the main class!
-class Khashmir:
- __slots__ = ['listener', 'node', 'table', 'dispatcher', 'tf', 'store']
+class Khashmir(xmlrpc.XMLRPC):
+ __slots__ = ['listener', 'node', 'table', 'store', 'app']
def __init__(self, host, port):
- self.listener = Listener(host, port)
self.node = Node(newID(), host, port)
self.table = KTable(self.node)
- self.dispatcher = Dispatcher(self.listener, messages.BASE, self.node.id)
- self.tf = transactions.TransactionFactory(self.node.id, self.dispatcher)
-
+ self.app = Application("xmlrpc")
+ self.app.listenTCP(port, server.Site(self))
self.store = db.DB()
self.store.open(None, None, db.DB_BTREE)
+
- #### register unsolicited incoming message handlers
- self.dispatcher.registerHandler('ping', self._pingHandler, messages.PING)
-
- self.dispatcher.registerHandler('find node', self._findNodeHandler, messages.FIND_NODE)
+ def render(self, request):
+ """
+ Override the built in render so we can have access to the request object!
+ note, crequest is probably only valid on the initial call (not after deferred!)
+ """
+ self.crequest = request
+ return xmlrpc.XMLRPC.render(self, request)
- self.dispatcher.registerHandler('get value', self._findValueHandler, messages.GET_VALUE)
-
- self.dispatcher.registerHandler('store value', self._storeValueHandler, messages.STORE_VALUE)
-
#######
####### LOCAL INTERFACE - use these methods!
## this call is async!
- def findNode(self, id, callback):
+ def findNode(self, id, callback, errback=None):
""" returns the contact info for node, or the k closest nodes, from the global table """
# get K nodes out of local table/cache, or the node we want
nodes = self.table.findNodes(id)
+ d = Deferred()
+ d.addCallbacks(callback, errback)
if len(nodes) == 1 and nodes[0].id == id :
- # we got it in our table!
- def tcall(t, callback=callback):
- callback(t.extras)
- self.dispatcher.postEvent(tcall, 0, extras=nodes)
+ d.callback(nodes)
else:
# create our search state
- state = FindNode(self, self.dispatcher, id, callback)
- # handle this in our own thread
- self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes)
+ state = FindNode(self, id, d.callback)
+ reactor.callFromThread(state.goWithNodes, nodes)
## also async
""" returns the values found for key in global table """
nodes = self.table.findNodes(key)
# create our search state
- state = GetValue(self, self.dispatcher, key, callback)
- # handle this in our own thread
- self.dispatcher.postEvent(state.goWithNodes, 0, extras=nodes)
+ state = GetValue(self, key, callback)
+ reactor.callFromThread(state.goWithNodes, nodes)
## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now
values are stored in peers on a first-come first-served basis
this will probably change so more than one value can be stored under a key
"""
- def _storeValueForKey(nodes, tf=self.tf, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
+ def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"):
for node in nodes:
if node.id != self.node.id:
- t = tf.StoreValue(node, key, value, response, default)
- t.dispatch()
+ df = node.storeValue(key, value, self.node.senderDict())
+ df.addCallbacks(response, default)
# this call is asynch
self.findNode(key, _storeValueForKey)
"""
insert a node in our local table, pinging oldest contact in bucket, if necessary
- If all you have is a host/port, then use addContact, which calls this function after
+ If all you have is a host/port, then use addContact, which calls this method after
receiving the PONG from the remote node. The reason for the seperation is we can't insert
a node into the table without it's peer-ID. That means of course the node passed into this
method needs to be a properly formed Node object with a valid ID.
old = self.table.insertNode(n)
if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
# the bucket is full, check to see if old node is still around and if so, replace it
- t = self.tf.Ping(old, self._notStaleNodeHandler, self._staleNodeHandler)
- t.newnode = n
- t.dispatch()
+
+ ## these are the callbacks used when we ping the oldest node in a bucket
+ def _staleNodeHandler(oldnode=old, newnode = n):
+ """ called if the pinged node never responds """
+ self.table.replaceStaleNode(old, newnode)
+
+ def _notStaleNodeHandler(sender, old=old):
+ """ called when we get a ping from the remote node """
+ if sender['id'] == old.id:
+ self.table.insertNode(old)
+
+ df = old.ping()
+ df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler)
def sendPing(self, node):
"""
ping a node
"""
- t = self.tf.Ping(node, self._pongHandler, self._defaultPong)
- t.dispatch()
+ df = node.ping(self.node.senderDict())
+ ## these are the callbacks we use when we issue a PING
+ def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
+ if id != 20 * ' ' and id != sender['id']:
+ # whoah, got response from different peer than we were expecting
+ pass
+ else:
+ #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
+ n = Node(sender['id'], host, port)
+ table.insertNode(n)
+ return
+ def _defaultPong(err):
+ # this should probably increment a failed message counter and dump the node if it gets over a threshold
+ return
+
+ df.addCallbacks(_pongHandler,_defaultPong)
def findCloseNodes(self):
#####
- ##### UNSOLICITED INCOMING MESSAGE HANDLERS
-
- def _pingHandler(self, t, msg):
- #print "Got PING from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port)
- self.insertNode(t.target)
- # respond, no callbacks, we don't care if they get it or not
- nt = self.tf.Pong(t)
- nt.dispatch()
-
- def _findNodeHandler(self, t, msg):
- #print "Got FIND_NODES from %s:%s at %s:%s" % (t.target.host, t.target.port, self.node.host, self.node.port)
- nodes = self.table.findNodes(msg['target'])
- # respond, no callbacks, we don't care if they get it or not
- nt = self.tf.GotNodes(t, nodes)
- nt.dispatch()
+ ##### INCOMING MESSAGE HANDLERS
- def _storeValueHandler(self, t, msg):
- if not self.store.has_key(msg['key']):
- self.store.put(msg['key'], msg['value'])
- nt = self.tf.StoredValue(t)
- nt.dispatch()
+ def xmlrpc_ping(self, sender):
+ """
+ takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
+ returns sender dict
+ """
+ ip = self.crequest.getClientIP()
+ n = Node(sender['id'], ip, sender['port'])
+ self.insertNode(n)
+ return self.node.senderDict()
+
+ def xmlrpc_find_node(self, target, sender):
+ nodes = self.table.findNodes(target)
+ nodes = map(lambda node: node.senderDict(), nodes)
+ ip = self.crequest.getClientIP()
+ n = Node(sender['id'], ip, sender['port'])
+ self.insertNode(n)
+ return nodes, self.node.senderDict()
- def _findValueHandler(self, t, msg):
- if self.store.has_key(msg['key']):
- t = self.tf.GotValues(t, [(msg['key'], self.store[msg['key']])])
+ def xmlrpc_store_value(self, key, value, sender):
+ if not self.store.has_key(key):
+ self.store.put(key, value)
+ ip = self.crequest.getClientIP()
+ n = Node(sender['id'], ip, sender['port'])
+ self.insertNode(n)
+ return self.node.senderDict()
+
+ def xmlrpc_find_value(self, key, sender):
+ ip = self.crequest.getClientIP()
+ n = Node(sender['id'], ip, sender['port'])
+ self.insertNode(n)
+ if self.store.has_key(key):
+ return {'values' : self.store[key]}, self.node.senderDict()
else:
- nodes = self.table.findNodes(msg['key'])
- t = self.tf.GotNodes(t, nodes)
- t.dispatch()
-
+ nodes = self.table.findNodes(key)
+ nodes = map(lambda node: node.senderDict(), nodes)
+ return {'nodes' : nodes}, self.node.senderDict()
###
### message response callbacks
# called when we get a response to store value
- def _storedValueHandler(self, t, msg):
- self.table.insertNode(t.target)
+ def _storedValueHandler(self, sender):
+ pass
- ## these are the callbacks used when we ping the oldest node in a bucket
- def _staleNodeHandler(self, t):
- """ called if the pinged node never responds """
- self.table.replaceStaleNode(t.target, t.newnode)
- def _notStaleNodeHandler(self, t, msg):
- """ called when we get a ping from the remote node """
- self.table.insertNode(t.target)
-
-
- ## these are the callbacks we use when we issue a PING
- def _pongHandler(self, t, msg):
- #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
- n = Node(msg['id'], t.addr[0], t.addr[1])
- self.table.insertNode(n)
-
- def _defaultPong(self, t):
- # this should probably increment a failed message counter and dump the node if it gets over a threshold
- print "Never got PONG from %s at %s:%s" % (`t.target.id`, t.target.host, t.target.port)
-
-
-class ActionBase:
- """ base class for some long running asynchronous proccesses like finding nodes or values """
- def __init__(self, table, dispatcher, target, callback):
- self.table = table
- self.dispatcher = dispatcher
- self.target = target
- self.int = intify(target)
- self.found = {}
- self.queried = {}
- self.answered = {}
- self.callback = callback
- self.outstanding = 0
- self.finished = 0
-
- def sort(a, b, int=self.int):
- """ this function is for sorting nodes relative to the ID we are looking for """
- x, y = int ^ a.int, int ^ b.int
- if x > y:
- return 1
- elif x < y:
- return -1
- return 0
- self.sort = sort
-
- def goWithNodes(self, t):
- pass
-
-class FindNode(ActionBase):
- """ find node action merits it's own class as it is a long running stateful process """
- def handleGotNodes(self, t, msg):
- if self.finished or self.answered.has_key(t.id):
- # a day late and a dollar short
- return
- self.outstanding = self.outstanding - 1
- self.answered[t.id] = 1
- for node in msg['nodes']:
- if not self.found.has_key(node['id']):
- n = Node(node['id'], node['host'], node['port'])
- self.found[n.id] = n
- self.table.insertNode(n)
- self.schedule()
-
- def schedule(self):
- """
- send messages to new peers, if necessary
- """
- if self.finished:
- return
- l = self.found.values()
- l.sort(self.sort)
-
- for node in l[:K]:
- if node.id == self.target:
- self.finished=1
- return self.callback([node])
- if not self.queried.has_key(node.id) and node.id != self.table.node.id:
- t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes)
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- t.timeout = time.time() + 15
- t.dispatch()
- if self.outstanding >= N:
- break
- assert(self.outstanding) >=0
- if self.outstanding == 0:
- ## all done!!
- self.finished=1
- self.callback(l[:K])
-
- def defaultGotNodes(self, t):
- if self.finished:
- return
- self.outstanding = self.outstanding - 1
- self.schedule()
-
-
- def goWithNodes(self, t):
- """
- this starts the process, our argument is a transaction with t.extras being our list of nodes
- it's a transaction since we got called from the dispatcher
- """
- nodes = t.extras
- for node in nodes:
- if node.id == self.table.node.id:
- continue
- self.found[node.id] = node
- t = self.table.tf.FindNode(node, self.target, self.handleGotNodes, self.defaultGotNodes)
- t.timeout = time.time() + 15
- t.dispatch()
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- if self.outstanding == 0:
- self.callback(nodes)
-
-
-
-class GetValue(FindNode):
- """ get value task """
- def handleGotNodes(self, t, msg):
- if self.finished or self.answered.has_key(t.id):
- # a day late and a dollar short
- return
- self.outstanding = self.outstanding - 1
- self.answered[t.id] = 1
- # go through nodes
- # if we have any closer than what we already got, query them
- if msg['type'] == 'got nodes':
- for node in msg['nodes']:
- if not self.found.has_key(node['id']):
- n = Node(node['id'], node['host'], node['port'])
- self.found[n.id] = n
- self.table.insertNode(n)
- elif msg['type'] == 'got values':
- ## done
- self.finished = 1
- return self.callback(msg['values'])
- self.schedule()
-
- ## get value
- def schedule(self):
- if self.finished:
- return
- l = self.found.values()
- l.sort(self.sort)
-
- for node in l[:K]:
- if not self.queried.has_key(node.id) and node.id != self.table.node.id:
- t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes)
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- t.timeout = time.time() + 15
- t.dispatch()
- if self.outstanding >= N:
- break
- assert(self.outstanding) >=0
- if self.outstanding == 0:
- ## all done, didn't find it!!
- self.finished=1
- self.callback([])
-
- ## get value
- def goWithNodes(self, t):
- nodes = t.extras
- for node in nodes:
- if node.id == self.table.node.id:
- continue
- self.found[node.id] = node
- t = self.table.tf.GetValue(node, self.target, self.handleGotNodes, self.defaultGotNodes)
- t.timeout = time.time() + 15
- t.dispatch()
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- if self.outstanding == 0:
- self.callback([])
-
-
-#------
+
+
+#------ testing
+
def test_build_net(quiet=0):
from whrandom import randrange
import thread
port = 2001
l = []
- peers = 100
+ peers = 16
if not quiet:
print "Building %s peer table." % peers
if events == 0:
time.sleep(.25)
- for i in range(10):
- thread.start_new_thread(run, (l[i*10:(i+1)*10],))
- #thread.start_new_thread(l[i].dispatcher.run, ())
-
+ thread.start_new_thread(l[0].app.run, ())
+ for peer in l[1:]:
+ peer.app.run()
+
for peer in l[1:]:
n = l[randrange(0, len(l))].node
peer.addContact(n.host, n.port)
if(len(values) == 0):
print "find FAILED"
else:
- if values[0]['value'] != val:
+ if values != val:
print "find FAILED"
else:
print "find FOUND"
f.set()
return callback
b.valueForKey(key, mc(fa))
- c.valueForKey(key, mc(fb))
- d.valueForKey(key, mc(fc))
-
fa.wait()
+ c.valueForKey(key, mc(fb))
fb.wait()
+ d.valueForKey(key, mc(fc))
fc.wait()
if __name__ == "__main__":
test_find_value(l)
test_find_value(l)
test_find_value(l)
- for i in l:
- i.dispatcher.stop()