From: burris Date: Thu, 16 Jan 2003 16:28:42 +0000 (+0000) Subject: ripped out xmlrpc, experimented with xmlrpc but with bencode, finally X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=ae12e13dbef1df6b3bdf04d4cca00a0a2a688af7;p=quix0rs-apt-p2p.git ripped out xmlrpc, experimented with xmlrpc but with bencode, finally settled on bencode rpc over Airhook --- diff --git a/actions.py b/actions.py index 5fc11b3..f367c53 100644 --- a/actions.py +++ b/actions.py @@ -8,183 +8,237 @@ from knode import KNode as Node from ktable import KTable, K class ActionBase: - """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, table, target, callback): - self.table = table - self.target = target - self.num = intify(target) - self.found = {} - self.queried = {} - self.answered = {} - self.callback = callback - self.outstanding = 0 - self.finished = 0 - - def sort(a, b, num=self.num): - """ this function is for sorting nodes relative to the ID we are looking for """ - x, y = num ^ a.num, num ^ b.num - if x > y: - return 1 - elif x < y: - return -1 - return 0 - self.sort = sort - - def goWithNodes(self, t): - pass - - + """ base class for some long running asynchronous proccesses like finding nodes or values """ + def __init__(self, table, target, callback): + self.table = table + self.target = target + self.num = intify(target) + self.found = {} + self.queried = {} + self.answered = {} + self.callback = callback + self.outstanding = 0 + self.finished = 0 + + def sort(a, b, num=self.num): + """ this function is for sorting nodes relative to the ID we are looking for """ + x, y = num ^ a.num, num ^ b.num + if x > y: + return 1 + elif x < y: + return -1 + return 0 + self.sort = sort + + def goWithNodes(self, t): + pass + + FIND_NODE_TIMEOUT = 15 class FindNode(ActionBase): - """ find node action merits it's own class as it is a long running stateful process """ - def handleGotNodes(self, args): - l, sender = args - sender = Node().initWithDict(sender) - self.table.table.insertNode(sender) - if self.finished or self.answered.has_key(sender.id): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 - for node in l: - n = Node().initWithDict(node) - if not self.found.has_key(n.id): - self.found[n.id] = n - self.schedule() - - def schedule(self): - """ - send messages to new peers, if necessary - """ - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - - for node in l[:K]: - if node.id == self.target: - self.finished=1 - return self.callback([node]) - if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: - #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding >= const.CONCURRENT_REQS: - break - assert(self.outstanding) >=0 - if self.outstanding == 0: - ## all done!! - self.finished=1 - reactor.callFromThread(self.callback, l[:K]) - - def makeMsgFailed(self, node): - def defaultGotNodes(err, self=self, node=node): - self.table.table.nodeFailed(node) - self.outstanding = self.outstanding - 1 - self.schedule() - return defaultGotNodes - - def goWithNodes(self, nodes): - """ - this starts the process, our argument is a transaction with t.extras being our list of nodes - it's a transaction since we got called from the dispatcher - """ - for node in nodes: - if node.id == self.table.node.id: - continue - else: - self.found[node.id] = node - - self.schedule() - + """ find node action merits it's own class as it is a long running stateful process """ + def handleGotNodes(self, dict): + l = dict["nodes"] + sender = dict["sender"] + sender = Node().initWithDict(sender) + sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port)) + self.table.table.insertNode(sender) + if self.finished or self.answered.has_key(sender.id): + # a day late and a dollar short + return + self.outstanding = self.outstanding - 1 + self.answered[sender.id] = 1 + for node in l: + n = Node().initWithDict(node) + n.conn = self.table.airhook.connectionForAddr((n.host, n.port)) + if not self.found.has_key(n.id): + self.found[n.id] = n + self.schedule() + + def schedule(self): + """ + send messages to new peers, if necessary + """ + if self.finished: + return + l = self.found.values() + l.sort(self.sort) + for node in l[:K]: + if node.id == self.target: + self.finished=1 + return self.callback([node]) + if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT + df = node.findNode(self.target, self.table.node.senderDict()) + df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding >= const.CONCURRENT_REQS: + break + assert(self.outstanding) >=0 + if self.outstanding == 0: + ## all done!! + self.finished=1 + reactor.callFromThread(self.callback, l[:K]) + + def makeMsgFailed(self, node): + def defaultGotNodes(err, self=self, node=node): + self.table.table.nodeFailed(node) + self.outstanding = self.outstanding - 1 + self.schedule() + return defaultGotNodes + + def goWithNodes(self, nodes): + """ + this starts the process, our argument is a transaction with t.extras being our list of nodes + it's a transaction since we got called from the dispatcher + """ + for node in nodes: + if node.id == self.table.node.id: + continue + else: + self.found[node.id] = node + + self.schedule() + GET_VALUE_TIMEOUT = 15 class GetValue(FindNode): """ get value task """ - def handleGotNodes(self, args): - l, sender = args - sender = Node().initWithDict(sender) - self.table.table.insertNode(sender) - if self.finished or self.answered.has_key(sender.id): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[sender.id] = 1 - # go through nodes - # if we have any closer than what we already got, query them - if l.has_key('nodes'): - for node in l['nodes']: - n = Node().initWithDict(node) - if not self.found.has_key(n.id): - self.found[n.id] = n - elif l.has_key('values'): - def x(y, z=self.results): - y = y.decode('base64') - if not z.has_key(y): - z[y] = 1 - return y - else: - return None - v = filter(None, map(x, l['values'])) - if(len(v)): - reactor.callFromThread(self.callback, v) - self.schedule() - + def handleGotNodes(self, dict): + sender = dict["sender"] + sender = Node().initWithDict(sender) + sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port)) + self.table.table.insertNode(sender) + if self.finished or self.answered.has_key(sender.id): + # a day late and a dollar short + return + self.outstanding = self.outstanding - 1 + self.answered[sender.id] = 1 + # go through nodes + # if we have any closer than what we already got, query them + if dict.has_key('nodes'): + for node in dict['nodes']: + n = Node().initWithDict(node) + n.conn = self.table.airhook.connectionForAddr((n.host, n.port)) + if not self.found.has_key(n.id): + self.found[n.id] = n + elif dict.has_key('values'): + def x(y, z=self.results): + if not z.has_key(y): + z[y] = 1 + return y + else: + return None + v = filter(None, map(x, dict['values'])) + if(len(v)): + reactor.callFromThread(self.callback, v) + self.schedule() + ## get value def schedule(self): - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - - for node in l[:K]: - if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: - #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT - df = node.findValue(self.target, self.table.node.senderDict()) - df.addCallback(self.handleGotNodes) - df.addErrback(self.makeMsgFailed(node)) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding >= const.CONCURRENT_REQS: - break - assert(self.outstanding) >=0 - if self.outstanding == 0: - ## all done, didn't find it!! - self.finished=1 - reactor.callFromThread(self.callback,[]) + if self.finished: + return + l = self.found.values() + l.sort(self.sort) + + for node in l[:K]: + if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: + #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT + df = node.findValue(self.target, self.table.node.senderDict()) + df.addCallback(self.handleGotNodes) + df.addErrback(self.makeMsgFailed(node)) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding >= const.CONCURRENT_REQS: + break + assert(self.outstanding) >=0 + if self.outstanding == 0: + ## all done, didn't find it!! + self.finished=1 + reactor.callFromThread(self.callback,[]) ## get value def goWithNodes(self, nodes, found=None): - self.results = {} - if found: - for n in found: - self.results[n] = 1 - for node in nodes: - if node.id == self.table.node.id: - continue - else: - self.found[node.id] = node - - self.schedule() + self.results = {} + if found: + for n in found: + self.results[n] = 1 + for node in nodes: + if node.id == self.table.node.id: + continue + else: + self.found[node.id] = node + + self.schedule() + +class StoreValue(ActionBase): + def __init__(self, table, target, value, callback): + ActionBase.__init__(self, table, target, callback) + self.value = value + self.stored = [] + + def storedValue(self, t, node): + self.outstanding -= 1 + self.table.insertNode(node) + if self.finished: + return + self.stored.append(t) + if len(self.stored) >= const.STORE_REDUNDANCY: + self.finished=1 + self.callback(self.stored) + else: + if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY: + self.schedule() + + def storeFailed(self, t, node): + self.table.nodeFailed(node) + self.outstanding -= 1 + if self.finished: + return + self.schedule() + + def schedule(self): + if self.finished: + return + num = const.CONCURRENT_REQS - self.outstanding + if num > const.STORE_REDUNDANCY: + num = const.STORE_REDUNDANCY + for i in range(num): + try: + node = self.nodes.pop() + except IndexError: + if self.outstanding == 0: + self.finished = 1 + self.callback(self.stored) + else: + if not node.id == self.table.node.id: + self.outstanding += 1 + df = node.storeValue(self.target, self.value, self.table.node.senderDict()) + df.addCallback(self.storedValue, node=node) + df.addErrback(self.storeFailed, node=node) + + def goWithNodes(self, nodes): + self.nodes = nodes + self.nodes.sort(self.sort) + self.schedule() class KeyExpirer: - def __init__(self, store): - self.store = store - reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) - - def doExpire(self): - self.cut = "%0.6f" % (time() - const.KE_AGE) - self._expire() - - def _expire(self): - c = self.store.cursor() - s = "delete from kv where time < '%s';" % self.cut - c.execute(s) - reactor.callLater(const.KE_DELAY, self.doExpire) - \ No newline at end of file + def __init__(self, store): + self.store = store + reactor.callLater(const.KEINITIAL_DELAY, self.doExpire) + + def doExpire(self): + self.cut = "%0.6f" % (time() - const.KE_AGE) + self._expire() + + def _expire(self): + c = self.store.cursor() + s = "delete from kv where time < '%s';" % self.cut + c.execute(s) + reactor.callLater(const.KE_DELAY, self.doExpire) + \ No newline at end of file diff --git a/airhook.py b/airhook.py index 96009d4..3f1f80b 100644 --- a/airhook.py +++ b/airhook.py @@ -36,6 +36,8 @@ class Airhook(protocol.DatagramProtocol): self.connections = {} def datagramReceived(self, datagram, addr): + #print `addr`, `datagram` + #if addr != self.addr: self.connectionForAddr(addr).datagramReceived(datagram) def connectionForAddr(self, addr): @@ -49,7 +51,11 @@ class Airhook(protocol.DatagramProtocol): else: conn = self.connections[addr] return conn - +# def makeConnection(self, transport): +# protocol.DatagramProtocol.makeConnection(self, transport) +# tup = transport.getHost() +# self.addr = (tup[1], tup[2]) + class AirhookPacket: def __init__(self, msg): self.datagram = msg @@ -92,7 +98,7 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne self.observed = None # their session id self.sessionID = long(rand(0, 2**32)) # our session id - self.lastTransmit = -1 # time we last sent a packet with messages + self.lastTransmit = 0 # time we last sent a packet with messages self.lastReceieved = 0 # time we last received a packet with messages self.lastTransmitSeq = -1 # last sequence we sent a packet self.state = pending @@ -103,6 +109,7 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne self.sendSession = None # send session/observed fields until obSeq > sendSession self.response = 0 # if we know we have a response now (like resending missed packets) self.noisy = 0 + self.scheduled = 0 # a sendNext is scheduled, don't schedule another self.resetMessages() def resetMessages(self): @@ -146,14 +153,12 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne self.state = pending self.resetMessages() self.inSeq = p.seq - self.response = 1 elif self.state == confirmed: if p.session != None or p.observed != None : - if p.session != self.observed or p.observed != self.sessionID: + if (p.session != None and p.session != self.observed) or (p.observed != None and p.observed != self.sessionID): self.state = pending - if seq == 0: - self.resetMessages() - self.inSeq = p.seq + self.resetMessages() + self.inSeq = p.seq if self.state != pending: msgs = [] @@ -262,28 +267,31 @@ class AirhookConnection(protocol.ConnectedDatagramProtocol, interfaces.IUDPConne self.lastTransmit = time() self.transport.write(packet, self.addr) + self.scheduled = 0 self.schedule() def timeToSend(self): # any outstanding messages and are we not too far ahead of our counterparty? - if self.omsgq and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256: - return 1 + if len(self.omsgq) > 0 and self.state != sent and (self.next + 1) % 256 != self.outMsgNums[self.obSeq % 256] and (self.outSeq - self.obSeq) % 2**16 < 256: + return (1, 0) # do we explicitly need to send a response? elif self.response: self.response = 0 - return 1 + return (1, 0) # have we not sent anything in a while? elif time() - self.lastTransmit > 1.0: - return 1 - + return (1, 1) + elif self.state == pending: + return (1, 1) + # nothing to send - return 0 + return (0, 0) def schedule(self): - if self.timeToSend(): - reactor.callLater(0, self.sendNext) - else: - reactor.callLater(1, self.sendNext) + tts, t = self.timeToSend() + if tts and not self.scheduled: + self.scheduled = 1 + reactor.callLater(t, self.sendNext) def write(self, data): # micropackets can only be 255 bytes or less diff --git a/const.py b/const.py index d81ecbd..ee1da1f 100644 --- a/const.py +++ b/const.py @@ -8,7 +8,7 @@ main.installReactor(reactor) NULL_ID = 20 * '\0' # Kademlia "K" constant, this should be an even number -K = 8 +K = 20 # SHA1 is 160 bits long HASH_LENGTH = 160 diff --git a/hash.py b/hash.py index 4de069f..2a31246 100644 --- a/hash.py +++ b/hash.py @@ -3,6 +3,8 @@ from sha import sha import whrandom +random = open('/dev/urandom', 'r') # sucks for windoze + def intify(hstr): """20 bit hash, big-endian -> long python integer""" assert len(hstr) == 20 @@ -26,8 +28,7 @@ def distance(a, b): def newID(): """returns a new pseudorandom globally unique ID string""" h = sha() - for i in range(20): - h.update(chr(whrandom.randrange(0,256))) + h.update(random.read(20)) return h.digest() def newIDInRange(min, max): @@ -35,7 +36,7 @@ def newIDInRange(min, max): def randRange(min, max): return min + intify(newID()) % (max - min) - + ### Test Cases ### import unittest @@ -49,7 +50,7 @@ class NewID(unittest.TestCase): class Intify(unittest.TestCase): known = [('\0' * 20, 0), - ('\xff' * 20, 2L**160 - 1), + ('\xff' * 20, 2L**160 - 1), ] def testKnown(self): for str, value in self.known: diff --git a/khashmir.py b/khashmir.py index 3ca5d72..f3e410a 100644 --- a/khashmir.py +++ b/khashmir.py @@ -12,332 +12,325 @@ from knode import KNode as Node from hash import newID, newIDInRange -from actions import FindNode, GetValue, KeyExpirer -from twisted.web import xmlrpc +from actions import FindNode, GetValue, KeyExpirer, StoreValue +import krpc +import airhook + from twisted.internet.defer import Deferred +from twisted.internet import protocol from twisted.python import threadable from twisted.internet.app import Application from twisted.web import server threadable.init() +import sys import sqlite ## find this at http://pysqlite.sourceforge.net/ import pysqlite_exceptions -KhashmirDBExcept = "KhashmirDBExcept" +class KhashmirDBExcept(Exception): + pass # this is the main class! -class Khashmir(xmlrpc.XMLRPC): - __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last') - def __init__(self, host, port, db='khashmir.db'): - self.setup(host, port, db) - - def setup(self, host, port, db='khashmir.db'): - self._findDB(db) - self.node = self._loadSelfNode(host, port) - self.table = KTable(self.node) - self._loadRoutingTable() - self.app = Application("xmlrpc") - self.app.listenTCP(port, server.Site(self)) - self.last = time.time() - KeyExpirer(store=self.store) - #self.refreshTable(force=1) - reactor.callLater(60, self.checkpoint, (1,)) - - def _loadSelfNode(self, host, port): - c = self.store.cursor() - c.execute('select id from self where num = 0;') - if c.rowcount > 0: - id = c.fetchone()[0].decode('base64') - else: - id = newID() - return Node().init(id, host, port) - - def _saveSelfNode(self): - self.store.autocommit = 0 - c = self.store.cursor() - c.execute('delete from self where num = 0;') - c.execute("insert into self values (0, '%s');" % self.node.id.encode('base64')) - self.store.commit() - self.store.autocommit = 1 - - def checkpoint(self, auto=0): - self._saveSelfNode() - self._dumpRoutingTable() - if auto: - reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) - - def _findDB(self, db): - import os - try: - os.stat(db) - except OSError: - self._createNewDB(db) - else: - self._loadDB(db) - - def _loadDB(self, db): - try: - self.store = sqlite.connect(db=db) - self.store.autocommit = 1 - except: - import traceback - raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback - - def _createNewDB(self, db): - self.store = sqlite.connect(db=db) - self.store.autocommit = 1 - s = """ - create table kv (key text, value text, time timestamp, primary key (key, value)); - create index kv_key on kv(key); - create index kv_timestamp on kv(time); - - create table nodes (id text primary key, host text, port number); - - create table self (num number primary key, id text); - """ - c = self.store.cursor() - c.execute(s) - - def _dumpRoutingTable(self): - """ - save routing table nodes to the database - """ - self.store.autocommit = 0; - c = self.store.cursor() - c.execute("delete from nodes where id not NULL;") - for bucket in self.table.buckets: - for node in bucket.l: - d = node.senderDict() - c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'], d['host'], d['port'])) - self.store.commit() - self.store.autocommit = 1; - - def _loadRoutingTable(self): - """ - load routing table nodes from database - it's usually a good idea to call refreshTable(force=1) after loading the table - """ - c = self.store.cursor() - c.execute("select * from nodes;") - for rec in c.fetchall(): - n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) - self.table.insertNode(n, contacted=0) - - def render(self, request): - """ - Override the built in render so we can have access to the request object! - note, crequest is probably only valid on the initial call (not after deferred!) - """ - self.crequest = request - return xmlrpc.XMLRPC.render(self, request) +class Khashmir(protocol.Factory): + __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol') + protocol = krpc.KRPC + def __init__(self, host, port, db='khashmir.db'): + self.setup(host, port, db) + + def setup(self, host, port, db='khashmir.db'): + self._findDB(db) + self.node = self._loadSelfNode(host, port) + self.table = KTable(self.node) + self.app = Application("krpc") + self.airhook = airhook.listenAirhookStream(port, self) + self.last = time.time() + self._loadRoutingTable() + KeyExpirer(store=self.store) + #self.refreshTable(force=1) + reactor.callLater(60, self.checkpoint, (1,)) + + def _loadSelfNode(self, host, port): + c = self.store.cursor() + c.execute('select id from self where num = 0;') + if c.rowcount > 0: + id = c.fetchone()[0].decode('hex') + else: + id = newID() + return Node().init(id, host, port) + + def _saveSelfNode(self): + self.store.autocommit = 0 + c = self.store.cursor() + c.execute('delete from self where num = 0;') + c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex')) + self.store.commit() + self.store.autocommit = 1 + + def checkpoint(self, auto=0): + self._saveSelfNode() + self._dumpRoutingTable() + if auto: + reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) + + def _findDB(self, db): + import os + try: + os.stat(db) + except OSError: + self._createNewDB(db) + else: + self._loadDB(db) + + def _loadDB(self, db): + try: + self.store = sqlite.connect(db=db) + self.store.autocommit = 1 + except: + import traceback + raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback + + def _createNewDB(self, db): + self.store = sqlite.connect(db=db) + self.store.autocommit = 1 + s = """ + create table kv (key text, value text, time timestamp, primary key (key, value)); + create index kv_key on kv(key); + create index kv_timestamp on kv(time); + + create table nodes (id text primary key, host text, port number); + + create table self (num number primary key, id text); + """ + c = self.store.cursor() + c.execute(s) + def _dumpRoutingTable(self): + """ + save routing table nodes to the database + """ + self.store.autocommit = 0; + c = self.store.cursor() + c.execute("delete from nodes where id not NULL;") + for bucket in self.table.buckets: + for node in bucket.l: + d = node.senderDict() + c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port'])) + self.store.commit() + self.store.autocommit = 1; + + def _loadRoutingTable(self): + """ + load routing table nodes from database + it's usually a good idea to call refreshTable(force=1) after loading the table + """ + c = self.store.cursor() + c.execute("select * from nodes;") + for rec in c.fetchall(): + n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])}) + n.conn = self.airhook.connectionForAddr((n.host, n.port)) + self.table.insertNode(n, contacted=0) + - ####### - ####### LOCAL INTERFACE - use these methods! - def addContact(self, host, port): - """ - ping this node and add the contact info to the table on pong! - """ - n =Node().init(const.NULL_ID, host, port) # note, we - self.sendPing(n) + ####### + ####### LOCAL INTERFACE - use these methods! + def addContact(self, host, port, callback=None): + """ + ping this node and add the contact info to the table on pong! + """ + n =Node().init(const.NULL_ID, host, port) + n.conn = self.airhook.connectionForAddr((n.host, n.port)) + self.sendPing(n, callback=callback) - ## this call is async! - def findNode(self, id, callback, errback=None): - """ returns the contact info for node, or the k closest nodes, from the global table """ - # get K nodes out of local table/cache, or the node we want - nodes = self.table.findNodes(id) - d = Deferred() - if errback: - d.addCallbacks(callback, errback) - else: - d.addCallback(callback) - if len(nodes) == 1 and nodes[0].id == id : - d.callback(nodes) - else: - # create our search state - state = FindNode(self, id, d.callback) - reactor.callFromThread(state.goWithNodes, nodes) - - - ## also async - def valueForKey(self, key, callback): - """ returns the values found for key in global table - callback will be called with a list of values for each peer that returns unique values - final callback will be an empty list - probably should change to 'more coming' arg - """ - nodes = self.table.findNodes(key) - - # get locals - l = self.retrieveValues(key) - if len(l) > 0: - reactor.callFromThread(callback, map(lambda a: a.decode('base64'), l)) - - # create our search state - state = GetValue(self, key, callback) - reactor.callFromThread(state.goWithNodes, nodes, l) + ## this call is async! + def findNode(self, id, callback, errback=None): + """ returns the contact info for node, or the k closest nodes, from the global table """ + # get K nodes out of local table/cache, or the node we want + nodes = self.table.findNodes(id) + d = Deferred() + if errback: + d.addCallbacks(callback, errback) + else: + d.addCallback(callback) + if len(nodes) == 1 and nodes[0].id == id : + d.callback(nodes) + else: + # create our search state + state = FindNode(self, id, d.callback) + reactor.callFromThread(state.goWithNodes, nodes) + + + ## also async + def valueForKey(self, key, callback): + """ returns the values found for key in global table + callback will be called with a list of values for each peer that returns unique values + final callback will be an empty list - probably should change to 'more coming' arg + """ + nodes = self.table.findNodes(key) + + # get locals + l = self.retrieveValues(key) + + # create our search state + state = GetValue(self, key, callback) + reactor.callFromThread(state.goWithNodes, nodes, l) - ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) - def storeValueForKey(self, key, value, callback=None): - """ stores the value for key in the global table, returns immediately, no status - in this implementation, peers respond but don't indicate status to storing values - a key can have many values - """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): - if not response: - # default callback - def _storedValueHandler(sender): - pass - response=_storedValueHandler - - for node in nodes[:const.STORE_REDUNDANCY]: - def cb(t, table = table, node=node, resp=response): - self.table.insertNode(node) - response(t) - if node.id != self.node.id: - def default(err, node=node, table=table): - table.nodeFailed(node) - df = node.storeValue(key, value, self.node.senderDict()) - df.addCallbacks(cb, default) - # this call is asynch - self.findNode(key, _storeValueForKey) - - - def insertNode(self, n, contacted=1): - """ - insert a node in our local table, pinging oldest contact in bucket, if necessary - - If all you have is a host/port, then use addContact, which calls this method after - receiving the PONG from the remote node. The reason for the seperation is we can't insert - a node into the table without it's peer-ID. That means of course the node passed into this - method needs to be a properly formed Node object with a valid ID. - """ - old = self.table.insertNode(n, contacted=contacted) - if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: - # the bucket is full, check to see if old node is still around and if so, replace it - - ## these are the callbacks used when we ping the oldest node in a bucket - def _staleNodeHandler(oldnode=old, newnode = n): - """ called if the pinged node never responds """ - self.table.replaceStaleNode(old, newnode) - - def _notStaleNodeHandler(sender, old=old): - """ called when we get a pong from the old node """ - args, sender = sender - sender = Node().initWithDict(sender) - if sender.id == old.id: - self.table.justSeenNode(old) - - df = old.ping(self.node.senderDict()) - df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) + ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) + def storeValueForKey(self, key, value, callback=None): + """ stores the value for key in the global table, returns immediately, no status + in this implementation, peers respond but don't indicate status to storing values + a key can have many values + """ + def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + if not response: + # default callback + def _storedValueHandler(sender): + pass + response=_storedValueHandler + action = StoreValue(self.table, key, value, response) + reactor.callFromThread(action.goWithNodes, nodes) + + # this call is asynch + self.findNode(key, _storeValueForKey) + + + def insertNode(self, n, contacted=1): + """ + insert a node in our local table, pinging oldest contact in bucket, if necessary + + If all you have is a host/port, then use addContact, which calls this method after + receiving the PONG from the remote node. The reason for the seperation is we can't insert + a node into the table without it's peer-ID. That means of course the node passed into this + method needs to be a properly formed Node object with a valid ID. + """ + old = self.table.insertNode(n, contacted=contacted) + if old and (time.time() - old.lastSeen) > const.MIN_PING_INTERVAL and old.id != self.node.id: + # the bucket is full, check to see if old node is still around and if so, replace it + + ## these are the callbacks used when we ping the oldest node in a bucket + def _staleNodeHandler(oldnode=old, newnode = n): + """ called if the pinged node never responds """ + self.table.replaceStaleNode(old, newnode) + + def _notStaleNodeHandler(dict, old=old): + """ called when we get a pong from the old node """ + sender = dict['sender'] + if sender['id'] == old.id: + self.table.justSeenNode(old.id) + + df = old.ping(self.node.senderDict()) + df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) - def sendPing(self, node): - """ - ping a node - """ - df = node.ping(self.node.senderDict()) - ## these are the callbacks we use when we issue a PING - def _pongHandler(args, node=node, table=self.table): - l, sender = args - if node.id != const.NULL_ID and node.id != sender['id'].decode('base64'): - # whoah, got response from different peer than we were expecting - self.table.invalidateNode(node) - else: - sender['host'] = node.host - sender['port'] = node.port - n = Node().initWithDict(sender) - table.insertNode(n) - return - def _defaultPong(err, node=node, table=self.table): - table.nodeFailed(node) - - df.addCallbacks(_pongHandler,_defaultPong) + def sendPing(self, node, callback=None): + """ + ping a node + """ + df = node.ping(self.node.senderDict()) + ## these are the callbacks we use when we issue a PING + def _pongHandler(dict, node=node, table=self.table, callback=callback): + sender = dict['sender'] + if node.id != const.NULL_ID and node.id != sender['id']: + # whoah, got response from different peer than we were expecting + self.table.invalidateNode(node) + else: + sender['host'] = node.host + sender['port'] = node.port + n = Node().initWithDict(sender) + n.conn = self.airhook.connectionForAddr((n.host, n.port)) + table.insertNode(n) + if callback: + callback() + def _defaultPong(err, node=node, table=self.table, callback=callback): + table.nodeFailed(node) + if callback: + callback() + + df.addCallbacks(_pongHandler,_defaultPong) - def findCloseNodes(self, callback=lambda a: None): - """ - This does a findNode on the ID one away from our own. - This will allow us to populate our table with nodes on our network closest to our own. - This is called as soon as we start up with an empty table - """ - id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) - self.findNode(id, callback) + def findCloseNodes(self, callback=lambda a: None): + """ + This does a findNode on the ID one away from our own. + This will allow us to populate our table with nodes on our network closest to our own. + This is called as soon as we start up with an empty table + """ + id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) + self.findNode(id, callback) - def refreshTable(self, force=0): - """ - force=1 will refresh table regardless of last bucket access time - """ - def callback(nodes): - pass - - for bucket in self.table.buckets: - if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS): - id = newIDInRange(bucket.min, bucket.max) - self.findNode(id, callback) + def refreshTable(self, force=0): + """ + force=1 will refresh table regardless of last bucket access time + """ + def callback(nodes): + pass + + for bucket in self.table.buckets: + if force or (time.time() - bucket.lastAccessed >= const.BUCKET_STALENESS): + id = newIDInRange(bucket.min, bucket.max) + self.findNode(id, callback) - def retrieveValues(self, key): - s = "select value from kv where key = '%s';" % key.encode('base64') - c = self.store.cursor() - c.execute(s) - t = c.fetchone() - l = [] - while t: - l.append(t['value']) - t = c.fetchone() - return l - - ##### - ##### INCOMING MESSAGE HANDLERS - - def xmlrpc_ping(self, sender): - """ - takes sender dict = {'id', , 'port', port} optional keys = 'ip' - returns sender dict - """ - ip = self.crequest.getClientIP() - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - return (), self.node.senderDict() - - def xmlrpc_find_node(self, target, sender): - nodes = self.table.findNodes(target.decode('base64')) - nodes = map(lambda node: node.senderDict(), nodes) - ip = self.crequest.getClientIP() - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - return nodes, self.node.senderDict() - - def xmlrpc_store_value(self, key, value, sender): - t = "%0.6f" % time.time() - s = "insert into kv values ('%s', '%s', '%s');" % (key, value, t) - c = self.store.cursor() - try: - c.execute(s) - except pysqlite_exceptions.IntegrityError, reason: - # update last insert time - s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key, value) - c.execute(s) - ip = self.crequest.getClientIP() - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - return (), self.node.senderDict() - - def xmlrpc_find_value(self, key, sender): - ip = self.crequest.getClientIP() - key = key.decode('base64') - sender['host'] = ip - n = Node().initWithDict(sender) - self.insertNode(n, contacted=0) - - l = self.retrieveValues(key) - if len(l) > 0: - return {'values' : l}, self.node.senderDict() - else: - nodes = self.table.findNodes(key) - nodes = map(lambda node: node.senderDict(), nodes) - return {'nodes' : nodes}, self.node.senderDict() + def retrieveValues(self, key): + s = "select value from kv where key = '%s';" % key.encode('hex') + c = self.store.cursor() + c.execute(s) + t = c.fetchone() + l = [] + while t: + l.append(t['value'].decode('base64')) + t = c.fetchone() + return l + + ##### + ##### INCOMING MESSAGE HANDLERS + + def krpc_ping(self, sender, _krpc_sender): + """ + takes sender dict = {'id', , 'port', port} optional keys = 'ip' + returns sender dict + """ + sender['host'] = _krpc_sender[0] + n = Node().initWithDict(sender) + n.conn = self.airhook.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"sender" : self.node.senderDict()} + + def krpc_find_node(self, target, sender, _krpc_sender): + nodes = self.table.findNodes(target) + nodes = map(lambda node: node.senderDict(), nodes) + sender['host'] = _krpc_sender[0] + n = Node().initWithDict(sender) + n.conn = self.airhook.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"nodes" : nodes, "sender" : self.node.senderDict()} + + def krpc_store_value(self, key, value, sender, _krpc_sender): + t = "%0.6f" % time.time() + s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t) + c = self.store.cursor() + try: + c.execute(s) + except pysqlite_exceptions.IntegrityError, reason: + # update last insert time + s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64")) + c.execute(s) + sender['host'] = _krpc_sender[0] + n = Node().initWithDict(sender) + n.conn = self.airhook.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"sender" : self.node.senderDict()} + + def krpc_find_value(self, key, sender, _krpc_sender): + sender['host'] = _krpc_sender[0] + n = Node().initWithDict(sender) + n.conn = self.airhook.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + + l = self.retrieveValues(key) + if len(l) > 0: + return {'values' : l, "sender": self.node.senderDict()} + else: + nodes = self.table.findNodes(key) + nodes = map(lambda node: node.senderDict(), nodes) + return {'nodes' : nodes, "sender": self.node.senderDict()} ### TESTING ### from random import randrange @@ -347,129 +340,151 @@ from hash import newID def test_net(peers=24, startport=2001, dbprefix='/tmp/test'): - import thread - l = [] - for i in xrange(peers): - a = Khashmir('localhost', startport + i, db = dbprefix+`i`) - l.append(a) - thread.start_new_thread(l[0].app.run, ()) - for peer in l[1:]: - peer.app.run() - return l - -def test_build_net(quiet=0, peers=24, host='localhost', pause=0, startport=2001, dbprefix='/tmp/test'): - from whrandom import randrange - import threading - import thread - import sys - port = startport - l = [] - if not quiet: - print "Building %s peer table." % peers - - for i in xrange(peers): - a = Khashmir(host, port + i, db = dbprefix +`i`) - l.append(a) - - - thread.start_new_thread(l[0].app.run, ()) - time.sleep(1) - for peer in l[1:]: - peer.app.run() - time.sleep(3) - - print "adding contacts...." - - for peer in l: - n = l[randrange(0, len(l))].node - peer.addContact(host, n.port) - n = l[randrange(0, len(l))].node - peer.addContact(host, n.port) - n = l[randrange(0, len(l))].node - peer.addContact(host, n.port) - if pause: - time.sleep(.33) - - time.sleep(10) - print "finding close nodes...." - - for peer in l: - flag = threading.Event() - def cb(nodes, f=flag): - f.set() - peer.findCloseNodes(cb) - flag.wait() - # for peer in l: - # peer.refreshTable() - return l + import thread + l = [] + for i in xrange(peers): + a = Khashmir('127.0.0.1', startport + i, db = dbprefix+`i`) + l.append(a) + thread.start_new_thread(l[0].app.run, ()) + for peer in l[1:]: + peer.app.run() + return l + +def test_build_net(quiet=0, peers=24, host='127.0.0.1', pause=0, startport=2001, dbprefix='/tmp/test'): + from whrandom import randrange + import threading + import thread + import sys + port = startport + l = [] + if not quiet: + print "Building %s peer table." % peers + + for i in xrange(peers): + a = Khashmir(host, port + i, db = dbprefix +`i`) + l.append(a) + + + thread.start_new_thread(l[0].app.run, ()) + time.sleep(1) + for peer in l[1:]: + peer.app.run() + #time.sleep(3) + + def spewer(frame, s, ignored): + from twisted.python import reflect + if frame.f_locals.has_key('self'): + se = frame.f_locals['self'] + print 'method %s of %s at %s' % ( + frame.f_code.co_name, reflect.qual(se.__class__), id(se) + ) + #sys.settrace(spewer) + + print "adding contacts...." + def makecb(flag): + def cb(f=flag): + f.set() + return cb + + for peer in l: + p = l[randrange(0, len(l))] + if p != peer: + n = p.node + flag = threading.Event() + peer.addContact(host, n.port, makecb(flag)) + flag.wait() + p = l[randrange(0, len(l))] + if p != peer: + n = p.node + flag = threading.Event() + peer.addContact(host, n.port, makecb(flag)) + flag.wait() + p = l[randrange(0, len(l))] + if p != peer: + n = p.node + flag = threading.Event() + peer.addContact(host, n.port, makecb(flag)) + flag.wait() + + print "finding close nodes...." + + for peer in l: + flag = threading.Event() + def cb(nodes, f=flag): + f.set() + peer.findCloseNodes(cb) + flag.wait() + # for peer in l: + # peer.refreshTable() + return l def test_find_nodes(l, quiet=0): - flag = threading.Event() - - n = len(l) - - a = l[randrange(0,n)] - b = l[randrange(0,n)] - - def callback(nodes, flag=flag, id = b.node.id): - if (len(nodes) >0) and (nodes[0].id == id): - print "test_find_nodes PASSED" - else: - print "test_find_nodes FAILED" - flag.set() - a.findNode(b.node.id, callback) - flag.wait() + flag = threading.Event() + + n = len(l) + + a = l[randrange(0,n)] + b = l[randrange(0,n)] + + def callback(nodes, flag=flag, id = b.node.id): + if (len(nodes) >0) and (nodes[0].id == id): + print "test_find_nodes PASSED" + else: + print "test_find_nodes FAILED" + flag.set() + a.findNode(b.node.id, callback) + flag.wait() def test_find_value(l, quiet=0): - - fa = threading.Event() - fb = threading.Event() - fc = threading.Event() - - n = len(l) - a = l[randrange(0,n)] - b = l[randrange(0,n)] - c = l[randrange(0,n)] - d = l[randrange(0,n)] - - key = newID() - value = newID() - if not quiet: print "inserting value..." - a.storeValueForKey(key, value) - time.sleep(3) - if not quiet: - print "finding..." - - class cb: - def __init__(self, flag, value=value): - self.flag = flag - self.val = value - self.found = 0 - def callback(self, values): - try: - if(len(values) == 0): - if not self.found: - print "find NOT FOUND" - else: - print "find FOUND" - else: - if self.val in values: - self.found = 1 - finally: - self.flag.set() - - b.valueForKey(key, cb(fa).callback) - fa.wait() - c.valueForKey(key, cb(fb).callback) - fb.wait() - d.valueForKey(key, cb(fc).callback) - fc.wait() + + fa = threading.Event() + fb = threading.Event() + fc = threading.Event() + + n = len(l) + a = l[randrange(0,n)] + b = l[randrange(0,n)] + c = l[randrange(0,n)] + d = l[randrange(0,n)] + + key = newID() + value = newID() + if not quiet: print "inserting value..." + a.storeValueForKey(key, value) + time.sleep(3) + if not quiet: + print "finding..." + + class cb: + def __init__(self, flag, value=value): + self.flag = flag + self.val = value + self.found = 0 + def callback(self, values): + try: + if(len(values) == 0): + if not self.found: + print "find NOT FOUND" + else: + print "find FOUND" + else: + if self.val in values: + self.found = 1 + finally: + self.flag.set() + + b.valueForKey(key, cb(fa).callback) + fa.wait() + c.valueForKey(key, cb(fb).callback) + fb.wait() + d.valueForKey(key, cb(fc).callback) + fc.wait() def test_one(host, port, db='/tmp/test'): - import thread - k = Khashmir(host, port, db) - thread.start_new_thread(k.app.run, ()) - return k + import thread + k = Khashmir(host, port, db) + thread.start_new_thread(reactor.run, ()) + return k if __name__ == "__main__": import sys @@ -479,7 +494,7 @@ if __name__ == "__main__": time.sleep(3) print "finding nodes..." for i in range(10): - test_find_nodes(l) + test_find_nodes(l) print "inserting and fetching values..." for i in range(10): - test_find_value(l) + test_find_value(l) diff --git a/knode.py b/knode.py index ef82260..b5ecedc 100644 --- a/knode.py +++ b/knode.py @@ -1,39 +1,27 @@ from node import Node from twisted.internet.defer import Deferred -from xmlrpcclient import XMLRPCClientFactory as factory from const import reactor, NULL_ID class KNode(Node): - def makeResponse(self, df): - """ Make our callback cover that checks to make sure the id of the response is the same as what we are expecting """ - def _callback(args, d=df): - try: - l, sender = args - except: - d.callback(args) - else: - if self.id != NULL_ID and sender['id'] != self._senderDict['id']: - d.errback() - else: - d.callback(args) - return _callback - def ping(self, sender): - df = Deferred() - f = factory('ping', (sender,), self.makeResponse(df), df.errback) - reactor.connectTCP(self.host, self.port, f) - return df - def findNode(self, target, sender): - df = Deferred() - f = factory('find_node', (target.encode('base64'), sender), self.makeResponse(df), df.errback) - reactor.connectTCP(self.host, self.port, f) - return df - def storeValue(self, key, value, sender): - df = Deferred() - f = factory('store_value', (key.encode('base64'), value.encode('base64'), sender), self.makeResponse(df), df.errback) - reactor.connectTCP(self.host, self.port, f) - return df - def findValue(self, key, sender): - df = Deferred() - f = factory('find_value', (key.encode('base64'), sender), self.makeResponse(df), df.errback) - reactor.connectTCP(self.host, self.port, f) - return df + def makeResponse(self, df): + """ Make our callback cover that checks to make sure the id of the response is the same as what we are expecting """ + def _callback(dict, d=df): + try: + senderid = dict['sender']['id'] + except KeyError: + d.errback() + else: + if self.id != NULL_ID and senderid != self._senderDict['id']: + d.errback() + else: + d.callback(dict) + return _callback + + def ping(self, sender): + return self.conn.protocol.sendRequest('ping', {"sender":sender}) + def findNode(self, target, sender): + return self.conn.protocol.sendRequest('find_node', {"target" : target, "sender": sender}) + def storeValue(self, key, value, sender): + return self.conn.protocol.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender}) + def findValue(self, key, sender): + return self.conn.protocol.sendRequest('find_value', {"key" : key, "sender" : sender}) \ No newline at end of file diff --git a/krpc.py b/krpc.py new file mode 100644 index 0000000..8569bbf --- /dev/null +++ b/krpc.py @@ -0,0 +1,101 @@ +import airhook +from twisted.internet.defer import Deferred +from twisted.protocols import basic +from bencode import bencode, bdecode +from twisted.internet import reactor + +import hash + +KRPC_TIMEOUT = 30 + +KRPC_ERROR = 1 +KRPC_ERROR_METHOD_UNKNOWN = 2 +KRPC_ERROR_RECEIVED_UNKNOWN = 3 +KRPC_ERROR_TIMEOUT = 4 + +class KRPC(basic.NetstringReceiver): + noisy = 1 + def __init__(self): + self.tids = {} + + def stringReceived(self, str): + # bdecode + try: + msg = bdecode(str) + except Exception, e: + print "response decode error: " + `e` + self.d.errback() + else: + # look at msg type + if msg['typ'] == 'req': + ilen = len(str) + # if request + # tell factory to handle + f = getattr(self.factory ,"krpc_" + msg['req'], None) + if f and callable(f): + msg['arg']['_krpc_sender'] = self.transport.addr + try: + ret = apply(f, (), msg['arg']) + except Exception, e: + ## send error + str = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`}) + olen = len(str) + self.sendString(str) + else: + if ret: + # make response + str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret}) + else: + str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : []}) + # send response + olen = len(str) + self.sendString(str) + + else: + # unknown method + str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN}) + olen = len(str) + self.sendString(str) + if self.noisy: + print "%s >>> (%s, %s) - %s %s %s" % (self.transport.addr, self.factory.node.host, self.factory.node.port, + ilen, msg['req'], olen) + elif msg['typ'] == 'rsp': + # if response + # lookup tid + if self.tids.has_key(msg['tid']): + df = self.tids[msg['tid']] + # callback + df.callback(msg['rsp']) + del(self.tids[msg['tid']]) + # no tid, perhaps this transaction timed out already... + elif msg['typ'] == 'err': + # if error + # lookup tid + df = self.tids[msg['tid']] + # callback + df.errback(msg['err']) + del(self.tids[msg['tid']]) + else: + # unknown message type + df = self.tids[msg['tid']] + # callback + df.errback(KRPC_ERROR_RECEIVED_UNKNOWN) + del(self.tids[msg['tid']]) + + def sendRequest(self, method, args): + # make message + # send it + msg = {'tid' : hash.newID(), 'typ' : 'req', 'req' : method, 'arg' : args} + str = bencode(msg) + self.sendString(str) + d = Deferred() + self.tids[msg['tid']] = d + + def timeOut(tids = self.tids, id = msg['tid']): + if tids.has_key(id): + df = tids[id] + del(tids[id]) + df.errback(KRPC_ERROR_TIMEOUT) + reactor.callLater(KRPC_TIMEOUT, timeOut) + return d + \ No newline at end of file diff --git a/ktable.py b/ktable.py index bd89ef7..05bd51c 100644 --- a/ktable.py +++ b/ktable.py @@ -6,207 +6,207 @@ from types import * import hash import const -from const import K, HASH_LENGTH +from const import K, HASH_LENGTH, NULL_ID from node import Node class KTable: - """local routing table for a kademlia like distributed hash table""" - def __init__(self, node): - # this is the root node, a.k.a. US! - self.node = node - self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)] - self.insertNode(node) - - def _bucketIndexForInt(self, num): - """the index of the bucket that should hold int""" - return bisect_left(self.buckets, num) - - def findNodes(self, id): - """ - return K nodes in our own local table closest to the ID. - """ - - if isinstance(id, str): - num = hash.intify(id) - elif isinstance(id, Node): - num = id.num - elif isinstance(id, int) or isinstance(id, long): - num = id - else: - raise TypeError, "findNodes requires an int, string, or Node" - - nodes = [] - i = self._bucketIndexForInt(num) - - # if this node is already in our table then return it - try: - index = self.buckets[i].l.index(num) - except ValueError: - pass - else: - return [self.buckets[i].l[index]] - - # don't have the node, get the K closest nodes - nodes = nodes + self.buckets[i].l - if len(nodes) < K: - # need more nodes - min = i - 1 - max = i + 1 - while len(nodes) < K and (min >= 0 or max < len(self.buckets)): - #ASw: note that this requires K be even - if min >= 0: - nodes = nodes + self.buckets[min].l - if max < len(self.buckets): - nodes = nodes + self.buckets[max].l - min = min - 1 - max = max + 1 - - nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num)) - return nodes[:K] - - def _splitBucket(self, a): - diff = (a.max - a.min) / 2 - b = KBucket([], a.max - diff, a.max) - self.buckets.insert(self.buckets.index(a.min) + 1, b) - a.max = a.max - diff - # transfer nodes to new bucket - for anode in a.l[:]: - if anode.num >= a.max: - a.l.remove(anode) - b.l.append(anode) - - def replaceStaleNode(self, stale, new): - """this is used by clients to replace a node returned by insertNode after - it fails to respond to a Pong message""" - i = self._bucketIndexForInt(stale.num) - try: - it = self.buckets[i].l.index(stale.num) - except ValueError: - return - - del(self.buckets[i].l[it]) - if new: - self.buckets[i].l.append(new) - - def insertNode(self, node, contacted=1): - """ - this insert the node, returning None if successful, returns the oldest node in the bucket if it's full - the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!! - contacted means that yes, we contacted THEM and we know the node is reachable - """ - assert node.id != " "*20 - if node.id == self.node.id: return - # get the bucket for this node - i = self. _bucketIndexForInt(node.num) - # check to see if node is in the bucket already - try: - it = self.buckets[i].l.index(node.num) - except ValueError: - # no - pass - else: - if contacted: - node.updateLastSeen() - # move node to end of bucket - xnode = self.buckets[i].l[it] - del(self.buckets[i].l[it]) - # note that we removed the original and replaced it with the new one - # utilizing this nodes new contact info - self.buckets[i].l.append(xnode) - self.buckets[i].touch() - return - - # we don't have this node, check to see if the bucket is full - if len(self.buckets[i].l) < K: - # no, append this node and return - if contacted: - node.updateLastSeen() - self.buckets[i].l.append(node) - self.buckets[i].touch() - return - - # bucket is full, check to see if self.node is in the bucket - if not (self.buckets[i].min <= self.node < self.buckets[i].max): - return self.buckets[i].l[0] - - # this bucket is full and contains our node, split the bucket - if len(self.buckets) >= HASH_LENGTH: - # our table is FULL, this is really unlikely - print "Hash Table is FULL! Increase K!" - return - - self._splitBucket(self.buckets[i]) - - # now that the bucket is split and balanced, try to insert the node again - return self.insertNode(node) - - def justSeenNode(self, node): - """call this any time you get a message from a node - it will update it in the table if it's there """ - try: - n = self.findNodes(node.num)[0] - except IndexError: - return None - else: - tstamp = n.lastSeen - n.updateLastSeen() - return tstamp - - def invalidateNode(self, n): - """ - forget about node n - use when you know that node is invalid - """ - self.replaceStaleNode(n, None) - - def nodeFailed(self, node): - """ call this when a node fails to respond to a message, to invalidate that node """ - try: - n = self.findNodes(node.num)[0] - except IndexError: - return None - else: - if n.msgFailed() >= const.MAX_FAILURES: - self.invalidateNode(n) - + """local routing table for a kademlia like distributed hash table""" + def __init__(self, node): + # this is the root node, a.k.a. US! + self.node = node + self.buckets = [KBucket([], 0L, 2L**HASH_LENGTH)] + self.insertNode(node) + + def _bucketIndexForInt(self, num): + """the index of the bucket that should hold int""" + return bisect_left(self.buckets, num) + + def findNodes(self, id): + """ + return K nodes in our own local table closest to the ID. + """ + + if isinstance(id, str): + num = hash.intify(id) + elif isinstance(id, Node): + num = id.num + elif isinstance(id, int) or isinstance(id, long): + num = id + else: + raise TypeError, "findNodes requires an int, string, or Node" + + nodes = [] + i = self._bucketIndexForInt(num) + + # if this node is already in our table then return it + try: + index = self.buckets[i].l.index(num) + except ValueError: + pass + else: + return [self.buckets[i].l[index]] + + # don't have the node, get the K closest nodes + nodes = nodes + self.buckets[i].l + if len(nodes) < K: + # need more nodes + min = i - 1 + max = i + 1 + while len(nodes) < K and (min >= 0 or max < len(self.buckets)): + #ASw: note that this requires K be even + if min >= 0: + nodes = nodes + self.buckets[min].l + if max < len(self.buckets): + nodes = nodes + self.buckets[max].l + min = min - 1 + max = max + 1 + + nodes.sort(lambda a, b, num=num: cmp(num ^ a.num, num ^ b.num)) + return nodes[:K] + + def _splitBucket(self, a): + diff = (a.max - a.min) / 2 + b = KBucket([], a.max - diff, a.max) + self.buckets.insert(self.buckets.index(a.min) + 1, b) + a.max = a.max - diff + # transfer nodes to new bucket + for anode in a.l[:]: + if anode.num >= a.max: + a.l.remove(anode) + b.l.append(anode) + + def replaceStaleNode(self, stale, new): + """this is used by clients to replace a node returned by insertNode after + it fails to respond to a Pong message""" + i = self._bucketIndexForInt(stale.num) + try: + it = self.buckets[i].l.index(stale.num) + except ValueError: + return + + del(self.buckets[i].l[it]) + if new: + self.buckets[i].l.append(new) + + def insertNode(self, node, contacted=1): + """ + this insert the node, returning None if successful, returns the oldest node in the bucket if it's full + the caller responsible for pinging the returned node and calling replaceStaleNode if it is found to be stale!! + contacted means that yes, we contacted THEM and we know the node is reachable + """ + assert node.id != NULL_ID + if node.id == self.node.id: return + # get the bucket for this node + i = self. _bucketIndexForInt(node.num) + # check to see if node is in the bucket already + try: + it = self.buckets[i].l.index(node.num) + except ValueError: + # no + pass + else: + if contacted: + node.updateLastSeen() + # move node to end of bucket + xnode = self.buckets[i].l[it] + del(self.buckets[i].l[it]) + # note that we removed the original and replaced it with the new one + # utilizing this nodes new contact info + self.buckets[i].l.append(xnode) + self.buckets[i].touch() + return + + # we don't have this node, check to see if the bucket is full + if len(self.buckets[i].l) < K: + # no, append this node and return + if contacted: + node.updateLastSeen() + self.buckets[i].l.append(node) + self.buckets[i].touch() + return + + # bucket is full, check to see if self.node is in the bucket + if not (self.buckets[i].min <= self.node < self.buckets[i].max): + return self.buckets[i].l[0] + + # this bucket is full and contains our node, split the bucket + if len(self.buckets) >= HASH_LENGTH: + # our table is FULL, this is really unlikely + print "Hash Table is FULL! Increase K!" + return + + self._splitBucket(self.buckets[i]) + + # now that the bucket is split and balanced, try to insert the node again + return self.insertNode(node) + + def justSeenNode(self, id): + """call this any time you get a message from a node + it will update it in the table if it's there """ + try: + n = self.findNodes(id)[0] + except IndexError: + return None + else: + tstamp = n.lastSeen + n.updateLastSeen() + return tstamp + + def invalidateNode(self, n): + """ + forget about node n - use when you know that node is invalid + """ + self.replaceStaleNode(n, None) + + def nodeFailed(self, node): + """ call this when a node fails to respond to a message, to invalidate that node """ + try: + n = self.findNodes(node.num)[0] + except IndexError: + return None + else: + if n.msgFailed() >= const.MAX_FAILURES: + self.invalidateNode(n) + class KBucket: - __slots__ = ('min', 'max', 'lastAccessed') - def __init__(self, contents, min, max): - self.l = contents - self.min = min - self.max = max - self.lastAccessed = time.time() - - def touch(self): - self.lastAccessed = time.time() - - def getNodeWithInt(self, num): - if num in self.l: return num - else: raise ValueError - - def __repr__(self): - return "" % (len(self.l), self.min, self.max) - - ## Comparators - # necessary for bisecting list of buckets with a hash expressed as an integer or a distance - # compares integer or node object with the bucket's range - def __lt__(self, a): - if isinstance(a, Node): a = a.num - return self.max <= a - def __le__(self, a): - if isinstance(a, Node): a = a.num - return self.min < a - def __gt__(self, a): - if isinstance(a, Node): a = a.num - return self.min > a - def __ge__(self, a): - if isinstance(a, Node): a = a.num - return self.max >= a - def __eq__(self, a): - if isinstance(a, Node): a = a.num - return self.min <= a and self.max > a - def __ne__(self, a): - if isinstance(a, Node): a = a.num - return self.min >= a or self.max < a + __slots__ = ('min', 'max', 'lastAccessed') + def __init__(self, contents, min, max): + self.l = contents + self.min = min + self.max = max + self.lastAccessed = time.time() + + def touch(self): + self.lastAccessed = time.time() + + def getNodeWithInt(self, num): + if num in self.l: return num + else: raise ValueError + + def __repr__(self): + return "" % (len(self.l), self.min, self.max) + + ## Comparators + # necessary for bisecting list of buckets with a hash expressed as an integer or a distance + # compares integer or node object with the bucket's range + def __lt__(self, a): + if isinstance(a, Node): a = a.num + return self.max <= a + def __le__(self, a): + if isinstance(a, Node): a = a.num + return self.min < a + def __gt__(self, a): + if isinstance(a, Node): a = a.num + return self.min > a + def __ge__(self, a): + if isinstance(a, Node): a = a.num + return self.max >= a + def __eq__(self, a): + if isinstance(a, Node): a = a.num + return self.min <= a and self.max > a + def __ne__(self, a): + if isinstance(a, Node): a = a.num + return self.min >= a or self.max < a ### UNIT TESTS ### @@ -216,13 +216,28 @@ class TestKTable(unittest.TestCase): def setUp(self): self.a = Node().init(hash.newID(), 'localhost', 2002) self.t = KTable(self.a) - print self.t.buckets[0].l - def test_replace_stale_node(self): + def testAddNode(self): self.b = Node().init(hash.newID(), 'localhost', 2003) - self.t.replaceStaleNode(self.a, self.b) - assert len(self.t.buckets[0].l) == 1 - assert self.t.buckets[0].l[0].id == self.b.id + self.t.insertNode(self.b) + self.assertEqual(len(self.t.buckets[0].l), 1) + self.assertEqual(self.t.buckets[0].l[0], self.b) + + def testRemove(self): + self.testAddNode() + self.t.invalidateNode(self.b) + self.assertEqual(len(self.t.buckets[0].l), 0) + + def testFail(self): + self.testAddNode() + for i in range(const.MAX_FAILURES - 1): + self.t.nodeFailed(self.b) + self.assertEqual(len(self.t.buckets[0].l), 1) + self.assertEqual(self.t.buckets[0].l[0], self.b) + + self.t.nodeFailed(self.b) + self.assertEqual(len(self.t.buckets[0].l), 0) + if __name__ == "__main__": unittest.main() diff --git a/node.py b/node.py index 22a4dd9..93e1605 100644 --- a/node.py +++ b/node.py @@ -1,79 +1,78 @@ import hash import time from types import * -from xmlrpclib import Binary class Node: - """encapsulate contact info""" - def __init__(self): - self.fails = 0 - self.lastSeen = 0 - self.id = self.host = self.port = '' - - def init(self, id, host, port): - self.id = id - self.num = hash.intify(id) - self.host = host - self.port = port - self._senderDict = {'id': self.id.encode('base64'), 'port' : self.port, 'host' : self.host} - return self - - def initWithDict(self, dict): - self._senderDict = dict - self.id = dict['id'].decode('base64') - self.num = hash.intify(self.id) - self.port = dict['port'] - self.host = dict['host'] - return self - - def updateLastSeen(self): - self.lastSeen = time.time() - self.fails = 0 - - def msgFailed(self): - self.fails = self.fails + 1 - return self.fails - - def senderDict(self): - return self._senderDict - - def __repr__(self): - return `(self.id, self.host, self.port)` - + """encapsulate contact info""" + def __init__(self): + self.fails = 0 + self.lastSeen = 0 + self.id = self.host = self.port = '' + + def init(self, id, host, port): + self.id = id + self.num = hash.intify(id) + self.host = host + self.port = port + self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host} + return self + + def initWithDict(self, dict): + self._senderDict = dict + self.id = dict['id'] + self.num = hash.intify(self.id) + self.port = dict['port'] + self.host = dict['host'] + return self + + def updateLastSeen(self): + self.lastSeen = time.time() + self.fails = 0 + + def msgFailed(self): + self.fails = self.fails + 1 + return self.fails + + def senderDict(self): + return self._senderDict + + def __repr__(self): + return `(self.id, self.host, self.port)` + ## these comparators let us bisect/index a list full of nodes with either a node or an int/long - def __lt__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num < a - def __le__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num <= a - def __gt__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num > a - def __ge__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num >= a - def __eq__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num == a - def __ne__(self, a): - if type(a) == InstanceType: - a = a.num - return self.num != a + def __lt__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num < a + def __le__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num <= a + def __gt__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num > a + def __ge__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num >= a + def __eq__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num == a + def __ne__(self, a): + if type(a) == InstanceType: + a = a.num + return self.num != a import unittest class TestNode(unittest.TestCase): - def setUp(self): - self.node = Node().init(hash.newID(), 'localhost', 2002) - def testUpdateLastSeen(self): - t = self.node.lastSeen - self.node.updateLastSeen() - assert t < self.node.lastSeen - \ No newline at end of file + def setUp(self): + self.node = Node().init(hash.newID(), 'localhost', 2002) + def testUpdateLastSeen(self): + t = self.node.lastSeen + self.node.updateLastSeen() + assert t < self.node.lastSeen + \ No newline at end of file diff --git a/test.py b/test.py index 2dc44dd..0dd3f41 100644 --- a/test.py +++ b/test.py @@ -2,8 +2,9 @@ import unittest import ktable, khashmir import hash, node, knode -import actions, xmlrpcclient +import actions import btemplate +import test_airhook -tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'btemplate', 'actions', 'ktable', 'xmlrpcclient']) +tests = unittest.defaultTestLoader.loadTestsFromNames(['hash', 'node', 'knode', 'actions', 'ktable', 'test_airhook']) result = unittest.TextTestRunner().run(tests) diff --git a/test_airhook.py b/test_airhook.py index aabaebc..e491d84 100644 --- a/test_airhook.py +++ b/test_airhook.py @@ -27,28 +27,19 @@ class StreamReceiver(protocol.Protocol): self.buf = "" def dataReceived(self, data): self.buf += data - -class EchoFactory(protocol.Factory): - def buildProtocol(self, addr): - return Echo() -class NoisyFactory(protocol.Factory): - def buildProtocol(self, addr): - return Noisy() -class ReceiverFactory(protocol.Factory): - def buildProtocol(self, addr): - return Receiver() -class StreamReceiverFactory(protocol.Factory): - def buildProtocol(self, addr): - return StreamReceiver() - + def makeEcho(port): - return listenAirhookStream(port, EchoFactory()) + f = protocol.Factory(); f.protocol = Echo + return listenAirhookStream(port, f) def makeNoisy(port): - return listenAirhookStream(port, NoisyFactory()) + f = protocol.Factory(); f.protocol = Noisy + return listenAirhookStream(port, f) def makeReceiver(port): - return listenAirhookStream(port, ReceiverFactory()) + f = protocol.Factory(); f.protocol = Receiver + return listenAirhookStream(port, f) def makeStreamReceiver(port): - return listenAirhookStream(port, StreamReceiverFactory()) + f = protocol.Factory(); f.protocol = StreamReceiver + return listenAirhookStream(port, f) class DummyTransport: def __init__(self): @@ -662,18 +653,15 @@ class EchoReactorStreamBig(unittest.TestCase): self.noisy = 0 self.a = makeStreamReceiver(2028) self.b = makeEcho(2029) - self.ac = self.a.connectionForAddr(('127.0.0.1', 2028)) - self.bc = self.b.connectionForAddr(('127.0.0.1', 2029)) + self.ac = self.a.connectionForAddr(('127.0.0.1', 2029)) def testBig(self): - msg = open('/dev/urandom').read(4096) + msg = open('/dev/urandom').read(256) self.ac.write(msg) reactor.iterate() reactor.iterate() reactor.iterate() reactor.iterate() reactor.iterate() - reactor.iterate() - reactor.iterate() self.assertEqual(self.ac.protocol.buf, msg) \ No newline at end of file diff --git a/test_krpc.py b/test_krpc.py new file mode 100644 index 0000000..5f2c5b7 --- /dev/null +++ b/test_krpc.py @@ -0,0 +1,140 @@ +from unittest import * +from krpc import * +from airhook import * + +import sys + +if __name__ =="__main__": + tests = unittest.defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]]) + result = unittest.TextTestRunner().run(tests) + + +def connectionForAddr(host, port): + return host + +class Receiver(protocol.Factory): + protocol = KRPC + def __init__(self): + self.buf = [] + def krpc_store(self, msg, _krpc_sender): + self.buf += [msg] + def krpc_echo(self, msg, _krpc_sender): + return msg + +class SimpleTest(TestCase): + def setUp(self): + self.noisy = 0 + + self.af = Receiver() + self.bf = Receiver() + self.a = listenAirhookStream(4040, self.af) + self.b = listenAirhookStream(4041, self.bf) + + def testSimpleMessage(self): + self.noisy = 1 + self.a.connectionForAddr(('127.0.0.1', 4041)).protocol.sendRequest('store', {'msg' : "This is a test."}) + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.bf.buf, ["This is a test."]) + +class SimpleTest(TestCase): + def setUp(self): + self.noisy = 0 + + self.af = Receiver() + self.bf = Receiver() + self.a = listenAirhookStream(4050, self.af) + self.b = listenAirhookStream(4051, self.bf) + + def testSimpleMessage(self): + self.noisy = 1 + self.a.connectionForAddr(('127.0.0.1', 4051)).protocol.sendRequest('store', {'msg' : "This is a test."}) + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.bf.buf, ["This is a test."]) + +class EchoTest(TestCase): + def setUp(self): + self.noisy = 0 + self.msg = None + + self.af = Receiver() + self.bf = Receiver() + self.a = listenAirhookStream(4042, self.af) + self.b = listenAirhookStream(4043, self.bf) + + def testEcho(self): + self.noisy = 1 + df = self.a.connectionForAddr(('127.0.0.1', 4043)).protocol.sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is a test.") + + def gotMsg(self, msg): + self.msg = msg + +class MultiEchoTest(TestCase): + def setUp(self): + self.noisy = 0 + self.msg = None + + self.af = Receiver() + self.bf = Receiver() + self.a = listenAirhookStream(4048, self.af) + self.b = listenAirhookStream(4049, self.bf) + + def testMultiEcho(self): + self.noisy = 1 + df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is a test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is a test.") + + df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is another test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is another test.") + + df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is yet another test."}) + df.addCallback(self.gotMsg) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.msg, "This is yet another test.") + + def gotMsg(self, msg): + self.msg = msg + +class UnknownMethErrTest(TestCase): + def setUp(self): + self.noisy = 0 + self.err = None + self.af = Receiver() + self.bf = Receiver() + self.a = listenAirhookStream(4044, self.af) + self.b = listenAirhookStream(4045, self.bf) + + def testUnknownMeth(self): + self.noisy = 1 + df = self.a.connectionForAddr(('127.0.0.1', 4045)).protocol.sendRequest('blahblah', {'msg' : "This is a test."}) + df.addErrback(self.gotErr) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.assertEqual(self.err, KRPC_ERROR_METHOD_UNKNOWN) + + def gotErr(self, err): + self.err = err.value diff --git a/util.py b/util.py index ea1e7f2..633772a 100644 --- a/util.py +++ b/util.py @@ -1,20 +1,20 @@ def bucket_stats(l): - """given a list of khashmir instances, finds min, max, and average number of nodes in tables""" - max = avg = 0 - min = None - def count(buckets): - c = 0 - for bucket in buckets: - c = c + len(bucket.l) - return c - for node in l: - c = count(node.table.buckets) - if min == None: - min = c - elif c < min: - min = c - if c > max: - max = c - avg = avg + c - avg = avg / len(l) - return {'min':min, 'max':max, 'avg':avg} + """given a list of khashmir instances, finds min, max, and average number of nodes in tables""" + max = avg = 0 + min = None + def count(buckets): + c = 0 + for bucket in buckets: + c = c + len(bucket.l) + return c + for node in l: + c = count(node.table.buckets) + if min == None: + min = c + elif c < min: + min = c + if c > max: + max = c + avg = avg + c + avg = avg / len(l) + return {'min':min, 'max':max, 'avg':avg} diff --git a/xmlrpcclient.py b/xmlrpcclient.py deleted file mode 100644 index f8d33b2..0000000 --- a/xmlrpcclient.py +++ /dev/null @@ -1,48 +0,0 @@ -from twisted.internet.protocol import ClientFactory -from twisted.protocols.http import HTTPClient -from twisted.internet.defer import Deferred - -from xmlrpclib import loads, dumps -import socket - -USER_AGENT = 'Python/Twisted XMLRPC 0.1' -class XMLRPCClient(HTTPClient): - def connectionMade(self): - payload = dumps(self.args, self.method) - self.sendCommand('POST', '/RPC2') - self.sendHeader('User-Agent', USER_AGENT) - self.sendHeader('Content-Type', 'text/xml') - self.sendHeader('Content-Length', len(payload)) - self.endHeaders() - self.transport.write(payload) - self.transport.write('\r\n') - - def handleResponse(self, buf): - try: - args, name = loads(buf) - except Exception, e: - print "response decode error: " + `e` - self.d.errback() - else: - apply(self.d.callback, args) - -class XMLRPCClientFactory(ClientFactory): - def __init__(self, method, args, callback=None, errback=None): - self.method = method - self.args = args - self.d = Deferred() - if callback: - self.d.addCallback(callback) - if errback: - self.d.addErrback(errback) - self.noisy = 0 - - def buildProtocol(self, addr): - prot = XMLRPCClient() - prot.method = self.method - prot.args = self.args - prot.d = self.d - return prot - - def clientConnectionFailed(self, connector, reason): - self.d.errback() \ No newline at end of file