2 """The KRPC communication protocol implementation.
4 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
5 UDP packet without fragmentation
7 @var KRPC_ERROR: the code for a generic error
8 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
9 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
10 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
11 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
12 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
13 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
15 @var KRPC_ERROR_INTERNAL: the code for an internal error
16 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
17 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
18 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
20 @var TID: the identifier for the transaction ID
21 @var REQ: the identifier for a request packet
22 @var RSP: the identifier for a response packet
23 @var TYP: the identifier for the type of packet
24 @var ARG: the identifier for the argument to the request
25 @var ERR: the identifier for an error packet
27 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
28 KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
29 KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
30 KRPC_ERROR_RESPONSE_TOO_LONG
31 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
32 KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
33 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
37 from bencode import bencode, bdecode
38 from datetime import datetime, timedelta
41 from twisted.internet import defer
42 from twisted.internet import protocol, reactor
43 from twisted.python import log
44 from twisted.trial import unittest
46 from khash import newID
48 UDP_PACKET_LIMIT = 1472
52 KRPC_ERROR_SERVER_ERROR = 201
53 KRPC_ERROR_MALFORMED_PACKET = 202
54 KRPC_ERROR_METHOD_UNKNOWN = 203
55 KRPC_ERROR_MALFORMED_REQUEST = 204
56 KRPC_ERROR_INVALID_TOKEN = 205
57 KRPC_ERROR_RESPONSE_TOO_LONG = 206
60 KRPC_ERROR_INTERNAL = 100
61 KRPC_ERROR_RECEIVED_UNKNOWN = 101
62 KRPC_ERROR_TIMEOUT = 102
63 KRPC_ERROR_PROTOCOL_STOPPED = 103
73 class KrpcError(Exception):
74 """An error occurred in the KRPC protocol."""
77 def verifyMessage(msg):
78 """Check received message for corruption and errors.
80 @type msg: C{dictionary}
81 @param msg: the dictionary of information received on the connection
82 @raise KrpcError: if the message is corrupt
86 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
88 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
91 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
92 if type(msg[REQ]) != str:
93 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
95 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
96 if type(msg[ARG]) != dict:
97 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
100 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
101 if type(msg[RSP]) != dict:
102 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
103 # if 'nodes' in msg[RSP] and type(msg[RSP]['nodes']) != list:
104 # raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "wrong type of node, this is not bittorrent")
105 elif msg[TYP] == ERR:
107 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
108 if type(msg[ERR]) != list:
109 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
110 if len(msg[ERR]) != 2:
111 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
112 if type(msg[ERR][0]) not in (int, long):
113 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
114 if type(msg[ERR][1]) != str:
115 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
117 # raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
119 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
120 if type(msg[TID]) != str:
121 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
122 if len(msg[TID]) != 20:
123 raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "wrong type of node, this is not bittorrent")
125 class hostbroker(protocol.DatagramProtocol):
126 """The factory for the KRPC protocol.
128 @type server: L{khashmir.Khashmir}
129 @ivar server: the main Khashmir program
130 @type stats: L{stats.StatsLogger}
131 @ivar stats: the statistics logger to save transport info
132 @type config: C{dictionary}
133 @ivar config: the configuration parameters for the DHT
134 @type connections: C{dictionary}
135 @ivar connections: all the connections that have ever been made to the
136 protocol, keys are IP address and port pairs, values are L{KRPC}
137 protocols for the addresses
138 @ivar protocol: the protocol to use to handle incoming connections
140 @type addr: (C{string}, C{int})
141 @ivar addr: the IP address and port of this node
144 def __init__(self, server, stats, config):
145 """Initialize the factory.
147 @type server: L{khashmir.Khashmir}
148 @param server: the main DHT program
149 @type stats: L{stats.StatsLogger}
150 @param stats: the statistics logger to save transport info
151 @type config: C{dictionary}
152 @param config: the configuration parameters for the DHT
157 # this should be changed to storage that drops old entries
158 self.connections = {}
160 def datagramReceived(self, datagram, addr):
161 """Optionally create a new protocol object, and handle the new datagram.
163 @type datagram: C{string}
164 @param datagram: the data received from the transport.
165 @type addr: (C{string}, C{int})
166 @param addr: source IP address and port of datagram.
168 c = self.connectionForAddr(addr)
169 c.datagramReceived(datagram, addr)
171 # del self.connections[addr]
173 def connectionForAddr(self, addr):
174 """Get a protocol object for the source.
176 @type addr: (C{string}, C{int})
177 @param addr: source IP address and port of datagram.
179 # Don't connect to ourself
180 if addr == self.addr:
183 # Create a new protocol object if necessary
184 if not self.connections.has_key(addr):
185 conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
186 self.connections[addr] = conn
188 conn = self.connections[addr]
191 def makeConnection(self, transport):
192 """Make a connection to a transport and save our address."""
193 protocol.DatagramProtocol.makeConnection(self, transport)
194 tup = transport.getHost()
195 self.addr = (tup.host, tup.port)
197 def stopProtocol(self):
198 """Stop all the open connections."""
199 for conn in self.connections.values():
201 protocol.DatagramProtocol.stopProtocol(self)
203 class KrpcRequest(defer.Deferred):
204 """An outstanding request to a remote node.
206 @type protocol: L{KRPC}
207 @ivar protocol: the protocol to send data with
208 @ivar tid: the transaction ID of the request
209 @type method: C{string}
210 @ivar method: the name of the method to call on the remote node
211 @type data: C{string}
212 @ivar data: the message to send to the remote node
213 @type config: C{dictionary}
214 @ivar config: the configuration parameters for the DHT
216 @ivar delay: the last timeout delay sent
217 @type start: C{datetime}
218 @ivar start: the time to request was started at
219 @type laterNextTimeout: L{twisted.internet.interfaces.IDelayedCall}
220 @ivar laterNextTimeout: the pending call to timeout the last sent request
221 @type laterFinalTimeout: L{twisted.internet.interfaces.IDelayedCall}
222 @ivar laterFinalTimeout: the pending call to timeout the entire request
225 def __init__(self, protocol, newTID, method, data, config):
226 """Initialize the request, and send it out.
228 @type protocol: L{KRPC}
229 @param protocol: the protocol to send data with
230 @param newTID: the transaction ID of the request
231 @type method: C{string}
232 @param method: the name of the method to call on the remote node
233 @type data: C{string}
234 @param data: the message to send to the remote node
235 @type config: C{dictionary}
236 @param config: the configuration parameters for the DHT
238 defer.Deferred.__init__(self)
239 self.protocol = protocol
244 self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
245 self.start = datetime.now()
246 self.laterNextTimeout = None
247 self.laterFinalTimeout = reactor.callLater(self.config.get('KRPC_TIMEOUT', 9), self.finalTimeout)
248 reactor.callLater(0, self.send)
251 """Send the request to the remote node."""
252 assert not self.laterNextTimeout, 'There is already a pending request'
253 self.laterNextTimeout = reactor.callLater(self.delay, self.nextTimeout)
255 self.protocol.sendData(self.method, self.data)
259 def nextTimeout(self):
260 """Check for a unrecoverable timeout, otherwise resend."""
261 self.laterNextTimeout = None
262 if datetime.now() - self.start > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 9)):
264 elif self.protocol.stopped:
265 log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
268 log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
269 reactor.callLater(0, self.send)
271 def finalTimeout(self):
272 """Timeout the request after an unrecoverable timeout."""
274 delay = datetime.now() - self.start
275 log.msg('%r timed out after %0.2f sec' %
276 (self.tid, delay.seconds + delay.microseconds/1000000.0))
277 self.protocol.timeOut(self.tid, self.method)
279 def callback(self, resp):
281 defer.Deferred.callback(self, resp)
283 def errback(self, resp):
285 defer.Deferred.errback(self, resp)
287 def dropTimeOut(self):
288 """Cancel the timeout call when a response is received."""
289 if self.laterFinalTimeout and self.laterFinalTimeout.active():
290 self.laterFinalTimeout.cancel()
291 self.laterFinalTimeout = None
292 if self.laterNextTimeout and self.laterNextTimeout.active():
293 self.laterNextTimeout.cancel()
294 self.laterNextTimeout = None
297 """The KRPC protocol implementation.
299 @ivar transport: the transport to use for the protocol
300 @type factory: L{khashmir.Khashmir}
301 @ivar factory: the main Khashmir program
302 @type stats: L{stats.StatsLogger}
303 @ivar stats: the statistics logger to save transport info
304 @type addr: (C{string}, C{int})
305 @ivar addr: the IP address and port of the source node
306 @type config: C{dictionary}
307 @ivar config: the configuration parameters for the DHT
308 @type tids: C{dictionary}
309 @ivar tids: the transaction IDs outstanding for requests, keys are the
310 transaction ID of the request, values are the deferreds to call with
312 @type stopped: C{boolean}
313 @ivar stopped: whether the protocol has been stopped
316 def __init__(self, addr, server, stats, transport, config = {}):
317 """Initialize the protocol.
319 @type addr: (C{string}, C{int})
320 @param addr: the IP address and port of the source node
321 @type server: L{khashmir.Khashmir}
322 @param server: the main Khashmir program
323 @type stats: L{stats.StatsLogger}
324 @param stats: the statistics logger to save transport info
325 @param transport: the transport to use for the protocol
326 @type config: C{dictionary}
327 @param config: the configuration parameters for the DHT
328 (optional, defaults to using defaults)
330 self.transport = transport
331 self.factory = server
338 def datagramReceived(self, data, addr):
339 """Process the new datagram.
341 @type data: C{string}
342 @param data: the data received from the transport.
343 @type addr: (C{string}, C{int})
344 @param addr: source IP address and port of datagram.
346 self.stats.receivedBytes(len(data))
348 if self.config.get('SPEW', False):
349 log.msg("stopped, dropping message from %r: %s" % (addr, data))
351 # Bdecode the message
355 if self.config.get('SPEW', False):
356 log.msg("krpc bdecode error from %r: " % (addr, ))
360 # Make sure the remote node isn't trying anything funny
364 log.msg("krpc message verification error from %r: %r" % (addr, e))
367 if self.config.get('SPEW', False):
368 log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
370 # Process it based on its type
374 # Requests are handled by the factory
375 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
376 msg[ARG]['_krpc_sender'] = self.addr
377 if f and callable(f):
378 self.stats.receivedAction(msg[REQ])
380 ret = f(*(), **msg[ARG])
382 log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
383 if e[0] != KRPC_ERROR_INVALID_TOKEN:
386 log.msg('Node sent us an invalid token, not storing')
387 self.stats.errorAction(msg[REQ])
388 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
391 log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
393 self.stats.errorAction(msg[REQ])
394 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
395 [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
397 log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
399 self.stats.errorAction(msg[REQ])
400 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
401 [KRPC_ERROR_SERVER_ERROR, str(e)])
403 olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
405 # Request for unknown method
406 log.msg("ERROR: don't know about method %s" % msg[REQ])
407 self.stats.receivedAction('unknown')
408 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
409 [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
411 log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
412 elif msg[TYP] == RSP:
413 # Responses get processed by their TID's deferred
414 if self.tids.has_key(msg[TID]):
415 req = self.tids[msg[TID]]
417 del(self.tids[msg[TID]])
418 msg[RSP]['_krpc_sender'] = addr
419 req.callback(msg[RSP])
421 # no tid, this transaction was finished already...
422 if self.config.get('SPEW', False):
423 log.msg('received response from %r for completed request: %r' %
424 (msg[RSP]['id'], msg[TID]))
425 elif msg[TYP] == ERR:
426 # Errors get processed by their TID's deferred's errback
427 if self.tids.has_key(msg[TID]):
428 req = self.tids[msg[TID]]
429 del(self.tids[msg[TID]])
431 req.errback(KrpcError(*msg[ERR]))
433 # no tid, this transaction was finished already...
434 log.msg('received an error %r from %r for completed request: %r' %
435 (msg[ERR], addr, msg[TID]))
437 # Received an unknown message type
438 if self.config.get('SPEW', False):
439 log.msg("unknown message type: %r" % msg)
440 if msg[TID] in self.tids:
441 req = self.tids[msg[TID]]
442 del(self.tids[msg[TID]])
444 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
445 "Received an unknown message type: %r" % msg[TYP]))
447 def _sendResponse(self, request, addr, tid, msgType, response):
448 """Helper function for sending responses to nodes.
450 @param request: the name of the requested method
451 @type addr: (C{string}, C{int})
452 @param addr: source IP address and port of datagram.
453 @param tid: the transaction ID of the request
454 @param msgType: the type of message to respond with
455 @param response: the arguments for the response
461 # Create the response message
462 msg = {TID : tid, TYP : msgType, msgType : response}
464 if self.config.get('SPEW', False):
465 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
469 # Make sure its not too long
470 if len(out) > UDP_PACKET_LIMIT:
471 # Can we remove some values to shorten it?
472 if 'values' in response:
473 # Save the original list of values
474 orig_values = response['values']
475 len_orig_values = len(bencode(orig_values))
477 # Caclulate the maximum value length possible
478 max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
479 assert max_len_values > 0
481 # Start with a calculation of how many values should be included
482 # (assumes all values are the same length)
483 per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
484 num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
486 # Do a linear search for the actual maximum number possible
487 bencoded_values = len(bencode(orig_values[:num_values]))
488 while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
489 bencoded_values += len(bencode(orig_values[num_values]))
491 while bencoded_values > max_len_values and num_values > 0:
493 bencoded_values -= len(bencode(orig_values[num_values]))
494 assert num_values > 0
497 response['values'] = orig_values[:num_values]
499 assert len(out) < UDP_PACKET_LIMIT
500 log.msg('Shortened a long packet from %d to %d values, new packet length: %d' %
501 (len(orig_values), num_values, len(out)))
503 # Too long a response, send an error
504 log.msg('Could not send response, too long: %d bytes' % len(out))
505 self.stats.errorAction(request)
506 msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
510 # Unknown error, send an error message
511 self.stats.errorAction(request)
512 msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
515 self.stats.sentBytes(len(out))
516 self.transport.write(out, addr)
519 def sendRequest(self, method, args):
520 """Send a request to the remote node.
522 @type method: C{string}
523 @param method: the method name to call on the remote node
524 @param args: the arguments to send to the remote node's method
527 return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
528 "cannot send, connection has been stopped"))
530 # Create the request message
532 msg = {TID : newTID, TYP : REQ, REQ : method, ARG : args}
533 if self.config.get('SPEW', False):
534 log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
537 # Create the request object and save it with the TID
538 req = KrpcRequest(self, newTID, method, data, self.config)
539 self.tids[newTID] = req
541 # Save the conclusion of the action
542 req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
543 callbackArgs = (method, datetime.now()),
544 errbackArgs = (method, datetime.now()))
548 def sendData(self, method, data):
549 """Write a request to the transport and save the stats.
551 @type method: C{string}
552 @param method: the name of the method to call on the remote node
553 @type data: C{string}
554 @param data: the message to send to the remote node
556 self.transport.write(data, self.addr)
557 self.stats.sentAction(method)
558 self.stats.sentBytes(len(data))
560 def timeOut(self, badTID, method):
561 """Call the deferred's errback if a timeout occurs.
563 @param badTID: the transaction ID of the request
564 @type method: C{string}
565 @param method: the name of the method that timed out on the remote node
567 if badTID in self.tids:
568 req = self.tids[badTID]
569 del(self.tids[badTID])
570 req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" %
571 (method, self.addr)))
573 log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
576 """Cancel all pending requests."""
577 for req in self.tids.values():
578 req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
579 'connection has been stopped while waiting for response'))
583 #{ For testing the KRPC protocol
584 def connectionForAddr(host, port):
587 class Receiver(protocol.Factory):
591 def krpc_store(self, msg, _krpc_sender):
594 def krpc_echo(self, msg, _krpc_sender):
596 def krpc_values(self, length, num, _krpc_sender):
597 return {'values': ['1'*length]*num}
600 from stats import StatsLogger
602 a = hostbroker(af, StatsLogger(None, None),
603 {'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
605 p = reactor.listenUDP(port, a)
608 class KRPCTests(unittest.TestCase):
612 self.af, self.a, self.ap = make(1180)
613 self.bf, self.b, self.bp = make(1181)
616 self.ap.stopListening()
617 self.bp.stopListening()
619 def bufEquals(self, result, value):
620 self.failUnlessEqual(self.bf.buf, value)
622 def testSimpleMessage(self):
623 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
624 d.addCallback(self.bufEquals, ["This is a test."])
627 def testMessageBlast(self):
629 d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
630 d.addCallback(self.bufEquals, ["This is a test."] * 100)
634 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
635 df.addCallback(self.gotMsg, "This is a test.")
638 def gotMsg(self, dict, should_be):
639 _krpc_sender = dict['_krpc_sender']
640 self.failUnlessEqual(dict['msg'], should_be)
642 def testManyEcho(self):
643 for i in xrange(100):
644 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
645 df.addCallback(self.gotMsg, "This is a test.")
648 def testMultiEcho(self):
649 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
650 df.addCallback(self.gotMsg, "This is a test.")
652 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
653 df.addCallback(self.gotMsg, "This is another test.")
655 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
656 df.addCallback(self.gotMsg, "This is yet another test.")
660 def testEchoReset(self):
661 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
662 df.addCallback(self.gotMsg, "This is a test.")
664 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
665 df.addCallback(self.gotMsg, "This is another test.")
666 df.addCallback(self.echoReset)
669 def echoReset(self, dict):
670 del(self.a.connections[('127.0.0.1', 1181)])
671 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
672 df.addCallback(self.gotMsg, "This is yet another test.")
675 def testUnknownMeth(self):
676 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
677 df = self.failUnlessFailure(df, KrpcError)
678 df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
681 def testMalformedRequest(self):
682 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
683 df = self.failUnlessFailure(df, KrpcError)
684 df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
687 def gotErr(self, value, should_be, *errorTypes):
688 self.failUnlessEqual(value[0], should_be)
690 self.flushLoggedErrors(*errorTypes)
692 def testLongPackets(self):
693 df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
694 df.addCallback(self.gotLongRsp)
697 def gotLongRsp(self, dict):
698 # Not quite accurate, but good enough
699 self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)