from ktable import KTable, K
class ActionBase:
- """ base class for some long running asynchronous proccesses like finding nodes or values """
- def __init__(self, table, target, callback):
- self.table = table
- self.target = target
- self.num = intify(target)
- self.found = {}
- self.queried = {}
- self.answered = {}
- self.callback = callback
- self.outstanding = 0
- self.finished = 0
-
- def sort(a, b, num=self.num):
- """ this function is for sorting nodes relative to the ID we are looking for """
- x, y = num ^ a.num, num ^ b.num
- if x > y:
- return 1
- elif x < y:
- return -1
- return 0
- self.sort = sort
-
- def goWithNodes(self, t):
- pass
-
-
+ """ base class for some long-running asynchronous processes like finding nodes or values """
+ def __init__(self, table, target, callback):
+ self.table = table
+ self.target = target
+ self.num = intify(target)
+ self.found = {}
+ self.queried = {}
+ self.answered = {}
+ self.callback = callback
+ self.outstanding = 0
+ self.finished = 0
+
+ def sort(a, b, num=self.num):
+ """ this function is for sorting nodes relative to the ID we are looking for """
+ x, y = num ^ a.num, num ^ b.num
+ if x > y:
+ return 1
+ elif x < y:
+ return -1
+ return 0
+ self.sort = sort
+
+ def goWithNodes(self, t):
+ pass
+
+
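+# A quick sanity check on the sort() metric above, using 4-bit ids for brevity:
+# with target 0b1010, a node at 0b1000 has XOR distance 0b0010 (2) and a node at
+# 0b0011 has distance 0b1001 (9), so the first node sorts ahead of the second.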
FIND_NODE_TIMEOUT = 15
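+# The lookup loop below: goWithNodes seeds self.found from an initial node list,
+# schedule() keeps at most const.CONCURRENT_REQS find_node requests in flight,
+# and handleGotNodes folds newly learned nodes back in until the target turns up
+# or everything in the closest K has been queried.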
class FindNode(ActionBase):
- """ find node action merits it's own class as it is a long running stateful process """
- def handleGotNodes(self, args):
- l, sender = args
- sender = Node().initWithDict(sender)
- self.table.table.insertNode(sender)
- if self.finished or self.answered.has_key(sender.id):
- # a day late and a dollar short
- return
- self.outstanding = self.outstanding - 1
- self.answered[sender.id] = 1
- for node in l:
- n = Node().initWithDict(node)
- if not self.found.has_key(n.id):
- self.found[n.id] = n
- self.schedule()
-
- def schedule(self):
- """
- send messages to new peers, if necessary
- """
- if self.finished:
- return
- l = self.found.values()
- l.sort(self.sort)
-
- for node in l[:K]:
- if node.id == self.target:
- self.finished=1
- return self.callback([node])
- if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
- #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
- df = node.findNode(self.target, self.table.node.senderDict())
- df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- if self.outstanding >= const.CONCURRENT_REQS:
- break
- assert(self.outstanding) >=0
- if self.outstanding == 0:
- ## all done!!
- self.finished=1
- reactor.callFromThread(self.callback, l[:K])
-
- def makeMsgFailed(self, node):
- def defaultGotNodes(err, self=self, node=node):
- self.table.table.nodeFailed(node)
- self.outstanding = self.outstanding - 1
- self.schedule()
- return defaultGotNodes
-
- def goWithNodes(self, nodes):
- """
- this starts the process, our argument is a transaction with t.extras being our list of nodes
- it's a transaction since we got called from the dispatcher
- """
- for node in nodes:
- if node.id == self.table.node.id:
- continue
- else:
- self.found[node.id] = node
-
- self.schedule()
-
+ """ find node action merits its own class as it is a long-running stateful process """
+ def handleGotNodes(self, dict):
+ l = dict["nodes"]
+ sender = dict["sender"]
+ sender = Node().initWithDict(sender)
+ sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+ self.table.table.insertNode(sender)
+ if self.finished or self.answered.has_key(sender.id):
+ # a day late and a dollar short
+ return
+ self.outstanding = self.outstanding - 1
+ self.answered[sender.id] = 1
+ for node in l:
+ n = Node().initWithDict(node)
+ n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+ if not self.found.has_key(n.id):
+ self.found[n.id] = n
+ self.schedule()
+
+ def schedule(self):
+ """
+ send messages to new peers, if necessary
+ """
+ if self.finished:
+ return
+ l = self.found.values()
+ l.sort(self.sort)
+ for node in l[:K]:
+ if node.id == self.target:
+ self.finished=1
+ return self.callback([node])
+ if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+ #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
+ df = node.findNode(self.target, self.table.node.senderDict())
+ df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
+ self.outstanding = self.outstanding + 1
+ self.queried[node.id] = 1
+ if self.outstanding >= const.CONCURRENT_REQS:
+ break
+ assert self.outstanding >= 0
+ if self.outstanding == 0:
+ ## all done!!
+ self.finished=1
+ reactor.callFromThread(self.callback, l[:K])
+
+ def makeMsgFailed(self, node):
+ def defaultGotNodes(err, self=self, node=node):
+ self.table.table.nodeFailed(node)
+ self.outstanding = self.outstanding - 1
+ self.schedule()
+ return defaultGotNodes
+
+ def goWithNodes(self, nodes):
+ """
+ this starts the process; 'nodes' is the list of nodes to start querying from
+ """
+ for node in nodes:
+ if node.id == self.table.node.id:
+ continue
+ else:
+ self.found[node.id] = node
+
+ self.schedule()
+
GET_VALUE_TIMEOUT = 15
class GetValue(FindNode):
""" get value task """
- def handleGotNodes(self, args):
- l, sender = args
- sender = Node().initWithDict(sender)
- self.table.table.insertNode(sender)
- if self.finished or self.answered.has_key(sender.id):
- # a day late and a dollar short
- return
- self.outstanding = self.outstanding - 1
- self.answered[sender.id] = 1
- # go through nodes
- # if we have any closer than what we already got, query them
- if l.has_key('nodes'):
- for node in l['nodes']:
- n = Node().initWithDict(node)
- if not self.found.has_key(n.id):
- self.found[n.id] = n
- elif l.has_key('values'):
- def x(y, z=self.results):
- y = y.decode('base64')
- if not z.has_key(y):
- z[y] = 1
- return y
- else:
- return None
- v = filter(None, map(x, l['values']))
- if(len(v)):
- reactor.callFromThread(self.callback, v)
- self.schedule()
-
+ def handleGotNodes(self, dict):
+ sender = dict["sender"]
+ sender = Node().initWithDict(sender)
+ sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
+ self.table.table.insertNode(sender)
+ if self.finished or self.answered.has_key(sender.id):
+ # a day late and a dollar short
+ return
+ self.outstanding = self.outstanding - 1
+ self.answered[sender.id] = 1
+ # go through nodes
+ # if we have any closer than what we already got, query them
+ if dict.has_key('nodes'):
+ for node in dict['nodes']:
+ n = Node().initWithDict(node)
+ n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
+ if not self.found.has_key(n.id):
+ self.found[n.id] = n
+ elif dict.has_key('values'):
+ def x(y, z=self.results):
+ if not z.has_key(y):
+ z[y] = 1
+ return y
+ else:
+ return None
+ v = filter(None, map(x, dict['values']))
+ if(len(v)):
+ reactor.callFromThread(self.callback, v)
+ self.schedule()
+
## get value
def schedule(self):
- if self.finished:
- return
- l = self.found.values()
- l.sort(self.sort)
-
- for node in l[:K]:
- if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
- #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
- df = node.findValue(self.target, self.table.node.senderDict())
- df.addCallback(self.handleGotNodes)
- df.addErrback(self.makeMsgFailed(node))
- self.outstanding = self.outstanding + 1
- self.queried[node.id] = 1
- if self.outstanding >= const.CONCURRENT_REQS:
- break
- assert(self.outstanding) >=0
- if self.outstanding == 0:
- ## all done, didn't find it!!
- self.finished=1
- reactor.callFromThread(self.callback,[])
+ if self.finished:
+ return
+ l = self.found.values()
+ l.sort(self.sort)
+
+ for node in l[:K]:
+ if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
+ #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
+ df = node.findValue(self.target, self.table.node.senderDict())
+ df.addCallback(self.handleGotNodes)
+ df.addErrback(self.makeMsgFailed(node))
+ self.outstanding = self.outstanding + 1
+ self.queried[node.id] = 1
+ if self.outstanding >= const.CONCURRENT_REQS:
+ break
+ assert self.outstanding >= 0
+ if self.outstanding == 0:
+ ## all done, didn't find it!!
+ self.finished=1
+ reactor.callFromThread(self.callback,[])
## get value
def goWithNodes(self, nodes, found=None):
- self.results = {}
- if found:
- for n in found:
- self.results[n] = 1
- for node in nodes:
- if node.id == self.table.node.id:
- continue
- else:
- self.found[node.id] = node
-
- self.schedule()
+ self.results = {}
+ if found:
+ for n in found:
+ self.results[n] = 1
+ for node in nodes:
+ if node.id == self.table.node.id:
+ continue
+ else:
+ self.found[node.id] = node
+
+ self.schedule()
+
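+# StoreValue pushes the value to the closest nodes it is given, keeping at most
+# const.CONCURRENT_REQS store_value requests outstanding, and fires its callback
+# once const.STORE_REDUNDANCY peers have acknowledged the store (or the supplied
+# node list runs out).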
+class StoreValue(ActionBase):
+ def __init__(self, table, target, value, callback):
+ ActionBase.__init__(self, table, target, callback)
+ self.value = value
+ self.stored = []
+
+ def storedValue(self, t, node):
+ self.outstanding -= 1
+ self.table.insertNode(node)
+ if self.finished:
+ return
+ self.stored.append(t)
+ if len(self.stored) >= const.STORE_REDUNDANCY:
+ self.finished=1
+ self.callback(self.stored)
+ else:
+ if len(self.stored) + self.outstanding < const.STORE_REDUNDANCY:
+ self.schedule()
+
+ def storeFailed(self, t, node):
+ self.table.nodeFailed(node)
+ self.outstanding -= 1
+ if self.finished:
+ return
+ self.schedule()
+
+ def schedule(self):
+ if self.finished:
+ return
+ num = const.CONCURRENT_REQS - self.outstanding
+ if num > const.STORE_REDUNDANCY:
+ num = const.STORE_REDUNDANCY
+ for i in range(num):
+ try:
+ node = self.nodes.pop()
+ except IndexError:
+ if self.outstanding == 0:
+ self.finished = 1
+ self.callback(self.stored)
+ else:
+ if node.id != self.table.node.id:
+ self.outstanding += 1
+ df = node.storeValue(self.target, self.value, self.table.node.senderDict())
+ df.addCallback(self.storedValue, node=node)
+ df.addErrback(self.storeFailed, node=node)
+
+ def goWithNodes(self, nodes):
+ self.nodes = nodes
+ self.nodes.sort(self.sort)
+ self.schedule()
class KeyExpirer:
- def __init__(self, store):
- self.store = store
- reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
-
- def doExpire(self):
- self.cut = "%0.6f" % (time() - const.KE_AGE)
- self._expire()
-
- def _expire(self):
- c = self.store.cursor()
- s = "delete from kv where time < '%s';" % self.cut
- c.execute(s)
- reactor.callLater(const.KE_DELAY, self.doExpire)
-
\ No newline at end of file
+ def __init__(self, store):
+ self.store = store
+ reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
+
+ def doExpire(self):
+ self.cut = "%0.6f" % (time() - const.KE_AGE)
+ self._expire()
+
+ def _expire(self):
+ c = self.store.cursor()
+ s = "delete from kv where time < '%s';" % self.cut
+ c.execute(s)
+ reactor.callLater(const.KE_DELAY, self.doExpire)
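+ # the expire cycle: every KE_DELAY seconds, delete kv rows whose timestamp is
+ # older than KE_AGE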
+
\ No newline at end of file
self.connections = {}
def datagramReceived(self, datagram, addr):
+ #print `addr`, `datagram`
+ #if addr != self.addr:
self.connectionForAddr(addr).datagramReceived(datagram)
def connectionForAddr(self, addr):
else:
conn = self.connections[addr]
return conn
-
+# def makeConnection(self, transport):
+# protocol.DatagramProtocol.makeConnection(self, transport)
+# tup = transport.getHost()
+# self.addr = (tup[1], tup[2])
+
class AirhookPacket:
def __init__(self, msg):
self.datagram = msg
self.observed = None # their session id
self.sessionID = long(rand(0, 2**32)) # our session id
- self.lastTransmit = -1 # time we last sent a packet with messages
+ self.lastTransmit = 0 # time we last sent a packet with messages
self.lastReceieved = 0 # time we last received a packet with messages
self.lastTransmitSeq = -1 # last sequence we sent a packet
self.state = pending
self.sendSession = None # send session/observed fields until obSeq > sendSession
self.response = 0 # if we know we have a response now (like resending missed packets)
self.noisy = 0
+ self.scheduled = 0 # a sendNext is scheduled, don't schedule another
self.resetMessages()
def resetMessages(self):
self.state = pending
self.resetMessages()
self.inSeq = p.seq
- self.response = 1
elif self.state == confirmed:
if p.session != None or p.observed != None :
- if p.session != self.observed or p.observed != self.sessionID:
+ if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID):
self.state = pending
- if seq == 0:
- self.resetMessages()
- self.inSeq = p.seq
+ self.resetMessages()
+ self.inSeq = p.seq
if self.state != pending:
msgs = []
self.lastTransmit = time()
self.transport.write(packet, self.addr)
+ self.scheduled = 0
self.schedule()
def timeToSend(self):
# any outstanding messages and are we not too far ahead of our counterparty?
- if self.omsgq and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
- return 1
+ if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256:
+ return (1, 0)
# do we explicitly need to send a response?
elif self.response:
self.response = 0
- return 1
+ return (1, 0)
# have we not sent anything in a while?
elif time() - self.lastTransmit > 1.0:
- return 1
-
+ return (1, 1)
+ elif self.state == pending:
+ return (1, 1)
+
# nothing to send
- return 0
+ return (0, 0)
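+ # timeToSend hands back a (shouldSend, delay) pair; schedule() below uses the
+ # delay, in seconds, when it queues the next sendNext call.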
def schedule(self):
- if self.timeToSend():
- reactor.callLater(0, self.sendNext)
- else:
- reactor.callLater(1, self.sendNext)
+ tts, t = self.timeToSend()
+ if tts and not self.scheduled:
+ self.scheduled = 1
+ reactor.callLater(t, self.sendNext)
def write(self, data):
# micropackets can only be 255 bytes or less
NULL_ID = 20 * '\0'
# Kademlia "K" constant, this should be an even number
-K = 8
+K = 20
# SHA1 is 160 bits long
HASH_LENGTH = 160
from sha import sha
import whrandom
+random = open('/dev/urandom', 'r') # sucks for windoze
+
def intify(hstr):
"""20 bit hash, big-endian -> long python integer"""
assert len(hstr) == 20
def newID():
"""returns a new pseudorandom globally unique ID string"""
h = sha()
- for i in range(20):
- h.update(chr(whrandom.randrange(0,256)))
+ h.update(random.read(20))
return h.digest()
def newIDInRange(min, max):
def randRange(min, max):
return min + intify(newID()) % (max - min)
-
+
### Test Cases ###
import unittest
class Intify(unittest.TestCase):
known = [('\0' * 20, 0),
- ('\xff' * 20, 2L**160 - 1),
+ ('\xff' * 20, 2L**160 - 1),
]
def testKnown(self):
for str, value in self.known:
from hash import newID, newIDInRange
-from actions import FindNode, GetValue, KeyExpirer
-from twisted.web import xmlrpc
+from actions import FindNode, GetValue, KeyExpirer, StoreValue
+import krpc
+import airhook
+
from twisted.internet.defer import Deferred
+from twisted.internet import protocol
from twisted.python import threadable
from twisted.internet.app import Application
from twisted.web import server
threadable.init()
+import sys
import sqlite ## find this at http://pysqlite.sourceforge.net/
import pysqlite_exceptions
-KhashmirDBExcept = "KhashmirDBExcept"
+class KhashmirDBExcept(Exception):
+ pass
# this is the main class!
-class Khashmir(xmlrpc.XMLRPC):
- __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last')
- def __init__(self, host, port, db='khashmir.db'):
- self.setup(host, port, db)
-
- def setup(self, host, port, db='khashmir.db'):
- self._findDB(db)
- self.node = self._loadSelfNode(host, port)
- self.table = KTable(self.node)
- self._loadRoutingTable()
- self.app = Application("xmlrpc")
- self.app.listenTCP(port, server.Site(self))
- self.last = time.time()
- KeyExpirer(store=self.store)
- #self.refreshTable(force=1)
- reactor.callLater(60, self.checkpoint, (1,))
-
- def _loadSelfNode(self, host, port):
- c = self.store.cursor()
- c.execute('select id from self where num = 0;')
- if c.rowcount > 0:
- id = c.fetchone()[0].decode('base64')
- else:
- id = newID()
- return Node().init(id, host, port)
-
- def _saveSelfNode(self):
- self.store.autocommit = 0
- c = self.store.cursor()
- c.execute('delete from self where num = 0;')
- c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64'))
- self.store.commit()
- self.store.autocommit = 1
-
- def checkpoint(self, auto=0):
- self._saveSelfNode()
- self._dumpRoutingTable()
- if auto:
- reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
-
- def _findDB(self, db):
- import os
- try:
- os.stat(db)
- except OSError:
- self._createNewDB(db)
- else:
- self._loadDB(db)
-
- def _loadDB(self, db):
- try:
- self.store = sqlite.connect(db=db)
- self.store.autocommit = 1
- except:
- import traceback
- raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
-
- def _createNewDB(self, db):
- self.store = sqlite.connect(db=db)
- self.store.autocommit = 1
- s = """
- create table kv (key text, value text, time timestamp, primary key (key, value));
- create index kv_key on kv(key);
- create index kv_timestamp on kv(time);
-
- create table nodes (id text primary key, host text, port number);
-
- create table self (num number primary key, id text);
- """
- c = self.store.cursor()
- c.execute(s)
-
- def _dumpRoutingTable(self):
- """
- save routing table nodes to the database
- """
- self.store.autocommit = 0;
- c = self.store.cursor()
- c.execute("delete from nodes where id not NULL;")
- for bucket in self.table.buckets:
- for node in bucket.l:
- d = node.senderDict()
- c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port']))
- self.store.commit()
- self.store.autocommit = 1;
-
- def _loadRoutingTable(self):
- """
- load routing table nodes from database
- it's usually a good idea to call refreshTable(force=1) after loading the table
- """
- c = self.store.cursor()
- c.execute("select * from nodes;")
- for rec in c.fetchall():
- n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])})
- self.table.insertNode(n, contacted=0)
-
- def render(self, request):
- """
- Override the built in render so we can have access to the request object!
- note, crequest is probably only valid on the initial call (not after deferred!)
- """
- self.crequest = request
- return xmlrpc.XMLRPC.render(self, request)
+class Khashmir(protocol.Factory):
+ __slots__ = ('listener', 'node', 'table', 'store', 'app', 'airhook', 'last', 'protocol')
+ protocol = krpc.KRPC
+ def __init__(self, host, port, db='khashmir.db'):
+ self.setup(host, port, db)
+
+ def setup(self, host, port, db='khashmir.db'):
+ self._findDB(db)
+ self.node = self._loadSelfNode(host, port)
+ self.table = KTable(self.node)
+ self.app = Application("krpc")
+ self.airhook = airhook.listenAirhookStream(port, self)
+ self.last = time.time()
+ self._loadRoutingTable()
+ KeyExpirer(store=self.store)
+ #self.refreshTable(force=1)
+ reactor.callLater(60, self.checkpoint, (1,))
+
+ def _loadSelfNode(self, host, port):
+ c = self.store.cursor()
+ c.execute('select id from self where num = 0;')
+ if c.rowcount > 0:
+ id = c.fetchone()[0].decode('hex')
+ else:
+ id = newID()
+ return Node().init(id, host, port)
+
+ def _saveSelfNode(self):
+ self.store.autocommit = 0
+ c = self.store.cursor()
+ c.execute('delete from self where num = 0;')
+ c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex'))
+ self.store.commit()
+ self.store.autocommit = 1
+
+ def checkpoint(self, auto=0):
+ self._saveSelfNode()
+ self._dumpRoutingTable()
+ if auto:
+ reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint)
+
+ def _findDB(self, db):
+ import os
+ try:
+ os.stat(db)
+ except OSError:
+ self._createNewDB(db)
+ else:
+ self._loadDB(db)
+
+ def _loadDB(self, db):
+ try:
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ except:
+ import traceback
+ raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback
+
+ def _createNewDB(self, db):
+ self.store = sqlite.connect(db=db)
+ self.store.autocommit = 1
+ s = """
+ create table kv (key text, value text, time timestamp, primary key (key, value));
+ create index kv_key on kv(key);
+ create index kv_timestamp on kv(time);
+
+ create table nodes (id text primary key, host text, port number);
+
+ create table self (num number primary key, id text);
+ """
+ c = self.store.cursor()
+ c.execute(s)
+ def _dumpRoutingTable(self):
+ """
+ save routing table nodes to the database
+ """
+ self.store.autocommit = 0;
+ c = self.store.cursor()
+ c.execute("delete from nodes where id not NULL;")
+ for bucket in self.table.buckets:
+ for node in bucket.l:
+ d = node.senderDict()
+ c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port']))
+ self.store.commit()
+ self.store.autocommit = 1;
+
+ def _loadRoutingTable(self):
+ """
+ load routing table nodes from database
+ it's usually a good idea to call refreshTable(force=1) after loading the table
+ """
+ c = self.store.cursor()
+ c.execute("select * from nodes;")
+ for rec in c.fetchall():
+ n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])})
+ n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ self.table.insertNode(n, contacted=0)
+
- #######
- ####### LOCAL INTERFACE - use these methods!
- def addContact(self, host, port):
- """
- ping this node and add the contact info to the table on pong!
- """
- n =Node().init(const.NULL_ID, host, port) # note, we
- self.sendPing(n)
+ #######
+ ####### LOCAL INTERFACE - use these methods!
+ def addContact(self, host, port, callback=None):
+ """
+ ping this node and add the contact info to the table on pong!
+ """
+ n = Node().init(const.NULL_ID, host, port)
+ n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ self.sendPing(n, callback=callback)
- ## this call is async!
- def findNode(self, id, callback, errback=None):
- """ returns the contact info for node, or the k closest nodes, from the global table """
- # get K nodes out of local table/cache, or the node we want
- nodes = self.table.findNodes(id)
- d = Deferred()
- if errback:
- d.addCallbacks(callback, errback)
- else:
- d.addCallback(callback)
- if len(nodes) == 1 and nodes[0].id == id :
- d.callback(nodes)
- else:
- # create our search state
- state = FindNode(self, id, d.callback)
- reactor.callFromThread(state.goWithNodes, nodes)
-
-
- ## also async
- def valueForKey(self, key, callback):
- """ returns the values found for key in global table
- callback will be called with a list of values for each peer that returns unique values
- final callback will be an empty list - probably should change to 'more coming' arg
- """
- nodes = self.table.findNodes(key)
-
- # get locals
- l = self.retrieveValues(key)
- if len(l) > 0:
- reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l))
-
- # create our search state
- state = GetValue(self, key, callback)
- reactor.callFromThread(state.goWithNodes, nodes, l)
+ ## this call is async!
+ def findNode(self, id, callback, errback=None):
+ """ returns the contact info for node, or the k closest nodes, from the global table """
+ # get K nodes out of local table/cache, or the node we want
+ nodes = self.table.findNodes(id)
+ d = Deferred()
+ if errback:
+ d.addCallbacks(callback, errback)
+ else:
+ d.addCallback(callback)
+ if len(nodes) == 1 and nodes[0].id == id :
+ d.callback(nodes)
+ else:
+ # create our search state
+ state = FindNode(self, id, d.callback)
+ reactor.callFromThread(state.goWithNodes, nodes)
+
+
+ ## also async
+ def valueForKey(self, key, callback):
+ """ returns the values found for key in global table
+ callback will be called with a list of values for each peer that returns unique values
+ final callback will be an empty list - probably should change to 'more coming' arg
+ """
+ nodes = self.table.findNodes(key)
+
+ # get locals
+ l = self.retrieveValues(key)
+
+ # create our search state
+ state = GetValue(self, key, callback)
+ reactor.callFromThread(state.goWithNodes, nodes, l)
- ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
- def storeValueForKey(self, key, value, callback=None):
- """ stores the value for key in the global table, returns immediately, no status
- in this implementation, peers respond but don't indicate status to storing values
- a key can have many values
- """
- def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
- if not response:
- # default callback
- def _storedValueHandler(sender):
- pass
- response=_storedValueHandler
-
- for node in nodes[:const.STORE_REDUNDANCY]:
- def cb(t, table = table, node=node, resp=response):
- self.table.insertNode(node)
- response(t)
- if node.id != self.node.id:
- def default(err, node=node, table=table):
- table.nodeFailed(node)
- df = node.storeValue(key, value, self.node.senderDict())
- df.addCallbacks(cb, default)
- # this call is asynch
- self.findNode(key, _storeValueForKey)
-
-
- def insertNode(self, n, contacted=1):
- """
- insert a node in our local table, pinging oldest contact in bucket, if necessary
-
- If all you have is a host/port, then use addContact, which calls this method after
- receiving the PONG from the remote node. The reason for the seperation is we can't insert
- a node into the table without it's peer-ID. That means of course the node passed into this
- method needs to be a properly formed Node object with a valid ID.
- """
- old = self.table.insertNode(n, contacted=contacted)
- if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
- # the bucket is full, check to see if old node is still around and if so, replace it
-
- ## these are the callbacks used when we ping the oldest node in a bucket
- def _staleNodeHandler(oldnode=old, newnode = n):
- """ called if the pinged node never responds """
- self.table.replaceStaleNode(old, newnode)
-
- def _notStaleNodeHandler(sender, old=old):
- """ called when we get a pong from the old node """
- args, sender = sender
- sender = Node().initWithDict(sender)
- if sender.id == old.id:
- self.table.justSeenNode(old)
-
- df = old.ping(self.node.senderDict())
- df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
+ ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor)
+ def storeValueForKey(self, key, value, callback=None):
+ """ stores the value for key in the global table, returns immediately, no status
+ in this implementation, peers respond but don't indicate whether the value was actually stored
+ a key can have many values
+ """
+ def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+ if not response:
+ # default callback
+ def _storedValueHandler(sender):
+ pass
+ response=_storedValueHandler
+ action = StoreValue(self.table, key, value, response)
+ reactor.callFromThread(action.goWithNodes, nodes)
+
+ # this call is asynch
+ self.findNode(key, _storeValueForKey)
+
+
+ def insertNode(self, n, contacted=1):
+ """
+ insert a node in our local table, pinging oldest contact in bucket, if necessary
+
+ If all you have is a host/port, then use addContact, which calls this method after
+ receiving the PONG from the remote node. The reason for the separation is that we can't insert
+ a node into the table without its peer ID. That means, of course, that the node passed into this
+ method needs to be a properly formed Node object with a valid ID.
+ """
+ old = self.table.insertNode(n, contacted=contacted)
+ if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id:
+ # the bucket is full; check whether the old node is still around, and replace it if it isn't
+
+ ## these are the callbacks used when we ping the oldest node in a bucket
+ def _staleNodeHandler(oldnode=old, newnode = n):
+ """ called if the pinged node never responds """
+ self.table.replaceStaleNode(old, newnode)
+
+ def _notStaleNodeHandler(dict, old=old):
+ """ called when we get a pong from the old node """
+ sender = dict['sender']
+ if sender['id'] == old.id:
+ self.table.justSeenNode(old.id)
+
+ df = old.ping(self.node.senderDict())
+ df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
- def sendPing(self, node):
- """
- ping a node
- """
- df = node.ping(self.node.senderDict())
- ## these are the callbacks we use when we issue a PING
- def _pongHandler(args, node=node, table=self.table):
- l, sender = args
- if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'):
- # whoah, got response from different peer than we were expecting
- self.table.invalidateNode(node)
- else:
- sender['host'] = node.host
- sender['port'] = node.port
- n = Node().initWithDict(sender)
- table.insertNode(n)
- return
- def _defaultPong(err, node=node, table=self.table):
- table.nodeFailed(node)
-
- df.addCallbacks(_pongHandler,_defaultPong)
+ def sendPing(self, node, callback=None):
+ """
+ ping a node
+ """
+ df = node.ping(self.node.senderDict())
+ ## these are the callbacks we use when we issue a PING
+ def _pongHandler(dict, node=node, table=self.table, callback=callback):
+ sender = dict['sender']
+ if node.id != const.NULL_ID and node.id != sender['id']:
+ # whoah, got response from different peer than we were expecting
+ self.table.invalidateNode(node)
+ else:
+ sender['host'] = node.host
+ sender['port'] = node.port
+ n = Node().initWithDict(sender)
+ n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ table.insertNode(n)
+ if callback:
+ callback()
+ def _defaultPong(err, node=node, table=self.table, callback=callback):
+ table.nodeFailed(node)
+ if callback:
+ callback()
+
+ df.addCallbacks(_pongHandler,_defaultPong)
- def findCloseNodes(self, callback=lambda a: None):
- """
- This does a findNode on the ID one away from our own.
- This will allow us to populate our table with nodes on our network closest to our own.
- This is called as soon as we start up with an empty table
- """
- id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
- self.findNode(id, callback)
+ def findCloseNodes(self, callback=lambda a: None):
+ """
+ This does a findNode on the ID one away from our own.
+ This will allow us to populate our table with nodes on our network closest to our own.
+ This is called as soon as we start up with an empty table
+ """
+ id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256)
+ self.findNode(id, callback)
- def refreshTable(self, force=0):
- """
- force=1 will refresh table regardless of last bucket access time
- """
- def callback(nodes):
- pass
-
- for bucket in self.table.buckets:
- if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
- id = newIDInRange(bucket.min, bucket.max)
- self.findNode(id, callback)
+ def refreshTable(self, force=0):
+ """
+ force=1 will refresh table regardless of last bucket access time
+ """
+ def callback(nodes):
+ pass
+
+ for bucket in self.table.buckets:
+ if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS):
+ id = newIDInRange(bucket.min, bucket.max)
+ self.findNode(id, callback)
- def retrieveValues(self, key):
- s = "select value from kv where key = '%s';" % key.encode('base64')
- c = self.store.cursor()
- c.execute(s)
- t = c.fetchone()
- l = []
- while t:
- l.append(t['value'])
- t = c.fetchone()
- return l
-
- #####
- ##### INCOMING MESSAGE HANDLERS
-
- def xmlrpc_ping(self, sender):
- """
- takes sender dict = {'id', <id>, 'port', port} optional keys = 'ip'
- returns sender dict
- """
- ip = self.crequest.getClientIP()
- sender['host'] = ip
- n = Node().initWithDict(sender)
- self.insertNode(n, contacted=0)
- return (), self.node.senderDict()
-
- def xmlrpc_find_node(self, target, sender):
- nodes = self.table.findNodes(target.decode('base64'))
- nodes = map(lambda node: node.senderDict(), nodes)
- ip = self.crequest.getClientIP()
- sender['host'] = ip
- n = Node().initWithDict(sender)
- self.insertNode(n, contacted=0)
- return nodes, self.node.senderDict()
-
- def xmlrpc_store_value(self, key, value, sender):
- t = "%0.6f" % time.time()
- s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t)
- c = self.store.cursor()
- try:
- c.execute(s)
- except pysqlite_exceptions.IntegrityError, reason:
- # update last insert time
- s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value)
- c.execute(s)
- ip = self.crequest.getClientIP()
- sender['host'] = ip
- n = Node().initWithDict(sender)
- self.insertNode(n, contacted=0)
- return (), self.node.senderDict()
-
- def xmlrpc_find_value(self, key, sender):
- ip = self.crequest.getClientIP()
- key = key.decode('base64')
- sender['host'] = ip
- n = Node().initWithDict(sender)
- self.insertNode(n, contacted=0)
-
- l = self.retrieveValues(key)
- if len(l) > 0:
- return {'values' : l}, self.node.senderDict()
- else:
- nodes = self.table.findNodes(key)
- nodes = map(lambda node: node.senderDict(), nodes)
- return {'nodes' : nodes}, self.node.senderDict()
+ def retrieveValues(self, key):
+ s = "select value from kv where key = '%s';" % key.encode('hex')
+ c = self.store.cursor()
+ c.execute(s)
+ t = c.fetchone()
+ l = []
+ while t:
+ l.append(t['value'].decode('base64'))
+ t = c.fetchone()
+ return l
+
+ #####
+ ##### INCOMING MESSAGE HANDLERS
+
+ def krpc_ping(self, sender, _krpc_sender):
+ """
+ takes a sender dict = {'id': id, 'port': port}; 'host' is filled in from the connection address
+ returns a dict containing our own sender info
+ """
+ sender['host'] = _krpc_sender[0]
+ n = Node().initWithDict(sender)
+ n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ self.insertNode(n, contacted=0)
+ return {"sender" : self.node.senderDict()}
+
+ def krpc_find_node(self, target, sender, _krpc_sender):
+ nodes = self.table.findNodes(target)
+ nodes = map(lambda node: node.senderDict(), nodes)
+ sender['host'] = _krpc_sender[0]
+ n = Node().initWithDict(sender)
+ n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ self.insertNode(n, contacted=0)
+ return {"nodes" : nodes, "sender" : self.node.senderDict()}
+
+ def krpc_store_value(self, key, value, sender, _krpc_sender):
+ t = "%0.6f" % time.time()
+ s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t)
+ c = self.store.cursor()
+ try:
+ c.execute(s)
+ except pysqlite_exceptions.IntegrityError, reason:
+ # update last insert time
+ s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64"))
+ c.execute(s)
+ sender['host'] = _krpc_sender[0]
+ n = Node().initWithDict(sender)
+ n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ self.insertNode(n, contacted=0)
+ return {"sender" : self.node.senderDict()}
+
+ def krpc_find_value(self, key, sender, _krpc_sender):
+ sender['host'] = _krpc_sender[0]
+ n = Node().initWithDict(sender)
+ n.conn = self.airhook.connectionForAddr((n.host, n.port))
+ self.insertNode(n, contacted=0)
+
+ l = self.retrieveValues(key)
+ if len(l) > 0:
+ return {'values' : l, "sender": self.node.senderDict()}
+ else:
+ nodes = self.table.findNodes(key)
+ nodes = map(lambda node: node.senderDict(), nodes)
+ return {'nodes' : nodes, "sender": self.node.senderDict()}
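+# Rough usage sketch of the local interface (addresses and ports here are only
+# illustrative, and the twisted reactor must be running, e.g. via the helpers below):
+#   k = Khashmir('127.0.0.1', 4444)
+#   k.addContact('127.0.0.1', 4445)
+#   k.storeValueForKey(key, value)
+#   k.valueForKey(key, callback)    # callback gets lists of values as peers answer
+#   k.findCloseNodes()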
### TESTING ###
from random import randrange
def test_net(peers=24, startport=2001, dbprefix='/tmp/test'):
- import thread
- l = []
- for i in xrange(peers):
- a = Khashmir('localhost', startport + i, db = dbprefix+`i`)
- l.append(a)
- thread.start_new_thread(l[0].app.run, ())
- for peer in l[1:]:
- peer.app.run()
- return l
-
-def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001, dbprefix='/tmp/test'):
- from whrandom import randrange
- import threading
- import thread
- import sys
- port = startport
- l = []
- if not quiet:
- print "Building %s peer table." % peers
-
- for i in xrange(peers):
- a = Khashmir(host, port + i, db = dbprefix +`i`)
- l.append(a)
-
-
- thread.start_new_thread(l[0].app.run, ())
- time.sleep(1)
- for peer in l[1:]:
- peer.app.run()
- time.sleep(3)
-
- print "adding contacts...."
-
- for peer in l:
- n = l[randrange(0, len(l))].node
- peer.addContact(host, n.port)
- n = l[randrange(0, len(l))].node
- peer.addContact(host, n.port)
- n = l[randrange(0, len(l))].node
- peer.addContact(host, n.port)
- if pause:
- time.sleep(.33)
-
- time.sleep(10)
- print "finding close nodes...."
-
- for peer in l:
- flag = threading.Event()
- def cb(nodes, f=flag):
- f.set()
- peer.findCloseNodes(cb)
- flag.wait()
- # for peer in l:
- # peer.refreshTable()
- return l
+ import thread
+ l = []
+ for i in xrange(peers):
+ a = Khashmir('127.0.0.1', startport + i, db = dbprefix+`i`)
+ l.append(a)
+ thread.start_new_thread(l[0].app.run, ())
+ for peer in l[1:]:
+ peer.app.run()
+ return l
+
+def test_build_net(quiet=0, peers=24, host='127.0.0.1', pause=0, startport=2001, dbprefix='/tmp/test'):
+ from whrandom import randrange
+ import threading
+ import thread
+ import sys
+ port = startport
+ l = []
+ if not quiet:
+ print "Building %s peer table." % peers
+
+ for i in xrange(peers):
+ a = Khashmir(host, port + i, db = dbprefix +`i`)
+ l.append(a)
+
+
+ thread.start_new_thread(l[0].app.run, ())
+ time.sleep(1)
+ for peer in l[1:]:
+ peer.app.run()
+ #time.sleep(3)
+
+ def spewer(frame, s, ignored):
+ from twisted.python import reflect
+ if frame.f_locals.has_key('self'):
+ se = frame.f_locals['self']
+ print 'method %s of %s at %s' % (
+ frame.f_code.co_name, reflect.qual(se.__class__), id(se)
+ )
+ #sys.settrace(spewer)
+
+ print "adding contacts...."
+ def makecb(flag):
+ def cb(f=flag):
+ f.set()
+ return cb
+
+ for peer in l:
+ p = l[randrange(0, len(l))]
+ if p != peer:
+ n = p.node
+ flag = threading.Event()
+ peer.addContact(host, n.port, makecb(flag))
+ flag.wait()
+ p = l[randrange(0, len(l))]
+ if p != peer:
+ n = p.node
+ flag = threading.Event()
+ peer.addContact(host, n.port, makecb(flag))
+ flag.wait()
+ p = l[randrange(0, len(l))]
+ if p != peer:
+ n = p.node
+ flag = threading.Event()
+ peer.addContact(host, n.port, makecb(flag))
+ flag.wait()
+
+ print "finding close nodes...."
+
+ for peer in l:
+ flag = threading.Event()
+ def cb(nodes, f=flag):
+ f.set()
+ peer.findCloseNodes(cb)
+ flag.wait()
+ # for peer in l:
+ # peer.refreshTable()
+ return l
def test_find_nodes(l, quiet=0):
- flag = threading.Event()
-
- n = len(l)
-
- a = l[randrange(0,n)]
- b = l[randrange(0,n)]
-
- def callback(nodes, flag=flag, id = b.node.id):
- if (len(nodes) >0) and (nodes[0].id == id):
- print "test_find_nodes PASSED"
- else:
- print "test_find_nodes FAILED"
- flag.set()
- a.findNode(b.node.id, callback)
- flag.wait()
+ flag = threading.Event()
+
+ n = len(l)
+
+ a = l[randrange(0,n)]
+ b = l[randrange(0,n)]
+
+ def callback(nodes, flag=flag, id = b.node.id):
+ if (len(nodes) >0) and (nodes[0].id == id):
+ print "test_find_nodes PASSED"
+ else:
+ print "test_find_nodes FAILED"
+ flag.set()
+ a.findNode(b.node.id, callback)
+ flag.wait()
def test_find_value(l, quiet=0):
-
- fa = threading.Event()
- fb = threading.Event()
- fc = threading.Event()
-
- n = len(l)
- a = l[randrange(0,n)]
- b = l[randrange(0,n)]
- c = l[randrange(0,n)]
- d = l[randrange(0,n)]
-
- key = newID()
- value = newID()
- if not quiet: print "inserting value..."
- a.storeValueForKey(key, value)
- time.sleep(3)
- if not quiet:
- print "finding..."
-
- class cb:
- def __init__(self, flag, value=value):
- self.flag = flag
- self.val = value
- self.found = 0
- def callback(self, values):
- try:
- if(len(values) == 0):
- if not self.found:
- print "find NOT FOUND"
- else:
- print "find FOUND"
- else:
- if self.val in values:
- self.found = 1
- finally:
- self.flag.set()
-
- b.valueForKey(key, cb(fa).callback)
- fa.wait()
- c.valueForKey(key, cb(fb).callback)
- fb.wait()
- d.valueForKey(key, cb(fc).callback)
- fc.wait()
+
+ fa = threading.Event()
+ fb = threading.Event()
+ fc = threading.Event()
+
+ n = len(l)
+ a = l[randrange(0,n)]
+ b = l[randrange(0,n)]
+ c = l[randrange(0,n)]
+ d = l[randrange(0,n)]
+
+ key = newID()
+ value = newID()
+ if not quiet: print "inserting value..."
+ a.storeValueForKey(key, value)
+ time.sleep(3)
+ if not quiet:
+ print "finding..."
+
+ class cb:
+ def __init__(self, flag, value=value):
+ self.flag = flag
+ self.val = value
+ self.found = 0
+ def callback(self, values):
+ try:
+ if(len(values) == 0):
+ if not self.found:
+ print "find NOT FOUND"
+ else:
+ print "find FOUND"
+ else:
+ if self.val in values:
+ self.found = 1
+ finally:
+ self.flag.set()
+
+ b.valueForKey(key, cb(fa).callback)
+ fa.wait()
+ c.valueForKey(key, cb(fb).callback)
+ fb.wait()
+ d.valueForKey(key, cb(fc).callback)
+ fc.wait()
def test_one(host, port, db='/tmp/test'):
- import thread
- k = Khashmir(host, port, db)
- thread.start_new_thread(k.app.run, ())
- return k
+ import thread
+ k = Khashmir(host, port, db)
+ thread.start_new_thread(reactor.run, ())
+ return k
if __name__ == "__main__":
import sys
time.sleep(3)
print "finding nodes..."
for i in range(10):
- test_find_nodes(l)
+ test_find_nodes(l)
print "inserting and fetching values..."
for i in range(10):
- test_find_value(l)
+ test_find_value(l)
from node import Node
from twisted.internet.defer import Deferred
-from xmlrpcclient import XMLRPCClientFactory as factory
from const import reactor, NULL_ID
class KNode(Node):
- def makeResponse(self, df):
- """ Make our callback cover that checks to make sure the id of the response is the same as what we are expecting """
- def _callback(args, d=df):
- try:
- l, sender = args
- except:
- d.callback(args)
- else:
- if self.id != NULL_ID and sender['id'] != self._senderDict['id']:
- d.errback()
- else:
- d.callback(args)
- return _callback
- def ping(self, sender):
- df = Deferred()
- f = factory('ping', (sender,), self.makeResponse(df), df.errback)
- reactor.connectTCP(self.host, self.port, f)
- return df
- def findNode(self, target, sender):
- df = Deferred()
- f = factory('find_node', (target.encode('base64'), sender), self.makeResponse(df), df.errback)
- reactor.connectTCP(self.host, self.port, f)
- return df
- def storeValue(self, key, value, sender):
- df = Deferred()
- f = factory('store_value', (key.encode('base64'), value.encode('base64'), sender), self.makeResponse(df), df.errback)
- reactor.connectTCP(self.host, self.port, f)
- return df
- def findValue(self, key, sender):
- df = Deferred()
- f = factory('find_value', (key.encode('base64'), sender), self.makeResponse(df), df.errback)
- reactor.connectTCP(self.host, self.port, f)
- return df
+ def makeResponse(self, df):
+ """ make a callback wrapper that checks that the sender id in the response matches the node we expect """
+ def _callback(dict, d=df):
+ try:
+ senderid = dict['sender']['id']
+ except KeyError:
+ d.errback()
+ else:
+ if self.id != NULL_ID and senderid != self._senderDict['id']:
+ d.errback()
+ else:
+ d.callback(dict)
+ return _callback
+
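+ # Each helper below just issues the matching KRPC request over this node's
+ # airhook connection and returns the Deferred from sendRequest; e.g. ping()
+ # fires its callback with the peer's response dict ({'sender': ...}) or
+ # errbacks on error/timeout.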
+ def ping(self, sender):
+ return self.conn.protocol.sendRequest('ping', {"sender":sender})
+ def findNode(self, target, sender):
+ return self.conn.protocol.sendRequest('find_node', {"target" : target, "sender": sender})
+ def storeValue(self, key, value, sender):
+ return self.conn.protocol.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender})
+ def findValue(self, key, sender):
+ return self.conn.protocol.sendRequest('find_value', {"key" : key, "sender" : sender})
\ No newline at end of file
--- /dev/null
+import airhook
+from twisted.internet.defer import Deferred
+from twisted.protocols import basic
+from bencode import bencode, bdecode
+from twisted.internet import reactor
+
+import hash
+
+KRPC_TIMEOUT = 30
+
+KRPC_ERROR = 1
+KRPC_ERROR_METHOD_UNKNOWN = 2
+KRPC_ERROR_RECEIVED_UNKNOWN = 3
+KRPC_ERROR_TIMEOUT = 4
+
+class KRPC(basic.NetstringReceiver):
+ noisy = 1
+ def __init__(self):
+ self.tids = {}
+
+ def stringReceived(self, str):
+ # bdecode
+ try:
+ msg = bdecode(str)
+ except Exception, e:
+ print "response decode error: " + `e`
+ # can't even get a transaction id out of a garbled message, so just drop it
+ else:
+ # look at msg type
+ if msg['typ'] == 'req':
+ ilen = len(str)
+ # if request
+ # tell factory to handle
+ f = getattr(self.factory, "krpc_" + msg['req'], None)
+ if f and callable(f):
+ msg['arg']['_krpc_sender'] = self.transport.addr
+ try:
+ ret = apply(f, (), msg['arg'])
+ except Exception, e:
+ ## send error
+ str = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
+ olen = len(str)
+ self.sendString(str)
+ else:
+ if ret:
+ # make response
+ str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
+ else:
+ str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : []})
+ # send response
+ olen = len(str)
+ self.sendString(str)
+
+ else:
+ # unknown method
+ str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
+ olen = len(str)
+ self.sendString(str)
+ if self.noisy:
+ print "%s >>> (%s, %s) - %s %s %s" % (self.transport.addr, self.factory.node.host, self.factory.node.port,
+ ilen, msg['req'], olen)
+ elif msg['typ'] == 'rsp':
+ # if response
+ # lookup tid
+ if self.tids.has_key(msg['tid']):
+ df = self.tids[msg['tid']]
+ # callback
+ df.callback(msg['rsp'])
+ del(self.tids[msg['tid']])
+ # no tid, perhaps this transaction timed out already...
+ elif msg['typ'] == 'err':
+ # if error
+ # lookup tid
+ df = self.tids[msg['tid']]
+ # callback
+ df.errback(msg['err'])
+ del(self.tids[msg['tid']])
+ else:
+ # unknown message type
+ df = self.tids[msg['tid']]
+ # callback
+ df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
+ del(self.tids[msg['tid']])
+
+ def sendRequest(self, method, args):
+ # make message
+ # send it
+ msg = {'tid' : hash.newID(), 'typ' : 'req', 'req' : method, 'arg' : args}
+ str = bencode(msg)
+ self.sendString(str)
+ d = Deferred()
+ self.tids[msg['tid']] = d
+
+ def timeOut(tids = self.tids, id = msg['tid']):
+ if tids.has_key(id):
+ df = tids[id]
+ del(tids[id])
+ df.errback(KRPC_ERROR_TIMEOUT)
+ reactor.callLater(KRPC_TIMEOUT, timeOut)
+ return d
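+ # A request/response exchange, shown as the decoded dicts (bencoded on the wire):
+ #   request: {'tid': tid, 'typ': 'req', 'req': 'ping', 'arg': {'sender': sender}}
+ #   reply:   {'tid': tid, 'typ': 'rsp', 'rsp': {'sender': sender}}
+ #   error:   {'tid': tid, 'typ': 'err', 'err': err}
+ # where tid is the 20-byte transaction id from hash.newID()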
+
\ No newline at end of file
import hash
import const
-from const import K, HASH_LENGTH
+from const import K, HASH_LENGTH, NULL_ID
from node import Node
class KTable:
- """local routing table for a kademlia like distributed hash table"""
- def __init__(self, node):
- # this is the root node, a.k.a. US!
- self.node = node
- self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)]
- self.insertNode(node)
-
- def _bucketIndexForInt(self, num):
- """the index of the bucket that should hold int"""
- return bisect_left(self.buckets, num)
-
- def findNodes(self, id):
- """
- return K nodes in our own local table closest to the ID.
- """
-
- if isinstance(id, str):
- num = hash.intify(id)
- elif isinstance(id, Node):
- num = id.num
- elif isinstance(id, int) or isinstance(id, long):
- num = id
- else:
- raise TypeError, "findNodes requires an int, string, or Node"
-
- nodes = []
- i = self._bucketIndexForInt(num)
-
- # if this node is already in our table then return it
- try:
- index = self.buckets[i].l.index(num)
- except ValueError:
- pass
- else:
- return [self.buckets[i].l[index]]
-
- # don't have the node, get the K closest nodes
- nodes = nodes + self.buckets[i].l
- if len(nodes) < K:
- # need more nodes
- min = i - 1
- max = i + 1
- while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
- #ASw: note that this requires K be even
- if min >= 0:
- nodes = nodes + self.buckets[min].l
- if max < len(self.buckets):
- nodes = nodes + self.buckets[max].l
- min = min - 1
- max = max + 1
-
- nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
- return nodes[:K]
-
- def _splitBucket(self, a):
- diff = (a.max - a.min) / 2
- b = KBucket([], a.max - diff, a.max)
- self.buckets.insert(self.buckets.index(a.min) + 1, b)
- a.max = a.max - diff
- # transfer nodes to new bucket
- for anode in a.l[:]:
- if anode.num >= a.max:
- a.l.remove(anode)
- b.l.append(anode)
-
- def replaceStaleNode(self, stale, new):
- """this is used by clients to replace a node returned by insertNode after
- it fails to respond to a Pong message"""
- i = self._bucketIndexForInt(stale.num)
- try:
- it = self.buckets[i].l.index(stale.num)
- except ValueError:
- return
-
- del(self.buckets[i].l[it])
- if new:
- self.buckets[i].l.append(new)
-
- def insertNode(self, node, contacted=1):
- """
- this insert the node, returning None if successful, returns the oldest node in the bucket if it's full
- the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!!
- contacted means that yes, we contacted THEM and we know the node is reachable
- """
- assert node.id != " "*20
- if node.id == self.node.id: return
- # get the bucket for this node
- i = self. _bucketIndexForInt(node.num)
- # check to see if node is in the bucket already
- try:
- it = self.buckets[i].l.index(node.num)
- except ValueError:
- # no
- pass
- else:
- if contacted:
- node.updateLastSeen()
- # move node to end of bucket
- xnode = self.buckets[i].l[it]
- del(self.buckets[i].l[it])
- # note that we removed the original and replaced it with the new one
- # utilizing this nodes new contact info
- self.buckets[i].l.append(xnode)
- self.buckets[i].touch()
- return
-
- # we don't have this node, check to see if the bucket is full
- if len(self.buckets[i].l) < K:
- # no, append this node and return
- if contacted:
- node.updateLastSeen()
- self.buckets[i].l.append(node)
- self.buckets[i].touch()
- return
-
- # bucket is full, check to see if self.node is in the bucket
- if not (self.buckets[i].min <= self.node < self.buckets[i].max):
- return self.buckets[i].l[0]
-
- # this bucket is full and contains our node, split the bucket
- if len(self.buckets) >= HASH_LENGTH:
- # our table is FULL, this is really unlikely
- print "Hash Table is FULL! Increase K!"
- return
-
- self._splitBucket(self.buckets[i])
-
- # now that the bucket is split and balanced, try to insert the node again
- return self.insertNode(node)
-
- def justSeenNode(self, node):
- """call this any time you get a message from a node
- it will update it in the table if it's there """
- try:
- n = self.findNodes(node.num)[0]
- except IndexError:
- return None
- else:
- tstamp = n.lastSeen
- n.updateLastSeen()
- return tstamp
-
- def invalidateNode(self, n):
- """
- forget about node n - use when you know that node is invalid
- """
- self.replaceStaleNode(n, None)
-
- def nodeFailed(self, node):
- """ call this when a node fails to respond to a message, to invalidate that node """
- try:
- n = self.findNodes(node.num)[0]
- except IndexError:
- return None
- else:
- if n.msgFailed() >= const.MAX_FAILURES:
- self.invalidateNode(n)
-
+ """local routing table for a Kademlia-like distributed hash table"""
+ def __init__(self, node):
+ # this is the root node, a.k.a. US!
+ self.node = node
+ self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)]
+ self.insertNode(node)
+
+ def _bucketIndexForInt(self, num):
+ """the index of the bucket that should hold int"""
+ return bisect_left(self.buckets, num)
+
+ def findNodes(self, id):
+ """
+ return K nodes in our own local table closest to the ID.
+ """
+
+ if isinstance(id, str):
+ num = hash.intify(id)
+ elif isinstance(id, Node):
+ num = id.num
+ elif isinstance(id, int) or isinstance(id, long):
+ num = id
+ else:
+ raise TypeError, "findNodes requires an int, string, or Node"
+
+ nodes = []
+ i = self._bucketIndexForInt(num)
+
+ # if this node is already in our table then return it
+ try:
+ index = self.buckets[i].l.index(num)
+ except ValueError:
+ pass
+ else:
+ return [self.buckets[i].l[index]]
+
+ # don't have the node, get the K closest nodes
+ nodes = nodes + self.buckets[i].l
+ if len(nodes) < K:
+ # need more nodes
+ min = i - 1
+ max = i + 1
+ while len(nodes) < K and (min >= 0 or max < len(self.buckets)):
+ #ASw: note that this requires K be even
+ if min >= 0:
+ nodes = nodes + self.buckets[min].l
+ if max < len(self.buckets):
+ nodes = nodes + self.buckets[max].l
+ min = min - 1
+ max = max + 1
+
+ nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num))
+ return nodes[:K]
+
+ def _splitBucket(self, a):
+ diff = (a.max - a.min) / 2
+ b = KBucket([], a.max - diff, a.max)
+ self.buckets.insert(self.buckets.index(a.min) + 1, b)
+ a.max = a.max - diff
+ # transfer nodes to new bucket
+ for anode in a.l[:]:
+ if anode.num >= a.max:
+ a.l.remove(anode)
+ b.l.append(anode)
+
+ def replaceStaleNode(self, stale, new):
+ """this is used by clients to replace a node returned by insertNode after
+ it fails to respond to a Pong message"""
+ i = self._bucketIndexForInt(stale.num)
+ try:
+ it = self.buckets[i].l.index(stale.num)
+ except ValueError:
+ return
+
+ del(self.buckets[i].l[it])
+ if new:
+ self.buckets[i].l.append(new)
+
+ def insertNode(self, node, contacted=1):
+ """
+ this inserts the node, returning None if successful; returns the oldest node in the bucket if it's full
+ the caller is responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!!
+ contacted means that yes, we contacted THEM and we know the node is reachable
+ """
+ assert node.id != NULL_ID
+ if node.id == self.node.id: return
+ # get the bucket for this node
+ i = self._bucketIndexForInt(node.num)
+ # check to see if node is in the bucket already
+ try:
+ it = self.buckets[i].l.index(node.num)
+ except ValueError:
+ # no
+ pass
+ else:
+ if contacted:
+ node.updateLastSeen()
+ # move node to end of bucket
+ xnode = self.buckets[i].l[it]
+ del(self.buckets[i].l[it])
+ # note that we remove the existing entry and re-append it,
+ # bumping it to the most-recently-seen end of the bucket
+ self.buckets[i].l.append(xnode)
+ self.buckets[i].touch()
+ return
+
+ # we don't have this node, check to see if the bucket is full
+ if len(self.buckets[i].l) < K:
+ # no, append this node and return
+ if contacted:
+ node.updateLastSeen()
+ self.buckets[i].l.append(node)
+ self.buckets[i].touch()
+ return
+
+ # bucket is full, check to see if self.node is in the bucket
+ if not (self.buckets[i].min <= self.node < self.buckets[i].max):
+ return self.buckets[i].l[0]
+
+ # this bucket is full and contains our node, split the bucket
+ if len(self.buckets) >= HASH_LENGTH:
+ # our table is FULL, this is really unlikely
+ print "Hash Table is FULL! Increase K!"
+ return
+
+ self._splitBucket(self.buckets[i])
+
+ # now that the bucket is split and balanced, try to insert the node again
+ return self.insertNode(node)
+
+ def justSeenNode(self, id):
+ """call this any time you get a message from a node
+ it will update it in the table if it's there """
+ try:
+ n = self.findNodes(id)[0]
+ except IndexError:
+ return None
+ else:
+ tstamp = n.lastSeen
+ n.updateLastSeen()
+ return tstamp
+
+ def invalidateNode(self, n):
+ """
+ forget about node n - use when you know that node is invalid
+ """
+ self.replaceStaleNode(n, None)
+
+ def nodeFailed(self, node):
+ """ call this when a node fails to respond to a message, to invalidate that node """
+ try:
+ n = self.findNodes(node.num)[0]
+ except IndexError:
+ return None
+ else:
+ if n.msgFailed() >= const.MAX_FAILURES:
+ self.invalidateNode(n)
+
class KBucket:
- __slots__ = ('min', 'max', 'lastAccessed')
- def __init__(self, contents, min, max):
- self.l = contents
- self.min = min
- self.max = max
- self.lastAccessed = time.time()
-
- def touch(self):
- self.lastAccessed = time.time()
-
- def getNodeWithInt(self, num):
- if num in self.l: return num
- else: raise ValueError
-
- def __repr__(self):
- return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
-
- ## Comparators
- # necessary for bisecting list of buckets with a hash expressed as an integer or a distance
- # compares integer or node object with the bucket's range
- def __lt__(self, a):
- if isinstance(a, Node): a = a.num
- return self.max <= a
- def __le__(self, a):
- if isinstance(a, Node): a = a.num
- return self.min < a
- def __gt__(self, a):
- if isinstance(a, Node): a = a.num
- return self.min > a
- def __ge__(self, a):
- if isinstance(a, Node): a = a.num
- return self.max >= a
- def __eq__(self, a):
- if isinstance(a, Node): a = a.num
- return self.min <= a and self.max > a
- def __ne__(self, a):
- if isinstance(a, Node): a = a.num
- return self.min >= a or self.max < a
+ __slots__ = ('l', 'min', 'max', 'lastAccessed')
+ def __init__(self, contents, min, max):
+ self.l = contents
+ self.min = min
+ self.max = max
+ self.lastAccessed = time.time()
+
+ def touch(self):
+ self.lastAccessed = time.time()
+
+ def getNodeWithInt(self, num):
+ if num in self.l: return num
+ else: raise ValueError
+
+ def __repr__(self):
+ return "<KBucket %d items (%d to %d)>" % (len(self.l), self.min, self.max)
+
+ ## Comparators
+ # necessary for bisecting list of buckets with a hash expressed as an integer or a distance
+ # compares integer or node object with the bucket's range
+ def __lt__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.max <= a
+ def __le__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.min < a
+ def __gt__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.min > a
+ def __ge__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.max >= a
+ def __eq__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.min <= a and self.max > a
+ def __ne__(self, a):
+ if isinstance(a, Node): a = a.num
+ return self.min >= a or self.max < a
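+
+# A minimal, hypothetical sketch (not part of the original module) of what the
+# comparators above enable: a sorted list of KBuckets can be searched with a
+# plain integer ID via bisect, which is presumably how the routing table finds
+# the bucket covering a given node.  The three bucket ranges below are made up.
+def _example_bucket_lookup():
+    from bisect import bisect_left
+    buckets = [KBucket([], 0, 2**40), KBucket([], 2**40, 2**80), KBucket([], 2**80, 2**160)]
+    num = 2**50                      # an intified node ID
+    i = bisect_left(buckets, num)    # the comparators let bisect work on a raw int
+    assert buckets[i].min <= num < buckets[i].max
+    return i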
### UNIT TESTS ###
def setUp(self):
self.a = Node().init(hash.newID(), 'localhost', 2002)
self.t = KTable(self.a)
- print self.t.buckets[0].l
- def test_replace_stale_node(self):
+ def testAddNode(self):
self.b = Node().init(hash.newID(), 'localhost', 2003)
- self.t.replaceStaleNode(self.a, self.b)
- assert len(self.t.buckets[0].l) == 1
- assert self.t.buckets[0].l[0].id == self.b.id
+ self.t.insertNode(self.b)
+ self.assertEqual(len(self.t.buckets[0].l), 1)
+ self.assertEqual(self.t.buckets[0].l[0], self.b)
+
+ def testRemove(self):
+ self.testAddNode()
+ self.t.invalidateNode(self.b)
+ self.assertEqual(len(self.t.buckets[0].l), 0)
+
+ def testFail(self):
+ self.testAddNode()
+ for i in range(const.MAX_FAILURES - 1):
+ self.t.nodeFailed(self.b)
+ self.assertEqual(len(self.t.buckets[0].l), 1)
+ self.assertEqual(self.t.buckets[0].l[0], self.b)
+
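+        # the final failure reaches const.MAX_FAILURES, so nodeFailed() drops the node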
+ self.t.nodeFailed(self.b)
+ self.assertEqual(len(self.t.buckets[0].l), 0)
+
if __name__ == "__main__":
unittest.main()
import hash
import time
from types import *
-from xmlrpclib import Binary
class Node:
- """encapsulate contact info"""
- def __init__(self):
- self.fails = 0
- self.lastSeen = 0
- self.id = self.host = self.port = ''
-
- def init(self, id, host, port):
- self.id = id
- self.num = hash.intify(id)
- self.host = host
- self.port = port
- self._senderDict = {'id': self.id.encode('base64'), 'port' : self.port, 'host' : self.host}
- return self
-
- def initWithDict(self, dict):
- self._senderDict = dict
- self.id = dict['id'].decode('base64')
- self.num = hash.intify(self.id)
- self.port = dict['port']
- self.host = dict['host']
- return self
-
- def updateLastSeen(self):
- self.lastSeen = time.time()
- self.fails = 0
-
- def msgFailed(self):
- self.fails = self.fails + 1
- return self.fails
-
- def senderDict(self):
- return self._senderDict
-
- def __repr__(self):
- return `(self.id, self.host, self.port)`
-
+ """encapsulate contact info"""
+ def __init__(self):
+ self.fails = 0
+ self.lastSeen = 0
+ self.id = self.host = self.port = ''
+
+ def init(self, id, host, port):
+ self.id = id
+ self.num = hash.intify(id)
+ self.host = host
+ self.port = port
+ self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host}
+ return self
+
+ def initWithDict(self, dict):
+ self._senderDict = dict
+ self.id = dict['id']
+ self.num = hash.intify(self.id)
+ self.port = dict['port']
+ self.host = dict['host']
+ return self
+
+ def updateLastSeen(self):
+ self.lastSeen = time.time()
+ self.fails = 0
+
+ def msgFailed(self):
+ self.fails = self.fails + 1
+ return self.fails
+
+ def senderDict(self):
+ return self._senderDict
+
+ def __repr__(self):
+ return `(self.id, self.host, self.port)`
+
## these comparators let us bisect/index a list full of nodes with either a node or an int/long
- def __lt__(self, a):
- if type(a) == InstanceType:
- a = a.num
- return self.num < a
- def __le__(self, a):
- if type(a) == InstanceType:
- a = a.num
- return self.num <= a
- def __gt__(self, a):
- if type(a) == InstanceType:
- a = a.num
- return self.num > a
- def __ge__(self, a):
- if type(a) == InstanceType:
- a = a.num
- return self.num >= a
- def __eq__(self, a):
- if type(a) == InstanceType:
- a = a.num
- return self.num == a
- def __ne__(self, a):
- if type(a) == InstanceType:
- a = a.num
- return self.num != a
+ def __lt__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num < a
+ def __le__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num <= a
+ def __gt__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num > a
+ def __ge__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num >= a
+ def __eq__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num == a
+ def __ne__(self, a):
+ if type(a) == InstanceType:
+ a = a.num
+ return self.num != a
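+
+# Hypothetical sketch, not in the original file: thanks to the overloads above,
+# a list of Node objects kept sorted by .num can be bisected with a bare
+# integer ID (hash.newID() is assumed to hand back unique random IDs here).
+def _example_node_bisect():
+    from bisect import bisect_left
+    nodes = [Node().init(hash.newID(), 'localhost', 2001 + i) for i in range(4)]
+    nodes.sort()                       # Node-to-Node ordering via __lt__
+    target = nodes[2].num              # search with the raw integer, not the Node
+    i = bisect_left(nodes, target)
+    assert nodes[i].num == target
+    return nodes[i]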
import unittest
class TestNode(unittest.TestCase):
- def setUp(self):
- self.node = Node().init(hash.newID(), 'localhost', 2002)
- def testUpdateLastSeen(self):
- t = self.node.lastSeen
- self.node.updateLastSeen()
- assert t < self.node.lastSeen
-
\ No newline at end of file
+ def setUp(self):
+ self.node = Node().init(hash.newID(), 'localhost', 2002)
+ def testUpdateLastSeen(self):
+ t = self.node.lastSeen
+ self.node.updateLastSeen()
+ assert t < self.node.lastSeen
+
\ No newline at end of file
import ktable, khashmir
import hash, node, knode
-import actions, xmlrpcclient
+import actions
import btemplate
+import test_airhook
-tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'btemplate', 'actions', 'ktable', 'xmlrpcclient'])
+tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'actions', 'ktable', 'test_airhook'])
result = unittest.TextTestRunner().run(tests)
self.buf = ""
def dataReceived(self, data):
self.buf += data
-
-class EchoFactory(protocol.Factory):
- def buildProtocol(self, addr):
- return Echo()
-class NoisyFactory(protocol.Factory):
- def buildProtocol(self, addr):
- return Noisy()
-class ReceiverFactory(protocol.Factory):
- def buildProtocol(self, addr):
- return Receiver()
-class StreamReceiverFactory(protocol.Factory):
- def buildProtocol(self, addr):
- return StreamReceiver()
-
+
def makeEcho(port):
- return listenAirhookStream(port, EchoFactory())
+ f = protocol.Factory(); f.protocol = Echo
+ return listenAirhookStream(port, f)
def makeNoisy(port):
- return listenAirhookStream(port, NoisyFactory())
+ f = protocol.Factory(); f.protocol = Noisy
+ return listenAirhookStream(port, f)
def makeReceiver(port):
- return listenAirhookStream(port, ReceiverFactory())
+ f = protocol.Factory(); f.protocol = Receiver
+ return listenAirhookStream(port, f)
def makeStreamReceiver(port):
- return listenAirhookStream(port, StreamReceiverFactory())
+ f = protocol.Factory(); f.protocol = StreamReceiver
+ return listenAirhookStream(port, f)
class DummyTransport:
def __init__(self):
self.noisy = 0
self.a = makeStreamReceiver(2028)
self.b = makeEcho(2029)
- self.ac = self.a.connectionForAddr(('127.0.0.1', 2028))
- self.bc = self.b.connectionForAddr(('127.0.0.1', 2029))
+ self.ac = self.a.connectionForAddr(('127.0.0.1', 2029))
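+    # the repeated reactor.iterate() calls below hand-crank Twisted's event
+    # loop so the airhook round trips can finish without calling reactor.run()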
def testBig(self):
- msg = open('/dev/urandom').read(4096)
+ msg = open('/dev/urandom').read(256)
self.ac.write(msg)
reactor.iterate()
reactor.iterate()
reactor.iterate()
reactor.iterate()
reactor.iterate()
- reactor.iterate()
- reactor.iterate()
self.assertEqual(self.ac.protocol.buf, msg)
\ No newline at end of file
--- /dev/null
+from unittest import *
+from krpc import *
+from airhook import *
+
+import sys
+import unittest
+
+if __name__ == "__main__":
+    # sys.argv[0][:-3] strips the ".py" suffix so the loader gets the bare module name
+ tests = unittest.defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]])
+ result = unittest.TextTestRunner().run(tests)
+
+
+def connectionForAddr(host, port):
+ return host
+
+class Receiver(protocol.Factory):
+ protocol = KRPC
+ def __init__(self):
+ self.buf = []
+ def krpc_store(self, msg, _krpc_sender):
+ self.buf += [msg]
+ def krpc_echo(self, msg, _krpc_sender):
+ return msg
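+
+# the tests below rely on KRPC dispatching a request named 'foo' to a
+# krpc_foo() method on the listening factory: 'store' stashes the message,
+# 'echo' sends it straight back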
+
+class SimpleTest(TestCase):
+ def setUp(self):
+ self.noisy = 0
+
+ self.af = Receiver()
+ self.bf = Receiver()
+ self.a = listenAirhookStream(4040, self.af)
+ self.b = listenAirhookStream(4041, self.bf)
+
+ def testSimpleMessage(self):
+ self.noisy = 1
+ self.a.connectionForAddr(('127.0.0.1', 4041)).protocol.sendRequest('store', {'msg' : "This is a test."})
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.bf.buf, ["This is a test."])
+
+class EchoTest(TestCase):
+ def setUp(self):
+ self.noisy = 0
+ self.msg = None
+
+ self.af = Receiver()
+ self.bf = Receiver()
+ self.a = listenAirhookStream(4042, self.af)
+ self.b = listenAirhookStream(4043, self.bf)
+
+ def testEcho(self):
+ self.noisy = 1
+ df = self.a.connectionForAddr(('127.0.0.1', 4043)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.msg, "This is a test.")
+
+ def gotMsg(self, msg):
+ self.msg = msg
+
+class MultiEchoTest(TestCase):
+ def setUp(self):
+ self.noisy = 0
+ self.msg = None
+
+ self.af = Receiver()
+ self.bf = Receiver()
+ self.a = listenAirhookStream(4048, self.af)
+ self.b = listenAirhookStream(4049, self.bf)
+
+ def testMultiEcho(self):
+ self.noisy = 1
+ df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is a test."})
+ df.addCallback(self.gotMsg)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.msg, "This is a test.")
+
+ df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is another test."})
+ df.addCallback(self.gotMsg)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.msg, "This is another test.")
+
+ df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is yet another test."})
+ df.addCallback(self.gotMsg)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.msg, "This is yet another test.")
+
+ def gotMsg(self, msg):
+ self.msg = msg
+
+class UnknownMethErrTest(TestCase):
+ def setUp(self):
+ self.noisy = 0
+ self.err = None
+ self.af = Receiver()
+ self.bf = Receiver()
+ self.a = listenAirhookStream(4044, self.af)
+ self.b = listenAirhookStream(4045, self.bf)
+
+ def testUnknownMeth(self):
+ self.noisy = 1
+ df = self.a.connectionForAddr(('127.0.0.1', 4045)).protocol.sendRequest('blahblah', {'msg' : "This is a test."})
+ df.addErrback(self.gotErr)
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ reactor.iterate()
+ self.assertEqual(self.err, KRPC_ERROR_METHOD_UNKNOWN)
+
+ def gotErr(self, err):
+ self.err = err.value
def bucket_stats(l):
- """given a list of khashmir instances, finds min, max, and average number of nodes in tables"""
- max = avg = 0
- min = None
- def count(buckets):
- c = 0
- for bucket in buckets:
- c = c + len(bucket.l)
- return c
- for node in l:
- c = count(node.table.buckets)
- if min == None:
- min = c
- elif c < min:
- min = c
- if c > max:
- max = c
- avg = avg + c
- avg = avg / len(l)
- return {'min':min, 'max':max, 'avg':avg}
+ """given a list of khashmir instances, finds min, max, and average number of nodes in tables"""
+ max = avg = 0
+ min = None
+ def count(buckets):
+ c = 0
+ for bucket in buckets:
+ c = c + len(bucket.l)
+ return c
+ for node in l:
+ c = count(node.table.buckets)
+        if min is None:
+ min = c
+ elif c < min:
+ min = c
+ if c > max:
+ max = c
+ avg = avg + c
+ avg = avg / len(l)
+ return {'min':min, 'max':max, 'avg':avg}
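+
+# Hypothetical usage (the khashmir instances themselves are not shown here):
+#   stats = bucket_stats(running_nodes)
+#   print "min %(min)d  max %(max)d  avg %(avg)d" % stats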
+++ /dev/null
-from twisted.internet.protocol import ClientFactory
-from twisted.protocols.http import HTTPClient
-from twisted.internet.defer import Deferred
-
-from xmlrpclib import loads, dumps
-import socket
-
-USER_AGENT = 'Python/Twisted XMLRPC 0.1'
-class XMLRPCClient(HTTPClient):
- def connectionMade(self):
- payload = dumps(self.args, self.method)
- self.sendCommand('POST', '/RPC2')
- self.sendHeader('User-Agent', USER_AGENT)
- self.sendHeader('Content-Type', 'text/xml')
- self.sendHeader('Content-Length', len(payload))
- self.endHeaders()
- self.transport.write(payload)
- self.transport.write('\r\n')
-
- def handleResponse(self, buf):
- try:
- args, name = loads(buf)
- except Exception, e:
- print "response decode error: " + `e`
- self.d.errback()
- else:
- apply(self.d.callback, args)
-
-class XMLRPCClientFactory(ClientFactory):
- def __init__(self, method, args, callback=None, errback=None):
- self.method = method
- self.args = args
- self.d = Deferred()
- if callback:
- self.d.addCallback(callback)
- if errback:
- self.d.addErrback(errback)
- self.noisy = 0
-
- def buildProtocol(self, addr):
- prot = XMLRPCClient()
- prot.method = self.method
- prot.args = self.args
- prot.d = self.d
- return prot
-
- def clientConnectionFailed(self, connector, reason):
- self.d.errback()
\ No newline at end of file