from knode import KNode as Node
from ktable import KTable, K
-# concurrent FIND_NODE/VALUE requests!
-N = 3
-
class ActionBase:
""" base class for some long running asynchronous proccesses like finding nodes or values """
def __init__(self, table, target, callback):
class FindNode(ActionBase):
""" find node action merits it's own class as it is a long running stateful process """
def handleGotNodes(self, args):
+ args, conn = args
l, sender = args
+ sender['host'] = conn['host']
sender = Node().initWithDict(sender)
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
n = Node().initWithDict(node)
if not self.found.has_key(n.id):
self.found[n.id] = n
- self.table.insertNode(n)
self.schedule()
def schedule(self):
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
- if self.outstanding >= N:
+ if self.outstanding >= const.CONCURRENT_REQS:
break
assert(self.outstanding) >=0
if self.outstanding == 0:
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
+ if self.outstanding >= const.CONCURRENT_REQS:
+ break
+
if self.outstanding == 0:
self.callback(nodes)
class GetValue(FindNode):
""" get value task """
def handleGotNodes(self, args):
+ args, conn = args
l, sender = args
+ sender['host'] = conn['host']
sender = Node().initWithDict(sender)
self.table.table.insertNode(sender)
if self.finished or self.answered.has_key(sender.id):
n = Node().initWithDict(node)
if not self.found.has_key(n.id):
self.found[n.id] = n
- self.table.insertNode(n)
elif l.has_key('values'):
def x(y, z=self.results):
y = y.data
df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
- if self.outstanding >= N:
+ if self.outstanding >= const.CONCURRENT_REQS:
break
assert(self.outstanding) >=0
if self.outstanding == 0:
df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
+ if self.outstanding >= const.CONCURRENT_REQS:
+ break
+
if self.outstanding == 0:
reactor.callFromThread(self.callback, [])
## Copyright 2002 Andrew Loewenstern, All Rights Reserved
from const import reactor
+import const
+
import time
from pickle import loads, dumps
from sha import sha
from xmlrpclib import Binary
-# don't ping unless it's been at least this many seconds since we've heard from a peer
-MAX_PING_INTERVAL = 60 * 15 # fifteen minutes
-
# this is the main class!
self.findNode(key, _storeValueForKey)
- def insertNode(self, n):
+ def insertNode(self, n, contacted=1):
"""
insert a node in our local table, pinging oldest contact in bucket, if necessary
a node into the table without it's peer-ID. That means of course the node passed into this
method needs to be a properly formed Node object with a valid ID.
"""
- old = self.table.insertNode(n)
- if old and (time.time() - old.lastSeen) > MAX_PING_INTERVAL and old.id != self.node.id:
+ old = self.table.insertNode(n, contacted=contacted)
+ if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
# the bucket is full, check to see if old node is still around and if so, replace it
## these are the callbacks used when we ping the oldest node in a bucket
def _notStaleNodeHandler(sender, old=old):
""" called when we get a pong from the old node """
+ sender, conn = sender
+ sender['host'] = conn['host']
sender = Node().initWithDict(sender)
if sender.id == old.id:
self.table.justSeenNode(old)
df = node.ping(self.node.senderDict())
## these are the callbacks we use when we issue a PING
def _pongHandler(sender, id=node.id, host=node.host, port=node.port, table=self.table):
+ sender = sender[0]
if id != 20 * ' ' and id != sender['id'].data:
# whoah, got response from different peer than we were expecting
pass
else:
- #print "Got PONG from %s at %s:%s" % (`msg['id']`, t.target.host, t.target.port)
sender['host'] = host
sender['port'] = port
n = Node().initWithDict(sender)
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
+ self.insertNode(n, contacted=0)
return self.node.senderDict()
def xmlrpc_find_node(self, target, sender):
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
+ self.insertNode(n, contacted=0)
return nodes, self.node.senderDict()
def xmlrpc_store_value(self, key, value, sender):
ip = self.crequest.getClientIP()
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
+ self.insertNode(n, contacted=0)
return self.node.senderDict()
def xmlrpc_find_value(self, key, sender):
key = key.data
sender['host'] = ip
n = Node().initWithDict(sender)
- self.insertNode(n)
+ self.insertNode(n, contacted=0)
l = self.retrieveValues(key)
if len(l) > 0:
try:
if(len(values) == 0):
if not self.found:
- print "find FAILED"
+ print "find NOT FOUND"
else:
print "find FOUND"
sys.stdout.flush()
d.valueForKey(key, cb(fc).callback)
fc.wait()
-def test_one(port):
+def test_one(host, port):
import thread
- k = Khashmir('localhost', port)
+ k = Khashmir(host, port)
thread.start_new_thread(k.app.run, ())
return k
if __name__ == "__main__":
- l = test_build_net()
+ import sys
+ n = 8
+ if len(sys.argv) > 1:
+ n = int(sys.argv[1])
+ l = test_build_net(peers=n)
time.sleep(3)
print "finding nodes..."
for i in range(10):