]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
Use function for sending krpc responses, and add spew parameter.
authorCameron Dale <camrdale@gmail.com>
Thu, 10 Jan 2008 20:45:04 +0000 (12:45 -0800)
committerCameron Dale <camrdale@gmail.com>
Thu, 10 Jan 2008 20:45:04 +0000 (12:45 -0800)
Sending a response in the krpc module now calls a function.

There is a new 'SPEW' config variable that causes the krpc
module to spew out all sent and received packet data
(previously called noisy).

Also changed all the tests to use failUnlessEqual instead
of assertEqual, and all assert() commands no longer use
brackets.

Updated some of the downloading tests for the new file sizes
on camrdale.org (need a better solution here).

12 files changed:
apt-dht.py
apt_dht/HTTPDownloader.py
apt_dht/Hash.py
apt_dht/PeerManager.py
apt_dht/apt_dht_conf.py
apt_dht_Khashmir/DHT.py
apt_dht_Khashmir/actions.py
apt_dht_Khashmir/khash.py
apt_dht_Khashmir/khashmir.py
apt_dht_Khashmir/krpc.py
apt_dht_Khashmir/ktable.py
apt_dht_Khashmir/node.py

index a532527f0f457941fd8c0741fdf0bc5705a3dd6f..3cc5d94f0ef502fc8fc416e44d4b24a918ff8feb 100644 (file)
@@ -57,7 +57,7 @@ application = service.Application("apt-dht", uid, gid)
 
 log.msg('Starting DHT')
 DHT = __import__(config.get('DEFAULT', 'DHT')+'.DHT', globals(), locals(), ['DHT'])
-assert(IDHT.implementedBy(DHT.DHT), "You must provide a DHT implementation that implements the IDHT interface.")
+assert IDHT.implementedBy(DHT.DHT), "You must provide a DHT implementation that implements the IDHT interface."
 myDHT = DHT.DHT()
 myDHT.loadConfig(config, config.get('DEFAULT', 'DHT'))
 myDHT.join()
index 91ce7c14c598cb70af1cb545547225674754ea32..c906c0e87d133a52310dd91800449af90c95a2e4 100644 (file)
@@ -36,7 +36,7 @@ class HTTPClientManager(ClientFactory):
         self.connector = None
         
     def connect(self):
-        assert(self.closed and not self.connecting)
+        assert self.closed and not self.connecting
         self.connecting = True
         d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
         d.addCallback(self.connected)
@@ -176,15 +176,15 @@ class TestClientManager(unittest.TestCase):
                 d.addBoth(lastDefer.callback)
                 
         newRequest("/", 1, 3433)
-        newRequest("/blog/", 2, 37121)
+        newRequest("/blog/", 2, 39152)
         newRequest("/camrdale.html", 3, 2234)
         self.pending_calls.append(reactor.callLater(1, newRequest, '/robots.txt', 4, 309))
         self.pending_calls.append(reactor.callLater(10, newRequest, '/wikilink.html', 5, 3084))
-        self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4750))
+        self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4756))
         self.pending_calls.append(reactor.callLater(31, newRequest, '/PlanetLab.html', 7, 2783))
         self.pending_calls.append(reactor.callLater(32, newRequest, '/openid.html', 8, 2525))
         self.pending_calls.append(reactor.callLater(32, newRequest, '/subpage.html', 9, 2381))
-        self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 302362, True))
+        self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 313470, True))
         return lastDefer
         
     def test_multiple_quick_downloads(self):
@@ -200,15 +200,15 @@ class TestClientManager(unittest.TestCase):
                 d.addBoth(lastDefer.callback)
                 
         newRequest("/", 1, 3433)
-        newRequest("/blog/", 2, 37121)
+        newRequest("/blog/", 2, 39152)
         newRequest("/camrdale.html", 3, 2234)
         self.pending_calls.append(reactor.callLater(0, newRequest, '/robots.txt', 4, 309))
         self.pending_calls.append(reactor.callLater(0, newRequest, '/wikilink.html', 5, 3084))
-        self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap.html', 6, 4750))
+        self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap.html', 6, 4756))
         self.pending_calls.append(reactor.callLater(0, newRequest, '/PlanetLab.html', 7, 2783))
         self.pending_calls.append(reactor.callLater(0, newRequest, '/openid.html', 8, 2525))
         self.pending_calls.append(reactor.callLater(0, newRequest, '/subpage.html', 9, 2381))
-        self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap2.rss', 0, 302362, True))
+        self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap2.rss', 0, 313470, True))
         return lastDefer
         
     def test_range(self):
index 3149f5872f886049fb5f7aa8b9d5947a23d074b1..bb993f1755bbbb80d5b8a23a2afb40a17b2c53fb 100644 (file)
@@ -53,7 +53,7 @@ class HashObject:
         if bits is not None:
             bytes = (bits - 1) // 8 + 1
         else:
-            assert(bytes is not None)
+            assert bytes is not None, "you must specify one of bits or bytes"
         if len(hashString) < bytes:
             hashString = hashString + '\000'*(bytes - len(hashString))
         elif len(hashString) > bytes:
index 25bd4f5959e1015aa26848ba18805e7f6741aa4e..75c135d2b946c0dca0c26daaf9d4a3e2a0088f61 100644 (file)
@@ -23,7 +23,7 @@ class PeerManager:
         url = choice(locations)
         log.msg('Downloading %s' % url)
         parsed = urlparse(url)
-        assert(parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0])
+        assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
         host, port = splitHostPort(parsed[0], parsed[1])
         path = urlunparse(('', '') + parsed[2:])
 
@@ -88,11 +88,11 @@ class TestPeerManager(unittest.TestCase):
                 d.addBoth(lastDefer.callback)
                 
         newRequest('www.camrdale.org', "/", 1, 3433)
-        newRequest('www.camrdale.org', "/blog/", 2, 37121)
+        newRequest('www.camrdale.org', "/blog/", 2, 39152)
         newRequest('www.google.ca', "/", 3, None)
         self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
         self.pending_calls.append(reactor.callLater(10, newRequest, 'www.camrdale.org', '/wikilink.html', 5, 3084))
-        self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4750))
+        self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4756))
         self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/openid.html', 8, 2525))
         self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/subpage.html', 9, 2381))
index dd474bfc622b8c5a58b98296ad463affdc89ef63..98bbd884358bbd1b09ae69945e500d8fd9b06b68 100644 (file)
@@ -80,6 +80,9 @@ DHT_DEFAULTS = {
     
     # expire entries older than this
     'KE_AGE': '1h', # 60 minutes
+    
+    # whether to spew info about the requests/responses in the protocol
+    'SPEW': 'yes',
 }
 
 class AptDHTConfigParser(SafeConfigParser):
index 590a77cb224a59026e41ca41631340daf6362902..f77962e427416ac6f90f2258a67852390efe0ba3 100644 (file)
@@ -42,6 +42,8 @@ class DHT:
             elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL', 
                        'BUCKET_STALENESS', 'KEINITIAL_DELAY', 'KE_DELAY', 'KE_AGE']:
                 self.config[k] = self.config_parser.gettime(section, k)
+            elif k in ['SPEW']:
+                self.config[k] = self.config_parser.getboolean(section, k)
             else:
                 self.config[k] = self.config_parser.get(section, k)
     
@@ -158,7 +160,7 @@ class TestSimpleDHT(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, }
+                    'KE_AGE': 3600, 'SPEW': False, }
 
     def setUp(self):
         self.a = DHT()
@@ -251,7 +253,7 @@ class TestMultiDHT(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, }
+                    'KE_AGE': 3600, 'SPEW': False, }
 
     def setUp(self):
         self.l = []
index a99b7ea7d9c5810e7cf443f4e9673a3ea1e936f7..9bfa2e8a708be3a5726b35fff76f7dad0772d9bf 100644 (file)
@@ -75,7 +75,7 @@ class FindNode(ActionBase):
                 self.queried[node.id] = 1
             if self.outstanding >= self.config['CONCURRENT_REQS']:
                 break
-        assert(self.outstanding) >=0
+        assert self.outstanding >=0
         if self.outstanding == 0:
             ## all done!!
             self.finished=1
@@ -162,7 +162,7 @@ class GetValue(FindNode):
                     self.queried[node.id] = 1
             if self.outstanding >= self.config['CONCURRENT_REQS']:
                 break
-        assert(self.outstanding) >=0
+        assert self.outstanding >=0
         if self.outstanding == 0:
             ## all done, didn't find it!!
             self.finished=1
index 8db84d8473af5cec4eb766de9ea6644b8ec12dd4..0f0d8e36902a695752c69f9b64f8883fe2f81880 100644 (file)
@@ -43,7 +43,7 @@ def newTID():
 
 class TestNewID(unittest.TestCase):
     def testLength(self):
-        self.assertEqual(len(newID()), 20)
+        self.failUnlessEqual(len(newID()), 20)
     def testHundreds(self):
         for x in xrange(100):
             self.testLength
@@ -54,13 +54,13 @@ class TestIntify(unittest.TestCase):
             ]
     def testKnown(self):
         for str, value in self.known: 
-            self.assertEqual(intify(str),  value)
+            self.failUnlessEqual(intify(str),  value)
     def testEndianessOnce(self):
         h = newID()
         while h[-1] == '\xff':
             h = newID()
         k = h[:-1] + chr(ord(h[-1]) + 1)
-        self.assertEqual(intify(k) - intify(h), 1)
+        self.failUnlessEqual(intify(k) - intify(h), 1)
     def testEndianessLots(self):
         for x in xrange(100):
             self.testEndianessOnce()
@@ -73,11 +73,11 @@ class TestDisantance(unittest.TestCase):
             ]
     def testKnown(self):
         for pair, dist in self.known:
-            self.assertEqual(distance(pair[0], pair[1]), dist)
+            self.failUnlessEqual(distance(pair[0], pair[1]), dist)
     def testCommutitive(self):
         for i in xrange(100):
             x, y, z = newID(), newID(), newID()
-            self.assertEqual(distance(x,y) ^ distance(y, z), distance(x, z))
+            self.failUnlessEqual(distance(x,y) ^ distance(y, z), distance(x, z))
         
 class TestRandRange(unittest.TestCase):
     def testOnce(self):
@@ -85,10 +85,10 @@ class TestRandRange(unittest.TestCase):
         b = intify(newID())
         if a < b:
             c = randRange(a, b)
-            self.assertEqual(a <= c < b, 1, "output out of range %d  %d  %d" % (b, c, a))
+            self.failUnlessEqual(a <= c < b, True, "output out of range %d  %d  %d" % (b, c, a))
         else:
             c = randRange(b, a)
-            assert b <= c < a, "output out of range %d  %d  %d" % (b, c, a)
+            self.failUnlessEqual(b <= c < a, True, "output out of range %d  %d  %d" % (b, c, a))
 
     def testOneHundredTimes(self):
         for i in xrange(100):
index ef1b826c1346dd7593eb91738405af0ae3b8827f..1cb4a3c187e0e50b313c57d1b5ae4400abcd7295 100644 (file)
@@ -34,7 +34,7 @@ class KhashmirBase(protocol.Factory):
         self.node = self._loadSelfNode('', self.port)
         self.table = KTable(self.node, config)
         #self.app = service.Application("krpc")
-        self.udp = krpc.hostbroker(self)
+        self.udp = krpc.hostbroker(self, config)
         self.udp.protocol = krpc.KRPC
         self.listenport = reactor.listenUDP(self.port, self.udp)
         self._loadRoutingTable()
@@ -286,7 +286,7 @@ class SimpleTests(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, }
+                    'KE_AGE': 3600, 'SPEW': False, }
 
     def setUp(self):
         krpc.KRPC.noisy = 0
@@ -304,11 +304,11 @@ class SimpleTests(unittest.TestCase):
         os.unlink(self.b.store.db)
 
     def testAddContact(self):
-        self.assertEqual(len(self.a.table.buckets), 1)
-        self.assertEqual(len(self.a.table.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.a.table.buckets), 1)
+        self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
 
-        self.assertEqual(len(self.b.table.buckets), 1)
-        self.assertEqual(len(self.b.table.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.b.table.buckets), 1)
+        self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
 
         self.a.addContact('127.0.0.1', 4045)
         reactor.iterate()
@@ -316,10 +316,10 @@ class SimpleTests(unittest.TestCase):
         reactor.iterate()
         reactor.iterate()
 
-        self.assertEqual(len(self.a.table.buckets), 1)
-        self.assertEqual(len(self.a.table.buckets[0].l), 1)
-        self.assertEqual(len(self.b.table.buckets), 1)
-        self.assertEqual(len(self.b.table.buckets[0].l), 1)
+        self.failUnlessEqual(len(self.a.table.buckets), 1)
+        self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
+        self.failUnlessEqual(len(self.b.table.buckets), 1)
+        self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
 
     def testStoreRetrieve(self):
         self.a.addContact('127.0.0.1', 4045)
@@ -346,7 +346,7 @@ class SimpleTests(unittest.TestCase):
 
     def _cb(self, key, val):
         if not val:
-            self.assertEqual(self.got, 1)
+            self.failUnlessEqual(self.got, 1)
         elif 'foobar' in val:
             self.got = 1
 
@@ -360,7 +360,7 @@ class MultiTest(unittest.TestCase):
                     'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
                     'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
                     'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
-                    'KE_AGE': 3600, }
+                    'KE_AGE': 3600, 'SPEW': False, }
 
     def _done(self, val):
         self.done = 1
@@ -418,7 +418,7 @@ class MultiTest(unittest.TestCase):
                 def _rcb(key, val):
                     if not val:
                         self.done = 1
-                        self.assertEqual(self.got, 1)
+                        self.failUnlessEqual(self.got, 1)
                     elif V in val:
                         self.got = 1
                 for x in range(3):
index 7427e0aab35577e03bff5b0fddb6a1b5402ffda3..40ab8d84145e3e7e5650f67abbb7edd95b785861 100644 (file)
@@ -29,8 +29,9 @@ class ProtocolError(Exception):
     pass
 
 class hostbroker(protocol.DatagramProtocol):       
-    def __init__(self, server):
+    def __init__(self, server, config):
         self.server = server
+        self.config = config
         # this should be changed to storage that drops old entries
         self.connections = {}
         
@@ -46,7 +47,7 @@ class hostbroker(protocol.DatagramProtocol):
         if addr == self.addr:
             raise Exception
         if not self.connections.has_key(addr):
-            conn = self.protocol(addr, self.server, self.transport)
+            conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
             self.connections[addr] = conn
         else:
             conn = self.connections[addr]
@@ -64,11 +65,11 @@ class hostbroker(protocol.DatagramProtocol):
 
 ## connection
 class KRPC:
-    noisy = 0
-    def __init__(self, addr, server, transport):
+    def __init__(self, addr, server, transport, spew = False):
         self.transport = transport
         self.factory = server
         self.addr = addr
+        self.noisy = spew
         self.tids = {}
         self.mtid = 0
         self.stopped = False
@@ -95,35 +96,16 @@ class KRPC:
                 msg[ARG]['_krpc_sender'] =  self.addr
                 if f and callable(f):
                     try:
-                        ret = apply(f, (), msg[ARG])
+                        ret = f(*(), **msg[ARG])
                     except Exception, e:
-                        ## send error
-                        out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
-                        olen = len(out)
-                        if self.noisy:
-                            print self.factory.port, "responding to", addr, self.addr, ":", out
-                        self.transport.write(out, addr)
+                        olen = self._sendResponse(addr, msg[TID], ERR, `format_exception(type(e), e, sys.exc_info()[2])`)
                     else:
-                        if ret:
-                            #  make response
-                            out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
-                        else:
-                            out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
-                        #      send response
-                        olen = len(out)
-                        if self.noisy:
-                            print self.factory.port, "responding to", addr, self.addr, ":", out
-                        self.transport.write(out, addr)
-
+                        olen = self._sendResponse(addr, msg[TID], RSP, ret)
                 else:
                     if self.noisy:
                         print "don't know about method %s" % msg[REQ]
                     # unknown method
-                    out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
-                    olen = len(out)
-                    if self.noisy:
-                        print self.factory.port, "responding to", addr, self.addr, ":", out
-                    self.transport.write(out, addr)
+                    olen = self._sendResponse(addr, msg[TID], ERR, KRPC_ERROR_METHOD_UNKNOWN)
                 if self.noisy:
                     print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port, 
                                                     ilen, msg[REQ], olen)
@@ -157,6 +139,19 @@ class KRPC:
                 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
                 del(self.tids[msg[TID]])
                 
+    def _sendResponse(self, addr, tid, msgType, response):
+        if not response:
+            response = {}
+            
+        msg = {TID : tid, TYP : msgType, msgType : response}
+
+        if self.noisy:
+            print self.factory.port, "responding to", addr, ":", msg
+
+        out = bencode(msg)
+        self.transport.write(out, addr)
+        return len(out)
+    
     def sendRequest(self, method, args):
         if self.stopped:
             raise ProtocolError, "connection has been stopped"
@@ -205,14 +200,13 @@ class Receiver(protocol.Factory):
 
 def make(port):
     af = Receiver()
-    a = hostbroker(af)
+    a = hostbroker(af, {'SPEW': False})
     a.protocol = KRPC
     p = reactor.listenUDP(port, a)
     return af, a, p
     
 class KRPCTests(unittest.TestCase):
     def setUp(self):
-        KRPC.noisy = 0
         self.af, self.a, self.ap = make(1180)
         self.bf, self.b, self.bp = make(1181)
 
@@ -221,7 +215,7 @@ class KRPCTests(unittest.TestCase):
         self.bp.stopListening()
 
     def bufEquals(self, result, value):
-        self.assertEqual(self.bf.buf, value)
+        self.failUnlessEqual(self.bf.buf, value)
 
     def testSimpleMessage(self):
         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
@@ -242,7 +236,7 @@ class KRPCTests(unittest.TestCase):
     def gotMsg(self, dict, should_be):
         _krpc_sender = dict['_krpc_sender']
         msg = dict['rsp']
-        self.assertEqual(msg, should_be)
+        self.failUnlessEqual(msg, should_be)
 
     def testManyEcho(self):
         for i in xrange(100):
@@ -283,4 +277,4 @@ class KRPCTests(unittest.TestCase):
         return df
 
     def gotErr(self, err, should_be):
-        self.assertEqual(err.value, should_be)
+        self.failUnlessEqual(err.value, should_be)
index 7ffde392867d7d379bbdac4ad26d7eb70ba045f3..6dc5fd6e9e85eb4d7519df21848354adb4031ddc 100644 (file)
@@ -216,20 +216,20 @@ class TestKTable(unittest.TestCase):
     def testAddNode(self):
         self.b = Node(khash.newID(), 'localhost', 2003)
         self.t.insertNode(self.b)
-        self.assertEqual(len(self.t.buckets[0].l), 1)
-        self.assertEqual(self.t.buckets[0].l[0], self.b)
+        self.failUnlessEqual(len(self.t.buckets[0].l), 1)
+        self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
 
     def testRemove(self):
         self.testAddNode()
         self.t.invalidateNode(self.b)
-        self.assertEqual(len(self.t.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.t.buckets[0].l), 0)
 
     def testFail(self):
         self.testAddNode()
         for i in range(self.t.config['MAX_FAILURES'] - 1):
             self.t.nodeFailed(self.b)
-            self.assertEqual(len(self.t.buckets[0].l), 1)
-            self.assertEqual(self.t.buckets[0].l[0], self.b)
+            self.failUnlessEqual(len(self.t.buckets[0].l), 1)
+            self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
             
         self.t.nodeFailed(self.b)
-        self.assertEqual(len(self.t.buckets[0].l), 0)
+        self.failUnlessEqual(len(self.t.buckets[0].l), 0)
index 609e6662c5043c39eb6a8571ccab7b34b3c19abf..89845cb5eac4305f980919c3938278c9db082947 100644 (file)
@@ -23,8 +23,8 @@ class Node:
             port = id['port']
             id = id['id']
 
-        assert(isinstance(id, str))
-        assert(isinstance(host, str))
+        assert isinstance(id, str)
+        assert isinstance(host, str)
         self.id = id
         self.num = khash.intify(id)
         self.host = host
@@ -78,5 +78,5 @@ class TestNode(unittest.TestCase):
     def testUpdateLastSeen(self):
         t = self.node.lastSeen
         self.node.updateLastSeen()
-        assert t < self.node.lastSeen
+        self.failUnless(t < self.node.lastSeen)
     
\ No newline at end of file