Sending a response in the krpc module now calls a function.
There is a new 'SPEW' config variable that causes the krpc
module to spew out all sent and received packet data
(previously called noisy).
Also changed all the tests to use failUnlessEqual instead
of assertEqual, and all assert() commands no longer use
brackets.
Updated some of the downloading tests for the new file sizes
on camrdale.org (need a better solution here).
log.msg('Starting DHT')
DHT = __import__(config.get('DEFAULT', 'DHT')+'.DHT', globals(), locals(), ['DHT'])
-assert(IDHT.implementedBy(DHT.DHT), "You must provide a DHT implementation that implements the IDHT interface.")
+assert IDHT.implementedBy(DHT.DHT), "You must provide a DHT implementation that implements the IDHT interface."
myDHT = DHT.DHT()
myDHT.loadConfig(config, config.get('DEFAULT', 'DHT'))
myDHT.join()
self.connector = None
def connect(self):
- assert(self.closed and not self.connecting)
+ assert self.closed and not self.connecting
self.connecting = True
d = protocol.ClientCreator(reactor, HTTPClientProtocol, self).connectTCP(self.host, self.port)
d.addCallback(self.connected)
d.addBoth(lastDefer.callback)
newRequest("/", 1, 3433)
- newRequest("/blog/", 2, 37121)
+ newRequest("/blog/", 2, 39152)
newRequest("/camrdale.html", 3, 2234)
self.pending_calls.append(reactor.callLater(1, newRequest, '/robots.txt', 4, 309))
self.pending_calls.append(reactor.callLater(10, newRequest, '/wikilink.html', 5, 3084))
- self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4750))
+ self.pending_calls.append(reactor.callLater(30, newRequest, '/sitemap.html', 6, 4756))
self.pending_calls.append(reactor.callLater(31, newRequest, '/PlanetLab.html', 7, 2783))
self.pending_calls.append(reactor.callLater(32, newRequest, '/openid.html', 8, 2525))
self.pending_calls.append(reactor.callLater(32, newRequest, '/subpage.html', 9, 2381))
- self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 302362, True))
+ self.pending_calls.append(reactor.callLater(62, newRequest, '/sitemap2.rss', 0, 313470, True))
return lastDefer
def test_multiple_quick_downloads(self):
d.addBoth(lastDefer.callback)
newRequest("/", 1, 3433)
- newRequest("/blog/", 2, 37121)
+ newRequest("/blog/", 2, 39152)
newRequest("/camrdale.html", 3, 2234)
self.pending_calls.append(reactor.callLater(0, newRequest, '/robots.txt', 4, 309))
self.pending_calls.append(reactor.callLater(0, newRequest, '/wikilink.html', 5, 3084))
- self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap.html', 6, 4750))
+ self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap.html', 6, 4756))
self.pending_calls.append(reactor.callLater(0, newRequest, '/PlanetLab.html', 7, 2783))
self.pending_calls.append(reactor.callLater(0, newRequest, '/openid.html', 8, 2525))
self.pending_calls.append(reactor.callLater(0, newRequest, '/subpage.html', 9, 2381))
- self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap2.rss', 0, 302362, True))
+ self.pending_calls.append(reactor.callLater(0, newRequest, '/sitemap2.rss', 0, 313470, True))
return lastDefer
def test_range(self):
if bits is not None:
bytes = (bits - 1) // 8 + 1
else:
- assert(bytes is not None)
+ assert bytes is not None, "you must specify one of bits or bytes"
if len(hashString) < bytes:
hashString = hashString + '\000'*(bytes - len(hashString))
elif len(hashString) > bytes:
url = choice(locations)
log.msg('Downloading %s' % url)
parsed = urlparse(url)
- assert(parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0])
+ assert parsed[0] == "http", "Only HTTP is supported, not '%s'" % parsed[0]
host, port = splitHostPort(parsed[0], parsed[1])
path = urlunparse(('', '') + parsed[2:])
d.addBoth(lastDefer.callback)
newRequest('www.camrdale.org', "/", 1, 3433)
- newRequest('www.camrdale.org', "/blog/", 2, 37121)
+ newRequest('www.camrdale.org', "/blog/", 2, 39152)
newRequest('www.google.ca', "/", 3, None)
self.pending_calls.append(reactor.callLater(1, newRequest, 'www.sfu.ca', '/', 4, None))
self.pending_calls.append(reactor.callLater(10, newRequest, 'www.camrdale.org', '/wikilink.html', 5, 3084))
- self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4750))
+ self.pending_calls.append(reactor.callLater(30, newRequest, 'www.camrdale.org', '/sitemap.html', 6, 4756))
self.pending_calls.append(reactor.callLater(31, newRequest, 'www.sfu.ca', '/studentcentral/index.html', 7, None))
self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/openid.html', 8, 2525))
self.pending_calls.append(reactor.callLater(32, newRequest, 'www.camrdale.org', '/subpage.html', 9, 2381))
# expire entries older than this
'KE_AGE': '1h', # 60 minutes
+
+ # whether to spew info about the requests/responses in the protocol
+ 'SPEW': 'yes',
}
class AptDHTConfigParser(SafeConfigParser):
elif k in ['CHECKPOINT_INTERVAL', 'MIN_PING_INTERVAL',
'BUCKET_STALENESS', 'KEINITIAL_DELAY', 'KE_DELAY', 'KE_AGE']:
self.config[k] = self.config_parser.gettime(section, k)
+ elif k in ['SPEW']:
+ self.config[k] = self.config_parser.getboolean(section, k)
else:
self.config[k] = self.config_parser.get(section, k)
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, }
+ 'KE_AGE': 3600, 'SPEW': False, }
def setUp(self):
self.a = DHT()
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, }
+ 'KE_AGE': 3600, 'SPEW': False, }
def setUp(self):
self.l = []
self.queried[node.id] = 1
if self.outstanding >= self.config['CONCURRENT_REQS']:
break
- assert(self.outstanding) >=0
+ assert self.outstanding >=0
if self.outstanding == 0:
## all done!!
self.finished=1
self.queried[node.id] = 1
if self.outstanding >= self.config['CONCURRENT_REQS']:
break
- assert(self.outstanding) >=0
+ assert self.outstanding >=0
if self.outstanding == 0:
## all done, didn't find it!!
self.finished=1
class TestNewID(unittest.TestCase):
def testLength(self):
- self.assertEqual(len(newID()), 20)
+ self.failUnlessEqual(len(newID()), 20)
def testHundreds(self):
for x in xrange(100):
self.testLength
]
def testKnown(self):
for str, value in self.known:
- self.assertEqual(intify(str), value)
+ self.failUnlessEqual(intify(str), value)
def testEndianessOnce(self):
h = newID()
while h[-1] == '\xff':
h = newID()
k = h[:-1] + chr(ord(h[-1]) + 1)
- self.assertEqual(intify(k) - intify(h), 1)
+ self.failUnlessEqual(intify(k) - intify(h), 1)
def testEndianessLots(self):
for x in xrange(100):
self.testEndianessOnce()
]
def testKnown(self):
for pair, dist in self.known:
- self.assertEqual(distance(pair[0], pair[1]), dist)
+ self.failUnlessEqual(distance(pair[0], pair[1]), dist)
def testCommutitive(self):
for i in xrange(100):
x, y, z = newID(), newID(), newID()
- self.assertEqual(distance(x,y) ^ distance(y, z), distance(x, z))
+ self.failUnlessEqual(distance(x,y) ^ distance(y, z), distance(x, z))
class TestRandRange(unittest.TestCase):
def testOnce(self):
b = intify(newID())
if a < b:
c = randRange(a, b)
- self.assertEqual(a <= c < b, 1, "output out of range %d %d %d" % (b, c, a))
+ self.failUnlessEqual(a <= c < b, True, "output out of range %d %d %d" % (b, c, a))
else:
c = randRange(b, a)
- assert b <= c < a, "output out of range %d %d %d" % (b, c, a)
+ self.failUnlessEqual(b <= c < a, True, "output out of range %d %d %d" % (b, c, a))
def testOneHundredTimes(self):
for i in xrange(100):
self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
#self.app = service.Application("krpc")
- self.udp = krpc.hostbroker(self)
+ self.udp = krpc.hostbroker(self, config)
self.udp.protocol = krpc.KRPC
self.listenport = reactor.listenUDP(self.port, self.udp)
self._loadRoutingTable()
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, }
+ 'KE_AGE': 3600, 'SPEW': False, }
def setUp(self):
krpc.KRPC.noisy = 0
os.unlink(self.b.store.db)
def testAddContact(self):
- self.assertEqual(len(self.a.table.buckets), 1)
- self.assertEqual(len(self.a.table.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.a.table.buckets), 1)
+ self.failUnlessEqual(len(self.a.table.buckets[0].l), 0)
- self.assertEqual(len(self.b.table.buckets), 1)
- self.assertEqual(len(self.b.table.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.b.table.buckets), 1)
+ self.failUnlessEqual(len(self.b.table.buckets[0].l), 0)
self.a.addContact('127.0.0.1', 4045)
reactor.iterate()
reactor.iterate()
reactor.iterate()
- self.assertEqual(len(self.a.table.buckets), 1)
- self.assertEqual(len(self.a.table.buckets[0].l), 1)
- self.assertEqual(len(self.b.table.buckets), 1)
- self.assertEqual(len(self.b.table.buckets[0].l), 1)
+ self.failUnlessEqual(len(self.a.table.buckets), 1)
+ self.failUnlessEqual(len(self.a.table.buckets[0].l), 1)
+ self.failUnlessEqual(len(self.b.table.buckets), 1)
+ self.failUnlessEqual(len(self.b.table.buckets[0].l), 1)
def testStoreRetrieve(self):
self.a.addContact('127.0.0.1', 4045)
def _cb(self, key, val):
if not val:
- self.assertEqual(self.got, 1)
+ self.failUnlessEqual(self.got, 1)
elif 'foobar' in val:
self.got = 1
'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3,
'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600,
'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200,
- 'KE_AGE': 3600, }
+ 'KE_AGE': 3600, 'SPEW': False, }
def _done(self, val):
self.done = 1
def _rcb(key, val):
if not val:
self.done = 1
- self.assertEqual(self.got, 1)
+ self.failUnlessEqual(self.got, 1)
elif V in val:
self.got = 1
for x in range(3):
pass
class hostbroker(protocol.DatagramProtocol):
- def __init__(self, server):
+ def __init__(self, server, config):
self.server = server
+ self.config = config
# this should be changed to storage that drops old entries
self.connections = {}
if addr == self.addr:
raise Exception
if not self.connections.has_key(addr):
- conn = self.protocol(addr, self.server, self.transport)
+ conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
self.connections[addr] = conn
else:
conn = self.connections[addr]
## connection
class KRPC:
- noisy = 0
- def __init__(self, addr, server, transport):
+ def __init__(self, addr, server, transport, spew = False):
self.transport = transport
self.factory = server
self.addr = addr
+ self.noisy = spew
self.tids = {}
self.mtid = 0
self.stopped = False
msg[ARG]['_krpc_sender'] = self.addr
if f and callable(f):
try:
- ret = apply(f, (), msg[ARG])
+ ret = f(*(), **msg[ARG])
except Exception, e:
- ## send error
- out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
- olen = len(out)
- if self.noisy:
- print self.factory.port, "responding to", addr, self.addr, ":", out
- self.transport.write(out, addr)
+ olen = self._sendResponse(addr, msg[TID], ERR, `format_exception(type(e), e, sys.exc_info()[2])`)
else:
- if ret:
- # make response
- out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
- else:
- out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
- # send response
- olen = len(out)
- if self.noisy:
- print self.factory.port, "responding to", addr, self.addr, ":", out
- self.transport.write(out, addr)
-
+ olen = self._sendResponse(addr, msg[TID], RSP, ret)
else:
if self.noisy:
print "don't know about method %s" % msg[REQ]
# unknown method
- out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
- olen = len(out)
- if self.noisy:
- print self.factory.port, "responding to", addr, self.addr, ":", out
- self.transport.write(out, addr)
+ olen = self._sendResponse(addr, msg[TID], ERR, KRPC_ERROR_METHOD_UNKNOWN)
if self.noisy:
print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port,
ilen, msg[REQ], olen)
df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
del(self.tids[msg[TID]])
+ def _sendResponse(self, addr, tid, msgType, response):
+ if not response:
+ response = {}
+
+ msg = {TID : tid, TYP : msgType, msgType : response}
+
+ if self.noisy:
+ print self.factory.port, "responding to", addr, ":", msg
+
+ out = bencode(msg)
+ self.transport.write(out, addr)
+ return len(out)
+
def sendRequest(self, method, args):
if self.stopped:
raise ProtocolError, "connection has been stopped"
def make(port):
af = Receiver()
- a = hostbroker(af)
+ a = hostbroker(af, {'SPEW': False})
a.protocol = KRPC
p = reactor.listenUDP(port, a)
return af, a, p
class KRPCTests(unittest.TestCase):
def setUp(self):
- KRPC.noisy = 0
self.af, self.a, self.ap = make(1180)
self.bf, self.b, self.bp = make(1181)
self.bp.stopListening()
def bufEquals(self, result, value):
- self.assertEqual(self.bf.buf, value)
+ self.failUnlessEqual(self.bf.buf, value)
def testSimpleMessage(self):
d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
def gotMsg(self, dict, should_be):
_krpc_sender = dict['_krpc_sender']
msg = dict['rsp']
- self.assertEqual(msg, should_be)
+ self.failUnlessEqual(msg, should_be)
def testManyEcho(self):
for i in xrange(100):
return df
def gotErr(self, err, should_be):
- self.assertEqual(err.value, should_be)
+ self.failUnlessEqual(err.value, should_be)
def testAddNode(self):
self.b = Node(khash.newID(), 'localhost', 2003)
self.t.insertNode(self.b)
- self.assertEqual(len(self.t.buckets[0].l), 1)
- self.assertEqual(self.t.buckets[0].l[0], self.b)
+ self.failUnlessEqual(len(self.t.buckets[0].l), 1)
+ self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
def testRemove(self):
self.testAddNode()
self.t.invalidateNode(self.b)
- self.assertEqual(len(self.t.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.t.buckets[0].l), 0)
def testFail(self):
self.testAddNode()
for i in range(self.t.config['MAX_FAILURES'] - 1):
self.t.nodeFailed(self.b)
- self.assertEqual(len(self.t.buckets[0].l), 1)
- self.assertEqual(self.t.buckets[0].l[0], self.b)
+ self.failUnlessEqual(len(self.t.buckets[0].l), 1)
+ self.failUnlessEqual(self.t.buckets[0].l[0], self.b)
self.t.nodeFailed(self.b)
- self.assertEqual(len(self.t.buckets[0].l), 0)
+ self.failUnlessEqual(len(self.t.buckets[0].l), 0)
port = id['port']
id = id['id']
- assert(isinstance(id, str))
- assert(isinstance(host, str))
+ assert isinstance(id, str)
+ assert isinstance(host, str)
self.id = id
self.num = khash.intify(id)
self.host = host
def testUpdateLastSeen(self):
t = self.node.lastSeen
self.node.updateLastSeen()
- assert t < self.node.lastSeen
+ self.failUnless(t < self.node.lastSeen)
\ No newline at end of file