""" find node action merits it's own class as it is a long running stateful process """
def handleGotNodes(self, args):
l, sender = args
- if self.finished or self.answered.has_key(sender['id']):
+ sender = Node().initWithDict(sender)
+ if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
return
self.outstanding = self.outstanding - 1
- self.answered[sender['id']] = 1
+ self.answered[sender.id] = 1
for node in l:
- if not self.found.has_key(node['id']):
- n = Node(node['id'], node['host'], node['port'])
+ n = Node().initWithDict(node)
+ if not self.found.has_key(n.id):
self.found[n.id] = n
self.table.insertNode(n)
self.schedule()
""" get value task """
def handleGotNodes(self, args):
l, sender = args
- if self.finished or self.answered.has_key(sender['id']):
+ sender = Node().initWithDict(sender)
+ if self.finished or self.answered.has_key(sender.id):
# a day late and a dollar short
return
self.outstanding = self.outstanding - 1
- self.answered[sender['id']] = 1
+ self.answered[sender.id] = 1
# go through nodes
# if we have any closer than what we already got, query them
if l.has_key('nodes'):
for node in l['nodes']:
- if not self.found.has_key(node['id']):
- n = Node(node['id'], node['host'], node['port'])
+ n = Node().initWithDict(node)
+ if not self.found.has_key(n.id):
self.found[n.id] = n
self.table.insertNode(n)
elif l.has_key('values'):
from bsddb3 import db ## find this at http://pybsddb.sf.net/
from bsddb3._db import DBNotFoundError
+from base64 import decodestring as decode
+
# don't ping unless it's been at least this many seconds since we've heard from a peer
MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
class Khashmir(xmlrpc.XMLRPC):
__slots__ = ['listener', 'node', 'table', 'store', 'itime', 'kw', 'app']
def __init__(self, host, port):
- self.node = Node(newID(), host, port)
+ self.node = Node().init(newID(), host, port)
self.table = KTable(self.node)
self.app = Application("xmlrpc")
self.app.listenTCP(port, server.Site(self))
"""
ping this node and add the contact info to the table on pong!
"""
- n =Node(" "*20, host, port) # note, we
+ n =Node().init(" "*20, host, port) # note, we
self.sendPing(n)
def valueForKey(self, key, callback):
""" returns the values found for key in global table """
nodes = self.table.findNodes(key)
+ # decode values, they will be base64 encoded
+ def cbwrap(values, cb=callback):
+ values = map(lambda x: decode(x), values)
+ callback(values)
# create our search state
- state = GetValue(self, key, callback)
+ state = GetValue(self, key, cbwrap)
reactor.callFromThread(state.goWithNodes, nodes)
pass
else:
#print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
- n = Node(sender['id'], host, port)
+ sender['host'] = host
+ sender['port'] = port
+ n = Node().initWithDict(sender)
table.insertNode(n)
return
def _defaultPong(err):
returns sender dict
"""
ip = self.crequest.getClientIP()
- n = Node(sender['id'], ip, sender['port'])
+ sender['host'] = ip
+ n = Node().initWithDict(sender)
self.insertNode(n)
return self.node.senderDict()
nodes = self.table.findNodes(target)
nodes = map(lambda node: node.senderDict(), nodes)
ip = self.crequest.getClientIP()
- n = Node(sender['id'], ip, sender['port'])
+ sender['host'] = ip
+ n = Node().initWithDict(sender)
self.insertNode(n)
return nodes, self.node.senderDict()
def xmlrpc_store_value(self, key, value, sender):
+ key = decode(key)
h1 = sha(key+value).digest()
t = `time.time()`
if not self.store.has_key(h1):
self.itime.put(t, h1)
ip = self.crequest.getClientIP()
- n = Node(sender['id'], ip, sender['port'])
+ sender['host'] = ip
+ n = Node().initWithDict(sender)
self.insertNode(n)
return self.node.senderDict()
def xmlrpc_find_value(self, key, sender):
ip = self.crequest.getClientIP()
- n = Node(sender['id'], ip, sender['port'])
+ key = decode(key)
+ sender['host'] = ip
+ n = Node().initWithDict(sender)
self.insertNode(n)
+
if self.kw.has_key(key):
c = self.kw.cursor()
tup = c.set(key)
#------ testing
-def test_build_net(quiet=0, peers=8, pause=1):
+def test_build_net(quiet=0, peers=24, host='localhost', pause=1):
from whrandom import randrange
import thread
port = 2001
print "Building %s peer table." % peers
for i in xrange(peers):
- a = Khashmir('localhost', port + i)
+ a = Khashmir(host, port + i)
l.append(a)
for peer in l[1:]:
n = l[randrange(0, len(l))].node
- peer.addContact(n.host, n.port)
+ peer.addContact(host, n.port)
n = l[randrange(0, len(l))].node
- peer.addContact(n.host, n.port)
+ peer.addContact(host, n.port)
n = l[randrange(0, len(l))].node
- peer.addContact(n.host, n.port)
+ peer.addContact(host, n.port)
if pause:
time.sleep(.30)
a = l[randrange(0,n)]
b = l[randrange(0,n)]
- def callback(nodes, flag=flag):
- if (len(nodes) >0) and (nodes[0].id == b.node.id):
+ def callback(nodes, flag=flag, id = b.node.id):
+ if (len(nodes) >0) and (nodes[0].id == id):
print "test_find_nodes PASSED"
else:
print "test_find_nodes FAILED"
from twisted.internet.defer import Deferred
from xmlrpcclient import XMLRPCClientFactory as factory
from const import reactor
+from base64 import encodestring as encode
class KNode(Node):
def ping(self, sender):
return df
def storeValue(self, key, value, sender):
df = Deferred()
- f = factory('store_value', (key, value, sender), df.callback, df.errback)
+ f = factory('store_value', (encode(key), encode(value), sender), df.callback, df.errback)
reactor.connectTCP(self.host, self.port, f)
return df
def findValue(self, key, sender):
df = Deferred()
- f = factory('find_value', (key, sender), df.callback, df.errback)
+ f = factory('find_value', (encode(key), sender), df.callback, df.errback)
reactor.connectTCP(self.host, self.port, f)
return df
import hash
import time
from types import *
+from xmlrpclib import Binary
class Node:
"""encapsulate contact info"""
- def __init__(self, id, host, port):
+ def init(self, id, host, port):
self.id = id
self.int = hash.intify(id)
self.host = host
self.port = port
self.lastSeen = time.time()
+ self._senderDict = {'id': Binary(self.id), 'port' : self.port, 'host' : self.host}
+ return self
+
+ def initWithDict(self, dict):
+ self._senderDict = dict
+ self.id = dict['id'].data
+ self.int = hash.intify(self.id)
+ self.port = dict['port']
+ self.host = dict['host']
+ self.lastSeen = time.time()
+ return self
def updateLastSeen(self):
self.lastSeen = time.time()
def senderDict(self):
- return {'id': self.id, 'port' : self.port, 'host' : self.host}
+ return self._senderDict
def __repr__(self):
return `(self.id, self.host, self.port)`