From: burris Date: Mon, 14 Jun 2004 00:21:57 +0000 (+0000) Subject: major cleanup, updated for twisted X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;ds=sidebyside;h=9048402f56c24474c79920ab849748223ed339cf;p=quix0rs-apt-p2p.git major cleanup, updated for twisted not using airhook, it's buggy now KRPC over single UDP packets values can't be too big, around 1400 bytes khashmir test code is now more sane though it doesn't do much more than build a couple of tables and fetch some values --- diff --git a/.cvsignore b/.cvsignore new file mode 100644 index 0000000..0205d62 --- /dev/null +++ b/.cvsignore @@ -0,0 +1,2 @@ +*.pyc +.DS_Store diff --git a/actions.py b/actions.py index 75b0157..9e77196 100644 --- a/actions.py +++ b/actions.py @@ -6,7 +6,7 @@ from time import time from const import reactor import const -from hash import intify +from khash import intify from knode import KNode as Node from ktable import KTable, K @@ -49,7 +49,7 @@ class FindNode(ActionBase): sender = dict["sender"] sender['port'] = _krpc_sender[1] sender = Node().initWithDict(sender) - sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port)) + sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short @@ -58,7 +58,7 @@ class FindNode(ActionBase): self.answered[sender.id] = 1 for node in l: n = Node().initWithDict(node) - n.conn = self.table.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.table.udp.connectionForAddr((n.host, n.port)) if not self.found.has_key(n.id): self.found[n.id] = n self.schedule() @@ -120,7 +120,7 @@ class GetValue(FindNode): sender = dict["sender"] sender['port'] = _krpc_sender[1] sender = Node().initWithDict(sender) - sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port)) + sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short @@ -132,7 +132,7 @@ class GetValue(FindNode): if dict.has_key('nodes'): for node in dict['nodes']: n = Node().initWithDict(node) - n.conn = self.table.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.table.udp.connectionForAddr((n.host, n.port)) if not self.found.has_key(n.id): self.found[n.id] = n elif dict.has_key('values'): diff --git a/const.py b/const.py index b91996a..1a42485 100644 --- a/const.py +++ b/const.py @@ -1,6 +1,7 @@ ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information +""" from twisted.internet.default import SelectReactor ## twistedmatrix.com reactor = SelectReactor() @@ -14,6 +15,8 @@ try: reactor.installResolver(twisted.names.client.theResolver) except IOError: print "no resolv.conf!" +""" +from twisted.internet import reactor # magic id to use before we know a peer's id NULL_ID = 20 * '\0' diff --git a/hash.py b/hash.py deleted file mode 100644 index 441c58a..0000000 --- a/hash.py +++ /dev/null @@ -1,103 +0,0 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved -# see LICENSE.txt for license information - -from sha import sha -import whrandom - - -def intify(hstr): - """20 bit hash, big-endian -> long python integer""" - assert len(hstr) == 20 - return long(hstr.encode('hex'), 16) - -def stringify(num): - """long int -> 20-character string""" - str = hex(num)[2:] - if str[-1] == 'L': - str = str[:-1] - if len(str) % 2 != 0: - str = '0' + str - str = str.decode('hex') - return (20 - len(str)) *'\x00' + str - -def distance(a, b): - """distance between two 160-bit hashes expressed as 20-character strings""" - return intify(a) ^ intify(b) - - -def newID(): - """returns a new pseudorandom globally unique ID string""" - h = sha() - for i in range(20): - h.update(chr(whrandom.randint(0,255))) - return h.digest() - -def newIDInRange(min, max): - return stringify(randRange(min,max)) - -def randRange(min, max): - return min + intify(newID()) % (max - min) - - -### Test Cases ### -import unittest - -class NewID(unittest.TestCase): - def testLength(self): - self.assertEqual(len(newID()), 20) - def testHundreds(self): - for x in xrange(100): - self.testLength - -class Intify(unittest.TestCase): - known = [('\0' * 20, 0), - ('\xff' * 20, 2L**160 - 1), - ] - def testKnown(self): - for str, value in self.known: - self.assertEqual(intify(str), value) - def testEndianessOnce(self): - h = newID() - while h[-1] == '\xff': - h = newID() - k = h[:-1] + chr(ord(h[-1]) + 1) - self.assertEqual(intify(k) - intify(h), 1) - def testEndianessLots(self): - for x in xrange(100): - self.testEndianessOnce() - -class Disantance(unittest.TestCase): - known = [ - (("\0" * 20, "\xff" * 20), 2**160L -1), - ((sha("foo").digest(), sha("foo").digest()), 0), - ((sha("bar").digest(), sha("bar").digest()), 0) - ] - def testKnown(self): - for pair, dist in self.known: - self.assertEqual(distance(pair[0], pair[1]), dist) - def testCommutitive(self): - for i in xrange(100): - x, y, z = newID(), newID(), newID() - self.assertEqual(distance(x,y) ^ distance(y, z), distance(x, z)) - -class RandRange(unittest.TestCase): - def testOnce(self): - a = intify(newID()) - b = intify(newID()) - if a < b: - c = randRange(a, b) - self.assertEqual(a <= c < b, 1, "output out of range %d %d %d" % (b, c, a)) - else: - c = randRange(b, a) - assert b <= c < a, "output out of range %d %d %d" % (b, c, a) - - def testOneHundredTimes(self): - for i in xrange(100): - self.testOnce() - - - -if __name__ == '__main__': - unittest.main() - - diff --git a/khash.py b/khash.py new file mode 100644 index 0000000..deb8d35 --- /dev/null +++ b/khash.py @@ -0,0 +1,113 @@ +## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + +from sha import sha +import whrandom + +#this is ugly, hopefully os.entropy will be in 2.4 +try: + from entropy import entropy +except ImportError: + def entropy(n): + s = '' + for i in range(n): + s += chr(whrandom.randint(0,255)) + return s + +def intify(hstr): + """20 bit hash, big-endian -> long python integer""" + assert len(hstr) == 20 + return long(hstr.encode('hex'), 16) + +def stringify(num): + """long int -> 20-character string""" + str = hex(num)[2:] + if str[-1] == 'L': + str = str[:-1] + if len(str) % 2 != 0: + str = '0' + str + str = str.decode('hex') + return (20 - len(str)) *'\x00' + str + +def distance(a, b): + """distance between two 160-bit hashes expressed as 20-character strings""" + return intify(a) ^ intify(b) + + +def newID(): + """returns a new pseudorandom globally unique ID string""" + h = sha() + h.update(entropy(20)) + return h.digest() + +def newIDInRange(min, max): + return stringify(randRange(min,max)) + +def randRange(min, max): + return min + intify(newID()) % (max - min) + +def newTID(): + return randRange(-2**30, 2**30) + +### Test Cases ### +import unittest + +class NewID(unittest.TestCase): + def testLength(self): + self.assertEqual(len(newID()), 20) + def testHundreds(self): + for x in xrange(100): + self.testLength + +class Intify(unittest.TestCase): + known = [('\0' * 20, 0), + ('\xff' * 20, 2L**160 - 1), + ] + def testKnown(self): + for str, value in self.known: + self.assertEqual(intify(str), value) + def testEndianessOnce(self): + h = newID() + while h[-1] == '\xff': + h = newID() + k = h[:-1] + chr(ord(h[-1]) + 1) + self.assertEqual(intify(k) - intify(h), 1) + def testEndianessLots(self): + for x in xrange(100): + self.testEndianessOnce() + +class Disantance(unittest.TestCase): + known = [ + (("\0" * 20, "\xff" * 20), 2**160L -1), + ((sha("foo").digest(), sha("foo").digest()), 0), + ((sha("bar").digest(), sha("bar").digest()), 0) + ] + def testKnown(self): + for pair, dist in self.known: + self.assertEqual(distance(pair[0], pair[1]), dist) + def testCommutitive(self): + for i in xrange(100): + x, y, z = newID(), newID(), newID() + self.assertEqual(distance(x,y) ^ distance(y, z), distance(x, z)) + +class RandRange(unittest.TestCase): + def testOnce(self): + a = intify(newID()) + b = intify(newID()) + if a < b: + c = randRange(a, b) + self.assertEqual(a <= c < b, 1, "output out of range %d %d %d" % (b, c, a)) + else: + c = randRange(b, a) + assert b <= c < a, "output out of range %d %d %d" % (b, c, a) + + def testOneHundredTimes(self): + for i in xrange(100): + self.testOnce() + + + +if __name__ == '__main__': + unittest.main() + + diff --git a/khashmir.py b/khashmir.py index 0c0ed52..e9e53a7 100644 --- a/khashmir.py +++ b/khashmir.py @@ -11,22 +11,20 @@ from sha import sha from ktable import KTable, K from knode import KNode as Node -from hash import newID, newIDInRange +from khash import newID, newIDInRange from actions import FindNode, GetValue, KeyExpirer, StoreValue import krpc -import airhook from twisted.internet.defer import Deferred from twisted.internet import protocol from twisted.python import threadable -from twisted.internet.app import Application +from twisted.application import service, internet from twisted.web import server threadable.init() import sys import sqlite ## find this at http://pysqlite.sourceforge.net/ -import pysqlite_exceptions class KhashmirDBExcept(Exception): pass @@ -34,7 +32,6 @@ class KhashmirDBExcept(Exception): # this is the main class! class Khashmir(protocol.Factory): __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol') - protocol = krpc.KRPC def __init__(self, host, port, db='khashmir.db'): self.setup(host, port, db) @@ -43,19 +40,24 @@ class Khashmir(protocol.Factory): self.port = port self.node = self._loadSelfNode(host, port) self.table = KTable(self.node) - self.app = Application("krpc") - self.airhook = airhook.listenAirhookStream(port, self) + self.app = service.Application("krpc") + self.udp = krpc.hostbroker(self) + self.udp.protocol = krpc.KRPC + self.listenport = reactor.listenUDP(port, self.udp) self.last = time.time() self._loadRoutingTable() KeyExpirer(store=self.store) #self.refreshTable(force=1) reactor.callLater(60, self.checkpoint, (1,)) + def __del__(self): + self.listenport.stopListening() + def _loadSelfNode(self, host, port): c = self.store.cursor() c.execute('select id from self where num = 0;') if c.rowcount > 0: - id = c.fetchone()[0].decode('hex') + id = c.fetchone()[0] else: id = newID() return Node().init(id, host, port) @@ -64,7 +66,7 @@ class Khashmir(protocol.Factory): self.store.autocommit = 0 c = self.store.cursor() c.execute('delete from self where num = 0;') - c.execute("insert into self values (0, '%s');" % self.node.id.encode('hex')) + c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id)) self.store.commit() self.store.autocommit = 1 @@ -95,13 +97,13 @@ class Khashmir(protocol.Factory): self.store = sqlite.connect(db=db) self.store.autocommit = 1 s = """ - create table kv (key text, value text, time timestamp, primary key (key, value)); + create table kv (key binary, value binary, time timestamp, primary key (key, value)); create index kv_key on kv(key); create index kv_timestamp on kv(time); - create table nodes (id text primary key, host text, port number); + create table nodes (id binary primary key, host text, port number); - create table self (num number primary key, id text); + create table self (num number primary key, id binary); """ c = self.store.cursor() c.execute(s) @@ -116,7 +118,7 @@ class Khashmir(protocol.Factory): for bucket in self.table.buckets: for node in bucket.l: d = node.senderDict() - c.execute("insert into nodes values ('%s', '%s', '%s');" % (d['id'].encode('hex'), d['host'], d['port'])) + c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(d['id']), d['host'], d['port'])) self.store.commit() self.store.autocommit = 1; @@ -128,8 +130,8 @@ class Khashmir(protocol.Factory): c = self.store.cursor() c.execute("select * from nodes;") for rec in c.fetchall(): - n = Node().initWithDict({'id':rec[0].decode('hex'), 'host':rec[1], 'port':int(rec[2])}) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.table.insertNode(n, contacted=0) @@ -140,7 +142,7 @@ class Khashmir(protocol.Factory): ping this node and add the contact info to the table on pong! """ n =Node().init(const.NULL_ID, host, port) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.sendPing(n, callback=callback) ## this call is async! @@ -246,7 +248,7 @@ class Khashmir(protocol.Factory): sender['host'] = node.host sender['port'] = node.port n = Node().initWithDict(sender) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) table.insertNode(n) if callback: callback() @@ -280,13 +282,12 @@ class Khashmir(protocol.Factory): def retrieveValues(self, key): - s = "select value from kv where key = '%s';" % key.encode('hex') c = self.store.cursor() - c.execute(s) + c.execute("select value from kv where key = %s;", sqlite.encode(key)) t = c.fetchone() l = [] while t: - l.append(t['value'].decode('base64')) + l.append(t['value']) t = c.fetchone() return l @@ -301,7 +302,7 @@ class Khashmir(protocol.Factory): sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) return {"sender" : self.node.senderDict()} @@ -311,24 +312,22 @@ class Khashmir(protocol.Factory): sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) return {"nodes" : nodes, "sender" : self.node.senderDict()} def krpc_store_value(self, key, value, sender, _krpc_sender): t = "%0.6f" % time.time() - s = "insert into kv values ('%s', '%s', '%s');" % (key.encode("hex"), value.encode("base64"), t) c = self.store.cursor() try: - c.execute(s) - except pysqlite_exceptions.IntegrityError, reason: + c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) + except sqlite.IntegrityError, reason: # update last insert time - s = "update kv set time = '%s' where key = '%s' and value = '%s';" % (t, key.encode("hex"), value.encode("base64")) - c.execute(s) + c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) return {"sender" : self.node.senderDict()} @@ -336,7 +335,7 @@ class Khashmir(protocol.Factory): sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] n = Node().initWithDict(sender) - n.conn = self.airhook.connectionForAddr((n.host, n.port)) + n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) l = self.retrieveValues(key) @@ -347,171 +346,3 @@ class Khashmir(protocol.Factory): nodes = map(lambda node: node.senderDict(), nodes) return {'nodes' : nodes, "sender": self.node.senderDict()} -### TESTING ### -from random import randrange -import threading, thread, sys, time -from sha import sha -from hash import newID - - -def test_net(host='127.0.0.1', peers=24, startport=2001, dbprefix='/tmp/test'): - import thread - l = [] - for i in xrange(peers): - a = Khashmir(host, startport + i, db = dbprefix+`i`) - l.append(a) - thread.start_new_thread(l[0].app.run, ()) - for peer in l[1:]: - peer.app.run(installSignalHandlers=0) - return l - -def test_build_net(quiet=0, peers=24, host='127.0.0.1', pause=0, startport=2001, dbprefix='/tmp/test'): - from whrandom import randrange - import threading - import thread - import sys - port = startport - l = [] - if not quiet: - print "Building %s peer table." % peers - - for i in xrange(peers): - a = Khashmir(host, port + i, db = dbprefix +`i`) - l.append(a) - - - thread.start_new_thread(l[0].app.run, ()) - time.sleep(1) - for peer in l[1:]: - peer.app.run(installSignalHandlers=0) - #time.sleep(3) - - def spewer(frame, s, ignored): - from twisted.python import reflect - if frame.f_locals.has_key('self'): - se = frame.f_locals['self'] - print 'method %s of %s at %s' % ( - frame.f_code.co_name, reflect.qual(se.__class__), id(se) - ) - #sys.settrace(spewer) - - print "adding contacts...." - def makecb(flag): - def cb(f=flag): - f.set() - return cb - - for peer in l: - p = l[randrange(0, len(l))] - if p != peer: - n = p.node - flag = threading.Event() - peer.addContact(host, n.port, makecb(flag)) - flag.wait() - p = l[randrange(0, len(l))] - if p != peer: - n = p.node - flag = threading.Event() - peer.addContact(host, n.port, makecb(flag)) - flag.wait() - p = l[randrange(0, len(l))] - if p != peer: - n = p.node - flag = threading.Event() - peer.addContact(host, n.port, makecb(flag)) - flag.wait() - - print "finding close nodes...." - - for peer in l: - flag = threading.Event() - def cb(nodes, f=flag): - f.set() - peer.findCloseNodes(cb) - flag.wait() - # for peer in l: - # peer.refreshTable() - return l - -def test_find_nodes(l, quiet=0): - flag = threading.Event() - - n = len(l) - - a = l[randrange(0,n)] - b = l[randrange(0,n)] - - def callback(nodes, flag=flag, id = b.node.id): - if (len(nodes) >0) and (nodes[0].id == id): - print "test_find_nodes PASSED" - else: - print "test_find_nodes FAILED" - flag.set() - a.findNode(b.node.id, callback) - flag.wait() - -def test_find_value(l, quiet=0): - ff = threading.Event() - fa = threading.Event() - fb = threading.Event() - fc = threading.Event() - - n = len(l) - a = l[randrange(0,n)] - b = l[randrange(0,n)] - c = l[randrange(0,n)] - d = l[randrange(0,n)] - - key = newID() - value = newID() - if not quiet: print "inserting value..." - def acb(p, f=ff): - f.set() - a.storeValueForKey(key, value, acb) - ff.wait() - - if not quiet: - print "finding..." - - class cb: - def __init__(self, flag, value=value, port=None): - self.flag = flag - self.val = value - self.found = 0 - self.port = port - def callback(self, values): - if(len(values) == 0): - if not self.found: - print "find %s NOT FOUND" % self.port - else: - print "find %s FOUND" % self.port - self.flag.set() - else: - if self.val in values: - self.found = 1 - - b.valueForKey(key, cb(fa, port=b.port).callback, searchlocal=0) - fa.wait() - c.valueForKey(key, cb(fb, port=c.port).callback, searchlocal=0) - fb.wait() - d.valueForKey(key, cb(fc, port=d.port).callback, searchlocal=0) - fc.wait() - -def test_one(host, port, db='/tmp/test'): - import thread - k = Khashmir(host, port, db) - thread.start_new_thread(reactor.run, ()) - return k - -if __name__ == "__main__": - import sys - n = 8 - if len(sys.argv) > 1: n = int(sys.argv[1]) - l = test_build_net(peers=n) - time.sleep(3) - print "finding nodes..." - for i in range(n): - test_find_nodes(l) - print "inserting and fetching values..." - for i in range(10): - test_find_value(l) diff --git a/knode.py b/knode.py index 3d9c4f1..70069fa 100644 --- a/knode.py +++ b/knode.py @@ -24,18 +24,18 @@ class KNode(Node): return dict def ping(self, sender): - df = self.conn.protocol.sendRequest('ping', {"sender":sender}) + df = self.conn.sendRequest('ping', {"sender":sender}) df.addCallback(self.checkSender) return df def findNode(self, target, sender): - df = self.conn.protocol.sendRequest('find_node', {"target" : target, "sender": sender}) + df = self.conn.sendRequest('find_node', {"target" : target, "sender": sender}) df.addCallback(self.checkSender) return df def storeValue(self, key, value, sender): - df = self.conn.protocol.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender}) + df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "sender": sender}) df.addCallback(self.checkSender) return df def findValue(self, key, sender): - df = self.conn.protocol.sendRequest('find_value', {"key" : key, "sender" : sender}) + df = self.conn.sendRequest('find_value', {"key" : key, "sender" : sender}) df.addCallback(self.checkSender) return df diff --git a/krpc.py b/krpc.py index 5f27769..24a0f16 100644 --- a/krpc.py +++ b/krpc.py @@ -5,10 +5,12 @@ import airhook from twisted.internet.defer import Deferred from twisted.protocols import basic from bencode import bencode, bdecode +from twisted.internet import protocol + from twisted.internet import reactor import time -import hash +import khash as hash KRPC_TIMEOUT = 60 @@ -17,110 +19,143 @@ KRPC_ERROR_METHOD_UNKNOWN = 2 KRPC_ERROR_RECEIVED_UNKNOWN = 3 KRPC_ERROR_TIMEOUT = 4 -class KRPC(basic.NetstringReceiver): - noisy = 1 - def __init__(self): - self.tids = {} +# commands +TID = 'tid' +REQ = 'req' +RSP = 'rsp' +TYP = 'typ' +ARG = 'arg' +ERR = 'err' + +class hostbroker(protocol.DatagramProtocol): + def __init__(self, server): + self.noisy = 0 + self.server = server + # this should be changed to storage that drops old entries + self.connections = {} + + def datagramReceived(self, datagram, addr): + #print `addr`, `datagram` + #if addr != self.addr: + c = self.connectionForAddr(addr) + c.datagramReceived(datagram) + #if c.idle(): + # del self.connections[addr] + def connectionForAddr(self, addr): + if addr == self.addr: + raise Exception + if not self.connections.has_key(addr): + conn = self.protocol(addr, self.server, self.transport) + self.connections[addr] = conn + else: + conn = self.connections[addr] + return conn + + def makeConnection(self, transport): + protocol.DatagramProtocol.makeConnection(self, transport) + tup = transport.getHost() + self.addr = (tup.host, tup.port) - def dataRecieved(self, data): - basic.NetstringReceiver(self, data) - if self.brokenPeer: - self.resetConnection() - - def resetConnection(self): - self.brokenPeer = 0 - self._readerState = basic.LENGTH - self._readerLength = 0 +## connection +class KRPC: + noisy = 1 + def __init__(self, addr, server, transport): + self.transport = transport + self.factory = server + self.addr = addr + self.tids = {} - def stringReceived(self, str): + def datagramReceived(self, str): # bdecode try: msg = bdecode(str) except Exception, e: - if self.naisy: + if self.noisy: print "response decode error: " + `e` self.d.errback() else: + #if self.noisy: + # print msg # look at msg type - if msg['typ'] == 'req': + if msg[TYP] == REQ: ilen = len(str) # if request # tell factory to handle - f = getattr(self.factory ,"krpc_" + msg['req'], None) + f = getattr(self.factory ,"krpc_" + msg[REQ], None) if f and callable(f): - msg['arg']['_krpc_sender'] = self.transport.addr + msg[ARG]['_krpc_sender'] = self.addr try: - ret = apply(f, (), msg['arg']) + ret = apply(f, (), msg[ARG]) except Exception, e: ## send error - out = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`}) + out = bencode({TID:msg[TID], TYP:ERR, ERR :`e`}) olen = len(out) - self.sendString(out) + self.transport.write(out, self.addr) else: if ret: # make response - out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret}) + out = bencode({TID : msg[TID], TYP : RSP, RSP : ret}) else: - out = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}}) + out = bencode({TID : msg[TID], TYP : RSP, RSP : {}}) # send response olen = len(out) - self.sendString(out) + self.transport.write(out, self.addr) else: if self.noisy: - print "don't know about method %s" % msg['req'] + print "don't know about method %s" % msg[REQ] # unknown method - out = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN}) + out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN}) olen = len(out) - self.sendString(out) + self.transport.write(out, self.addr) if self.noisy: - print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.transport.addr, self.factory.node.port, - ilen, msg['req'], olen) - elif msg['typ'] == 'rsp': + print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.addr, self.factory.node.port, + ilen, msg[REQ], olen) + elif msg[TYP] == RSP: # if response # lookup tid - if self.tids.has_key(msg['tid']): - df = self.tids[msg['tid']] + if self.tids.has_key(msg[TID]): + df = self.tids[msg[TID]] # callback - del(self.tids[msg['tid']]) - df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr}) + del(self.tids[msg[TID]]) + df.callback({RSP : msg[RSP], '_krpc_sender': self.addr}) else: - print 'timeout ' + `msg['rsp']['sender']` + print 'timeout ' + `msg[RSP]['sender']` # no tid, this transaction timed out already... - elif msg['typ'] == 'err': + elif msg[TYP] == ERR: # if error # lookup tid - if self.tids.has_key(msg['tid']): - df = self.tids[msg['tid']] + if self.tids.has_key(msg[TID]): + df = self.tids[msg[TID]] # callback - df.errback(msg['err']) - del(self.tids[msg['tid']]) + df.errback(msg[ERR]) + del(self.tids[msg[TID]]) else: # day late and dollar short pass else: print "unknown message type " + `msg` # unknown message type - df = self.tids[msg['tid']] + df = self.tids[msg[TID]] # callback df.errback(KRPC_ERROR_RECEIVED_UNKNOWN) - del(self.tids[msg['tid']]) + del(self.tids[msg[TID]]) def sendRequest(self, method, args): # make message # send it - msg = {'tid' : hash.newID(), 'typ' : 'req', 'req' : method, 'arg' : args} + msg = {TID : hash.newTID(), TYP : REQ, REQ : method, ARG : args} str = bencode(msg) d = Deferred() - self.tids[msg['tid']] = d - def timeOut(tids = self.tids, id = msg['tid']): + self.tids[msg[TID]] = d + def timeOut(tids = self.tids, id = msg[TID]): if tids.has_key(id): df = tids[id] del(tids[id]) print ">>>>>> KRPC_ERROR_TIMEOUT" df.errback(KRPC_ERROR_TIMEOUT) reactor.callLater(KRPC_TIMEOUT, timeOut) - self.sendString(str) + self.transport.write(str, self.addr) return d diff --git a/ktable.py b/ktable.py index 4d4c5bb..67b23c9 100644 --- a/ktable.py +++ b/ktable.py @@ -5,7 +5,7 @@ import time from bisect import * from types import * -import hash +import khash as hash import const from const import K, HASH_LENGTH, NULL_ID from node import Node diff --git a/node.py b/node.py index 3d91a89..f8d2801 100644 --- a/node.py +++ b/node.py @@ -1,7 +1,7 @@ ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information -import hash +import khash import time from types import * @@ -14,7 +14,7 @@ class Node: def init(self, id, host, port): self.id = id - self.num = hash.intify(id) + self.num = khash.intify(id) self.host = host self.port = port self._senderDict = {'id': self.id, 'port' : self.port, 'host' : self.host} @@ -23,7 +23,7 @@ class Node: def initWithDict(self, dict): self._senderDict = dict self.id = dict['id'] - self.num = hash.intify(self.id) + self.num = khash.intify(self.id) self.port = dict['port'] self.host = dict['host'] return self @@ -73,7 +73,7 @@ import unittest class TestNode(unittest.TestCase): def setUp(self): - self.node = Node().init(hash.newID(), 'localhost', 2002) + self.node = Node().init(khash.newID(), 'localhost', 2002) def testUpdateLastSeen(self): t = self.node.lastSeen self.node.updateLastSeen() diff --git a/test_khashmir.py b/test_khashmir.py new file mode 100644 index 0000000..7dcc247 --- /dev/null +++ b/test_khashmir.py @@ -0,0 +1,137 @@ +from unittest import * +from khashmir import * +import khash + +from whrandom import randrange + +import os + +if __name__ =="__main__": + tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]]) + result = TextTestRunner().run(tests) + +class SimpleTests(TestCase): + def setUp(self): + self.a = Khashmir('127.0.0.1', 4044, '/tmp/a.test') + self.b = Khashmir('127.0.0.1', 4045, '/tmp/b.test') + + def tearDown(self): + self.a.listenport.stopListening() + self.b.listenport.stopListening() + os.unlink('/tmp/a.test') + os.unlink('/tmp/b.test') + reactor.iterate() + reactor.iterate() + + def addContacts(self): + self.a.addContact('127.0.0.1', 4045) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + + def testAddContact(self): + self.assertEqual(len(self.a.table.buckets), 1) + self.assertEqual(len(self.a.table.buckets[0].l), 0) + + self.assertEqual(len(self.b.table.buckets), 1) + self.assertEqual(len(self.b.table.buckets[0].l), 0) + + self.addContacts() + + self.assertEqual(len(self.a.table.buckets), 1) + self.assertEqual(len(self.a.table.buckets[0].l), 1) + self.assertEqual(len(self.b.table.buckets), 1) + self.assertEqual(len(self.b.table.buckets[0].l), 1) + + def testStoreRetrieve(self): + self.addContacts() + self.got = 0 + self.a.storeValueForKey(sha('foo').digest(), 'foobar') + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.a.valueForKey(sha('foo').digest(), self._cb) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + + def _cb(self, val): + if not val: + self.assertEqual(self.got, 1) + elif 'foobar' in val: + self.got = 1 + + +class MultiTest(TestCase): + num = 30 + def _done(self, val): + self.done = 1 + + def setUp(self): + self.l = [] + self.startport = 4044 + for i in range(self.num): + self.l.append(Khashmir('127.0.0.1', self.startport + i, '/tmp/%s.test' % (self.startport + i))) + reactor.iterate() + reactor.iterate() + + for i in self.l: + i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) + i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) + i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) + reactor.iterate() + reactor.iterate() + reactor.iterate() + + for i in self.l: + self.done = 0 + i.findCloseNodes(self._done) + while not self.done: + reactor.iterate() + for i in self.l: + self.done = 0 + i.findCloseNodes(self._done) + while not self.done: + reactor.iterate() + + def tearDown(self): + for i in self.l: + i.listenport.stopListening() + for i in range(self.startport, self.startport+self.num): + os.unlink('/tmp/%s.test' % i) + + reactor.iterate() + + def testStoreRetrieve(self): + for i in range(10): + K = khash.newID() + V = khash.newID() + + self.done = 0 + def _cb(val): + self.done = 1 + self.l[randrange(0, self.num)].storeValueForKey(K, V, _cb) + while not self.done: + reactor.iterate() + + self.got = 0 + self.done = 0 + + def _cb(val): + if not val: + self.done = 1 + self.assertEqual(self.got, 1) + elif V in val: + self.got = 1 + + self.l[randrange(0, self.num)].valueForKey(K, _cb) + while not self.done: + reactor.iterate() diff --git a/test_krpc.py b/test_krpc.py index d7f9754..a7bef9d 100644 --- a/test_krpc.py +++ b/test_krpc.py @@ -3,20 +3,21 @@ from unittest import * from krpc import * -from airhook import * +#from airhook import * KRPC.noisy = 0 import sys if __name__ =="__main__": - tests = unittest.defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]]) - result = unittest.TextTestRunner().run(tests) + tests = defaultTestLoader.loadTestsFromNames([sys.argv[0][:-3]]) + result = TextTestRunner().run(tests) def connectionForAddr(host, port): return host + class Receiver(protocol.Factory): protocol = KRPC def __init__(self): @@ -26,51 +27,35 @@ class Receiver(protocol.Factory): def krpc_echo(self, msg, _krpc_sender): return msg -class SimpleTest(TestCase): +def make(port): + af = Receiver() + a = hostbroker(af) + a.protocol = KRPC + p = reactor.listenUDP(port, a) + return af, a, p + +class KRPCTests(TestCase): def setUp(self): self.noisy = 0 - - self.af = Receiver() - self.bf = Receiver() - self.a = listenAirhookStream(4040, self.af) - self.b = listenAirhookStream(4041, self.bf) - - def testSimpleMessage(self): - self.noisy = 0 - self.a.connectionForAddr(('127.0.0.1', 4041)).protocol.sendRequest('store', {'msg' : "This is a test."}) - reactor.iterate() + self.af, self.a, self.ap = make(1180) + self.bf, self.b, self.bp = make(1181) + + def tearDown(self): + self.ap.stopListening() + self.bp.stopListening() reactor.iterate() reactor.iterate() - self.assertEqual(self.bf.buf, ["This is a test."]) -class SimpleTest(TestCase): - def setUp(self): - self.noisy = 0 - - self.af = Receiver() - self.bf = Receiver() - self.a = listenAirhookStream(4050, self.af) - self.b = listenAirhookStream(4051, self.bf) - def testSimpleMessage(self): self.noisy = 0 - self.a.connectionForAddr(('127.0.0.1', 4051)).protocol.sendRequest('store', {'msg' : "This is a test."}) + self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) reactor.iterate() reactor.iterate() reactor.iterate() self.assertEqual(self.bf.buf, ["This is a test."]) -class BlastTest(TestCase): - def setUp(self): - self.noisy = 0 - - self.af = Receiver() - self.bf = Receiver() - self.a = listenAirhookStream(4060, self.af) - self.b = listenAirhookStream(4061, self.bf) - def testMessageBlast(self): - self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."}) + self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) reactor.iterate() reactor.iterate() reactor.iterate() @@ -78,23 +63,13 @@ class BlastTest(TestCase): self.bf.buf = [] for i in range(100): - self.a.connectionForAddr(('127.0.0.1', 4061)).protocol.sendRequest('store', {'msg' : "This is a test."}) + self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) reactor.iterate() #self.bf.buf = [] self.assertEqual(self.bf.buf, ["This is a test."] * 100) -class EchoTest(TestCase): - def setUp(self): - self.noisy = 0 - self.msg = None - - self.af = Receiver() - self.bf = Receiver() - self.a = listenAirhookStream(4042, self.af) - self.b = listenAirhookStream(4043, self.bf) - def testEcho(self): - df = self.a.connectionForAddr(('127.0.0.1', 4043)).protocol.sendRequest('echo', {'msg' : "This is a test."}) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) df.addCallback(self.gotMsg) reactor.iterate() reactor.iterate() @@ -104,21 +79,11 @@ class EchoTest(TestCase): def gotMsg(self, dict): _krpc_sender = dict['_krpc_sender'] - msg = dict['rsp'] + msg = dict[RSP] self.msg = msg -class ManyEchoTest(TestCase): - def setUp(self): - self.noisy = 0 - self.msg = None - - self.af = Receiver() - self.bf = Receiver() - self.a = listenAirhookStream(4588, self.af) - self.b = listenAirhookStream(4589, self.bf) - def testManyEcho(self): - df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."}) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) df.addCallback(self.gotMsg) reactor.iterate() reactor.iterate() @@ -127,32 +92,17 @@ class ManyEchoTest(TestCase): self.assertEqual(self.msg, "This is a test.") for i in xrange(100): self.msg = None - df = self.a.connectionForAddr(('127.0.0.1', 4589)).protocol.sendRequest('echo', {'msg' : "This is a test."}) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) df.addCallback(self.gotMsg) reactor.iterate() reactor.iterate() reactor.iterate() reactor.iterate() self.assertEqual(self.msg, "This is a test.") - - def gotMsg(self, dict): - _krpc_sender = dict['_krpc_sender'] - msg = dict['rsp'] - self.msg = msg -class MultiEchoTest(TestCase): - def setUp(self): - self.noisy = 0 - self.msg = None - - self.af = Receiver() - self.bf = Receiver() - self.a = listenAirhookStream(4048, self.af) - self.b = listenAirhookStream(4049, self.bf) - def testMultiEcho(self): self.noisy = 1 - df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is a test."}) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) df.addCallback(self.gotMsg) reactor.iterate() reactor.iterate() @@ -160,7 +110,7 @@ class MultiEchoTest(TestCase): reactor.iterate() self.assertEqual(self.msg, "This is a test.") - df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is another test."}) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."}) df.addCallback(self.gotMsg) reactor.iterate() reactor.iterate() @@ -168,7 +118,7 @@ class MultiEchoTest(TestCase): reactor.iterate() self.assertEqual(self.msg, "This is another test.") - df = self.a.connectionForAddr(('127.0.0.1', 4049)).protocol.sendRequest('echo', {'msg' : "This is yet another test."}) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."}) df.addCallback(self.gotMsg) reactor.iterate() reactor.iterate() @@ -176,24 +126,9 @@ class MultiEchoTest(TestCase): reactor.iterate() self.assertEqual(self.msg, "This is yet another test.") - def gotMsg(self, dict): - _krpc_sender = dict['_krpc_sender'] - msg = dict['rsp'] - self.msg = msg - -class EchoResetTest(TestCase): - def setUp(self): - self.noisy = 0 - self.msg = None - - self.af = Receiver() - self.bf = Receiver() - self.a = listenAirhookStream(4078, self.af) - self.b = listenAirhookStream(4079, self.bf) - def testEchoReset(self): self.noisy = 1 - df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is a test."}) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."}) df.addCallback(self.gotMsg) reactor.iterate() reactor.iterate() @@ -201,7 +136,7 @@ class EchoResetTest(TestCase): reactor.iterate() self.assertEqual(self.msg, "This is a test.") - df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is another test."}) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."}) df.addCallback(self.gotMsg) reactor.iterate() reactor.iterate() @@ -209,8 +144,8 @@ class EchoResetTest(TestCase): reactor.iterate() self.assertEqual(self.msg, "This is another test.") - del(self.a.connections[('127.0.0.1', 4079)]) - df = self.a.connectionForAddr(('127.0.0.1', 4079)).protocol.sendRequest('echo', {'msg' : "This is yet another test."}) + del(self.a.connections[('127.0.0.1', 1181)]) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."}) df.addCallback(self.gotMsg) reactor.iterate() reactor.iterate() @@ -221,23 +156,10 @@ class EchoResetTest(TestCase): def testLotsofEchoReset(self): for i in range(100): self.testEchoReset() - def gotMsg(self, dict): - _krpc_sender = dict['_krpc_sender'] - msg = dict['rsp'] - self.msg = msg - -class UnknownMethErrTest(TestCase): - def setUp(self): - self.noisy = 0 - self.err = None - self.af = Receiver() - self.bf = Receiver() - self.a = listenAirhookStream(4044, self.af) - self.b = listenAirhookStream(4045, self.bf) - + def testUnknownMeth(self): self.noisy = 1 - df = self.a.connectionForAddr(('127.0.0.1', 4045)).protocol.sendRequest('blahblah', {'msg' : "This is a test."}) + df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."}) df.addErrback(self.gotErr) reactor.iterate() reactor.iterate() @@ -247,4 +169,4 @@ class UnknownMethErrTest(TestCase): def gotErr(self, err): self.err = err.value - + \ No newline at end of file