From e6beef78406c3093fc4eb98c4293b9257b44aa61 Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Thu, 10 Jan 2008 12:45:04 -0800 Subject: [PATCH] Use function for sending krpc responses, and add spew parameter. Sending a response in the krpc module now calls a function. There is a new 'SPEW' config variable that causes the krpc module to spew out all sent and received packet data (previously called noisy). Also changed all the tests to use failUnlessEqual instead of assertEqual, and all assert() commands no longer use brackets. Updated some of the downloading tests for the new file sizes on camrdale.org (need a better solution here). --- apt-dht.py | 2 +- apt_dht/HTTPDownloader.py | 14 ++++----- apt_dht/Hash.py | 2 +- apt_dht/PeerManager.py | 6 ++-- apt_dht/apt_dht_conf.py | 3 ++ apt_dht_Khashmir/DHT.py | 6 ++-- apt_dht_Khashmir/actions.py | 4 +-- apt_dht_Khashmir/khash.py | 14 ++++----- apt_dht_Khashmir/khashmir.py | 26 ++++++++-------- apt_dht_Khashmir/krpc.py | 58 ++++++++++++++++-------------------- apt_dht_Khashmir/ktable.py | 12 ++++---- apt_dht_Khashmir/node.py | 6 ++-- 12 files changed, 76 insertions(+), 77 deletions(-) diff --git a/apt-dht.py b/apt-dht.py index a532527..3cc5d94 100644 --- a/apt-dht.py +++ b/apt-dht.py @@ -57,7 +57,7 @@ application = service.Application("apt-dht", uid, gid) log.msg('Starting DHT') DHT = __import__(config.get('DEFAULT', 'DHT')+'.DHT', globals(), locals(), ['DHT']) -assert(IDHT.implementedBy(DHT.DHT), "You must provide a DHT implementation that implements the IDHT interface.") +assert IDHT.implementedBy(DHT.DHT), "You must provide a DHT implementation that implements the IDHT interface." myDHT = DHT.DHT() myDHT.loadConfig(config, config.get('DEFAULT', 'DHT')) myDHT.join() diff --git a/apt_dht/HTTPDownloader.py b/apt_dht/HTTPDownloader.py index 91ce7c1..c906c0e 100644 --- a/apt_dht/HTTPDownloader.py +++ b/apt_dht/HTTPDownloader.py @@ -36,7 +36,7 @@ class HTTPClientManager(ClientFactory): self.connector = None def connect(self): - assert(self.closed and not self.connecting) + assert self.closed and not self.connecting self.connecting = True d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port) d.addCallback(self.connected) @@ -176,15 +176,15 @@ class TestClientManager(unittest.TestCase): d.addBoth(lastDefer.callback) newRequest("/", 1, 3433) - newRequest("/blog/", 2, 37121) + newRequest("/blog/", 2, 39152) newRequest("/camrdale.html", 3, 2234) self.pending_calls.append(reactor.callLater(1, newRequest, '/robots.txt', 4, 309)) self.pending_calls.append(reactor.callLater(10, newRequest, '/wikilink.html', 5, 3084)) - self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4750)) + self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4756)) self.pending_calls.append(reactor.callLater(31, newRequest, '/PlanetLab.html', 7, 2783)) self.pending_calls.append(reactor.callLater(32, newRequest, '/openid.html', 8, 2525)) self.pending_calls.append(reactor.callLater(32, newRequest, '/subpage.html', 9, 2381)) - self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 302362, True)) + self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 313470, True)) return lastDefer def test_multiple_quick_downloads(self): @@ -200,15 +200,15 @@ class TestClientManager(unittest.TestCase): d.addBoth(lastDefer.callback) newRequest("/", 1, 3433) - newRequest("/blog/", 2, 37121) + newRequest("/blog/", 2, 39152) newRequest("/camrdale.html", 3, 2234) self.pending_calls.append(reactor.callLater(0, newRequest, '/robots.txt', 4, 309)) self.pending_calls.append(reactor.callLater(0, newRequest, '/wikilink.html', 5, 3084)) - self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap.html', 6, 4750)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap.html', 6, 4756)) self.pending_calls.append(reactor.callLater(0, newRequest, '/PlanetLab.html', 7, 2783)) self.pending_calls.append(reactor.callLater(0, newRequest, '/openid.html', 8, 2525)) self.pending_calls.append(reactor.callLater(0, newRequest, '/subpage.html', 9, 2381)) - self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap2.rss', 0, 302362, True)) + self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap2.rss', 0, 313470, True)) return lastDefer def test_range(self): diff --git a/apt_dht/Hash.py b/apt_dht/Hash.py index 3149f58..bb993f1 100644 --- a/apt_dht/Hash.py +++ b/apt_dht/Hash.py @@ -53,7 +53,7 @@ class HashObject: if bits is not None: bytes = (bits - 1) // 8 + 1 else: - assert(bytes is not None) + assert bytes is not None, "you must specify one of bits or bytes" if len(hashString) < bytes: hashString = hashString + '\000'*(bytes - len(hashString)) elif len(hashString) > bytes: diff --git a/apt_dht/PeerManager.py b/apt_dht/PeerManager.py index 25bd4f5..75c135d 100644 --- a/apt_dht/PeerManager.py +++ b/apt_dht/PeerManager.py @@ -23,7 +23,7 @@ class PeerManager: url = choice(locations) log.msg('Downloading %s' % url) parsed = urlparse(url) - assert(parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]) + assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0] host, port = splitHostPort(parsed[0], parsed[1]) path = urlunparse(('', '') + parsed[2:]) @@ -88,11 +88,11 @@ class TestPeerManager(unittest.TestCase): d.addBoth(lastDefer.callback) newRequest('www.camrdale.org', "/", 1, 3433) - newRequest('www.camrdale.org', "/blog/", 2, 37121) + newRequest('www.camrdale.org', "/blog/", 2, 39152) newRequest('www.google.ca', "/", 3, None) self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None)) self.pending_calls.append(reactor.callLater(10, newRequest, 'www.camrdale.org', '/wikilink.html', 5, 3084)) - self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4750)) + self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4756)) self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None)) self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/openid.html', 8, 2525)) self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/subpage.html', 9, 2381)) diff --git a/apt_dht/apt_dht_conf.py b/apt_dht/apt_dht_conf.py index dd474bf..98bbd88 100644 --- a/apt_dht/apt_dht_conf.py +++ b/apt_dht/apt_dht_conf.py @@ -80,6 +80,9 @@ DHT_DEFAULTS = { # expire entries older than this 'KE_AGE': '1h', # 60 minutes + + # whether to spew info about the requests/responses in the protocol + 'SPEW': 'yes', } class AptDHTConfigParser(SafeConfigParser): diff --git a/apt_dht_Khashmir/DHT.py b/apt_dht_Khashmir/DHT.py index 590a77c..f77962e 100644 --- a/apt_dht_Khashmir/DHT.py +++ b/apt_dht_Khashmir/DHT.py @@ -42,6 +42,8 @@ class DHT: elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL', 'BUCKET_STALENESS', 'KEINITIAL_DELAY', 'KE_DELAY', 'KE_AGE']: self.config[k] = self.config_parser.gettime(section, k) + elif k in ['SPEW']: + self.config[k] = self.config_parser.getboolean(section, k) else: self.config[k] = self.config_parser.get(section, k) @@ -158,7 +160,7 @@ class TestSimpleDHT(unittest.TestCase): 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, - 'KE_AGE': 3600, } + 'KE_AGE': 3600, 'SPEW': False, } def setUp(self): self.a = DHT() @@ -251,7 +253,7 @@ class TestMultiDHT(unittest.TestCase): 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, - 'KE_AGE': 3600, } + 'KE_AGE': 3600, 'SPEW': False, } def setUp(self): self.l = [] diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index a99b7ea..9bfa2e8 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -75,7 +75,7 @@ class FindNode(ActionBase): self.queried[node.id] = 1 if self.outstanding >= self.config['CONCURRENT_REQS']: break - assert(self.outstanding) >=0 + assert self.outstanding >=0 if self.outstanding == 0: ## all done!! self.finished=1 @@ -162,7 +162,7 @@ class GetValue(FindNode): self.queried[node.id] = 1 if self.outstanding >= self.config['CONCURRENT_REQS']: break - assert(self.outstanding) >=0 + assert self.outstanding >=0 if self.outstanding == 0: ## all done, didn't find it!! self.finished=1 diff --git a/apt_dht_Khashmir/khash.py b/apt_dht_Khashmir/khash.py index 8db84d8..0f0d8e3 100644 --- a/apt_dht_Khashmir/khash.py +++ b/apt_dht_Khashmir/khash.py @@ -43,7 +43,7 @@ def newTID(): class TestNewID(unittest.TestCase): def testLength(self): - self.assertEqual(len(newID()), 20) + self.failUnlessEqual(len(newID()), 20) def testHundreds(self): for x in xrange(100): self.testLength @@ -54,13 +54,13 @@ class TestIntify(unittest.TestCase): ] def testKnown(self): for str, value in self.known: - self.assertEqual(intify(str), value) + self.failUnlessEqual(intify(str), value) def testEndianessOnce(self): h = newID() while h[-1] == '\xff': h = newID() k = h[:-1] + chr(ord(h[-1]) + 1) - self.assertEqual(intify(k) - intify(h), 1) + self.failUnlessEqual(intify(k) - intify(h), 1) def testEndianessLots(self): for x in xrange(100): self.testEndianessOnce() @@ -73,11 +73,11 @@ class TestDisantance(unittest.TestCase): ] def testKnown(self): for pair, dist in self.known: - self.assertEqual(distance(pair[0], pair[1]), dist) + self.failUnlessEqual(distance(pair[0], pair[1]), dist) def testCommutitive(self): for i in xrange(100): x, y, z = newID(), newID(), newID() - self.assertEqual(distance(x,y) ^ distance(y, z), distance(x, z)) + self.failUnlessEqual(distance(x,y) ^ distance(y, z), distance(x, z)) class TestRandRange(unittest.TestCase): def testOnce(self): @@ -85,10 +85,10 @@ class TestRandRange(unittest.TestCase): b = intify(newID()) if a < b: c = randRange(a, b) - self.assertEqual(a <= c < b, 1, "output out of range %d %d %d" % (b, c, a)) + self.failUnlessEqual(a <= c < b, True, "output out of range %d %d %d" % (b, c, a)) else: c = randRange(b, a) - assert b <= c < a, "output out of range %d %d %d" % (b, c, a) + self.failUnlessEqual(b <= c < a, True, "output out of range %d %d %d" % (b, c, a)) def testOneHundredTimes(self): for i in xrange(100): diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index ef1b826..1cb4a3c 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -34,7 +34,7 @@ class KhashmirBase(protocol.Factory): self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) #self.app = service.Application("krpc") - self.udp = krpc.hostbroker(self) + self.udp = krpc.hostbroker(self, config) self.udp.protocol = krpc.KRPC self.listenport = reactor.listenUDP(self.port, self.udp) self._loadRoutingTable() @@ -286,7 +286,7 @@ class SimpleTests(unittest.TestCase): 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, - 'KE_AGE': 3600, } + 'KE_AGE': 3600, 'SPEW': False, } def setUp(self): krpc.KRPC.noisy = 0 @@ -304,11 +304,11 @@ class SimpleTests(unittest.TestCase): os.unlink(self.b.store.db) def testAddContact(self): - self.assertEqual(len(self.a.table.buckets), 1) - self.assertEqual(len(self.a.table.buckets[0].l), 0) + self.failUnlessEqual(len(self.a.table.buckets), 1) + self.failUnlessEqual(len(self.a.table.buckets[0].l), 0) - self.assertEqual(len(self.b.table.buckets), 1) - self.assertEqual(len(self.b.table.buckets[0].l), 0) + self.failUnlessEqual(len(self.b.table.buckets), 1) + self.failUnlessEqual(len(self.b.table.buckets[0].l), 0) self.a.addContact('127.0.0.1', 4045) reactor.iterate() @@ -316,10 +316,10 @@ class SimpleTests(unittest.TestCase): reactor.iterate() reactor.iterate() - self.assertEqual(len(self.a.table.buckets), 1) - self.assertEqual(len(self.a.table.buckets[0].l), 1) - self.assertEqual(len(self.b.table.buckets), 1) - self.assertEqual(len(self.b.table.buckets[0].l), 1) + self.failUnlessEqual(len(self.a.table.buckets), 1) + self.failUnlessEqual(len(self.a.table.buckets[0].l), 1) + self.failUnlessEqual(len(self.b.table.buckets), 1) + self.failUnlessEqual(len(self.b.table.buckets[0].l), 1) def testStoreRetrieve(self): self.a.addContact('127.0.0.1', 4045) @@ -346,7 +346,7 @@ class SimpleTests(unittest.TestCase): def _cb(self, key, val): if not val: - self.assertEqual(self.got, 1) + self.failUnlessEqual(self.got, 1) elif 'foobar' in val: self.got = 1 @@ -360,7 +360,7 @@ class MultiTest(unittest.TestCase): 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, - 'KE_AGE': 3600, } + 'KE_AGE': 3600, 'SPEW': False, } def _done(self, val): self.done = 1 @@ -418,7 +418,7 @@ class MultiTest(unittest.TestCase): def _rcb(key, val): if not val: self.done = 1 - self.assertEqual(self.got, 1) + self.failUnlessEqual(self.got, 1) elif V in val: self.got = 1 for x in range(3): diff --git a/apt_dht_Khashmir/krpc.py b/apt_dht_Khashmir/krpc.py index 7427e0a..40ab8d8 100644 --- a/apt_dht_Khashmir/krpc.py +++ b/apt_dht_Khashmir/krpc.py @@ -29,8 +29,9 @@ class ProtocolError(Exception): pass class hostbroker(protocol.DatagramProtocol): - def __init__(self, server): + def __init__(self, server, config): self.server = server + self.config = config # this should be changed to storage that drops old entries self.connections = {} @@ -46,7 +47,7 @@ class hostbroker(protocol.DatagramProtocol): if addr == self.addr: raise Exception if not self.connections.has_key(addr): - conn = self.protocol(addr, self.server, self.transport) + conn = self.protocol(addr, self.server, self.transport, self.config['SPEW']) self.connections[addr] = conn else: conn = self.connections[addr] @@ -64,11 +65,11 @@ class hostbroker(protocol.DatagramProtocol): ## connection class KRPC: - noisy = 0 - def __init__(self, addr, server, transport): + def __init__(self, addr, server, transport, spew = False): self.transport = transport self.factory = server self.addr = addr + self.noisy = spew self.tids = {} self.mtid = 0 self.stopped = False @@ -95,35 +96,16 @@ class KRPC: msg[ARG]['_krpc_sender'] = self.addr if f and callable(f): try: - ret = apply(f, (), msg[ARG]) + ret = f(*(), **msg[ARG]) except Exception, e: - ## send error - out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`}) - olen = len(out) - if self.noisy: - print self.factory.port, "responding to", addr, self.addr, ":", out - self.transport.write(out, addr) + olen = self._sendResponse(addr, msg[TID], ERR, `format_exception(type(e), e, sys.exc_info()[2])`) else: - if ret: - # make response - out = bencode({TID : msg[TID], TYP : RSP, RSP : ret}) - else: - out = bencode({TID : msg[TID], TYP : RSP, RSP : {}}) - # send response - olen = len(out) - if self.noisy: - print self.factory.port, "responding to", addr, self.addr, ":", out - self.transport.write(out, addr) - + olen = self._sendResponse(addr, msg[TID], RSP, ret) else: if self.noisy: print "don't know about method %s" % msg[REQ] # unknown method - out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN}) - olen = len(out) - if self.noisy: - print self.factory.port, "responding to", addr, self.addr, ":", out - self.transport.write(out, addr) + olen = self._sendResponse(addr, msg[TID], ERR, KRPC_ERROR_METHOD_UNKNOWN) if self.noisy: print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port, ilen, msg[REQ], olen) @@ -157,6 +139,19 @@ class KRPC: df.errback(KRPC_ERROR_RECEIVED_UNKNOWN) del(self.tids[msg[TID]]) + def _sendResponse(self, addr, tid, msgType, response): + if not response: + response = {} + + msg = {TID : tid, TYP : msgType, msgType : response} + + if self.noisy: + print self.factory.port, "responding to", addr, ":", msg + + out = bencode(msg) + self.transport.write(out, addr) + return len(out) + def sendRequest(self, method, args): if self.stopped: raise ProtocolError, "connection has been stopped" @@ -205,14 +200,13 @@ class Receiver(protocol.Factory): def make(port): af = Receiver() - a = hostbroker(af) + a = hostbroker(af, {'SPEW': False}) a.protocol = KRPC p = reactor.listenUDP(port, a) return af, a, p class KRPCTests(unittest.TestCase): def setUp(self): - KRPC.noisy = 0 self.af, self.a, self.ap = make(1180) self.bf, self.b, self.bp = make(1181) @@ -221,7 +215,7 @@ class KRPCTests(unittest.TestCase): self.bp.stopListening() def bufEquals(self, result, value): - self.assertEqual(self.bf.buf, value) + self.failUnlessEqual(self.bf.buf, value) def testSimpleMessage(self): d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."}) @@ -242,7 +236,7 @@ class KRPCTests(unittest.TestCase): def gotMsg(self, dict, should_be): _krpc_sender = dict['_krpc_sender'] msg = dict['rsp'] - self.assertEqual(msg, should_be) + self.failUnlessEqual(msg, should_be) def testManyEcho(self): for i in xrange(100): @@ -283,4 +277,4 @@ class KRPCTests(unittest.TestCase): return df def gotErr(self, err, should_be): - self.assertEqual(err.value, should_be) + self.failUnlessEqual(err.value, should_be) diff --git a/apt_dht_Khashmir/ktable.py b/apt_dht_Khashmir/ktable.py index 7ffde39..6dc5fd6 100644 --- a/apt_dht_Khashmir/ktable.py +++ b/apt_dht_Khashmir/ktable.py @@ -216,20 +216,20 @@ class TestKTable(unittest.TestCase): def testAddNode(self): self.b = Node(khash.newID(), 'localhost', 2003) self.t.insertNode(self.b) - self.assertEqual(len(self.t.buckets[0].l), 1) - self.assertEqual(self.t.buckets[0].l[0], self.b) + self.failUnlessEqual(len(self.t.buckets[0].l), 1) + self.failUnlessEqual(self.t.buckets[0].l[0], self.b) def testRemove(self): self.testAddNode() self.t.invalidateNode(self.b) - self.assertEqual(len(self.t.buckets[0].l), 0) + self.failUnlessEqual(len(self.t.buckets[0].l), 0) def testFail(self): self.testAddNode() for i in range(self.t.config['MAX_FAILURES'] - 1): self.t.nodeFailed(self.b) - self.assertEqual(len(self.t.buckets[0].l), 1) - self.assertEqual(self.t.buckets[0].l[0], self.b) + self.failUnlessEqual(len(self.t.buckets[0].l), 1) + self.failUnlessEqual(self.t.buckets[0].l[0], self.b) self.t.nodeFailed(self.b) - self.assertEqual(len(self.t.buckets[0].l), 0) + self.failUnlessEqual(len(self.t.buckets[0].l), 0) diff --git a/apt_dht_Khashmir/node.py b/apt_dht_Khashmir/node.py index 609e666..89845cb 100644 --- a/apt_dht_Khashmir/node.py +++ b/apt_dht_Khashmir/node.py @@ -23,8 +23,8 @@ class Node: port = id['port'] id = id['id'] - assert(isinstance(id, str)) - assert(isinstance(host, str)) + assert isinstance(id, str) + assert isinstance(host, str) self.id = id self.num = khash.intify(id) self.host = host @@ -78,5 +78,5 @@ class TestNode(unittest.TestCase): def testUpdateLastSeen(self): t = self.node.lastSeen self.node.updateLastSeen() - assert t < self.node.lastSeen + self.failUnless(t < self.node.lastSeen) \ No newline at end of file -- 2.39.5