]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p_Khashmir/krpc.py
WIP on sending multiple KRPC requests before timeout.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The KRPC communication protocol implementation.
5
6 @var KRPC_TIMEOUT: the number of seconds after which requests timeout
7 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
8     UDP packet without fragmentation
9
10 @var KRPC_ERROR: the code for a generic error
11 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
12 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
13 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
14 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
15 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
16 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
17
18 @var KRPC_ERROR_INTERNAL: the code for an internal error
19 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
20 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
21 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
22
23 @var TID: the identifier for the transaction ID
24 @var REQ: the identifier for a request packet
25 @var RSP: the identifier for a response packet
26 @var TYP: the identifier for the type of packet
27 @var ARG: the identifier for the argument to the request
28 @var ERR: the identifier for an error packet
29
30 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
31     KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
32     KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
33     KRPC_ERROR_RESPONSE_TOO_LONG
34 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
35     KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
36 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
37
38 """
39
40 from bencode import bencode, bdecode
41 from time import asctime
42 from math import ceil
43
44 from twisted.internet.defer import Deferred
45 from twisted.internet import protocol, reactor
46 from twisted.python import log
47 from twisted.trial import unittest
48
49 from khash import newID
50
51 KRPC_TIMEOUT = 20
52 UDP_PACKET_LIMIT = 1472
53
54 # Remote node errors
55 KRPC_ERROR = 200
56 KRPC_ERROR_SERVER_ERROR = 201
57 KRPC_ERROR_MALFORMED_PACKET = 202
58 KRPC_ERROR_METHOD_UNKNOWN = 203
59 KRPC_ERROR_MALFORMED_REQUEST = 204
60 KRPC_ERROR_INVALID_TOKEN = 205
61 KRPC_ERROR_RESPONSE_TOO_LONG = 206
62
63 # Local errors
64 KRPC_ERROR_INTERNAL = 100
65 KRPC_ERROR_RECEIVED_UNKNOWN = 101
66 KRPC_ERROR_TIMEOUT = 102
67 KRPC_ERROR_PROTOCOL_STOPPED = 103
68
69 # commands
70 TID = 't'
71 REQ = 'q'
72 RSP = 'r'
73 TYP = 'y'
74 ARG = 'a'
75 ERR = 'e'
76
77 class KrpcError(Exception):
78     """An error occurred in the KRPC protocol."""
79     pass
80
81 def verifyMessage(msg):
82     """Check received message for corruption and errors.
83     
84     @type msg: C{dictionary}
85     @param msg: the dictionary of information received on the connection
86     @raise KrpcError: if the message is corrupt
87     """
88     
89     if type(msg) != dict:
90         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
91     if TYP not in msg:
92         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
93     if msg[TYP] == REQ:
94         if REQ not in msg:
95             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
96         if type(msg[REQ]) != str:
97             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
98         if ARG not in msg:
99             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
100         if type(msg[ARG]) != dict:
101             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
102     elif msg[TYP] == RSP:
103         if RSP not in msg:
104             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
105         if type(msg[RSP]) != dict:
106             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
107     elif msg[TYP] == ERR:
108         if ERR not in msg:
109             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
110         if type(msg[ERR]) != list:
111             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
112         if len(msg[ERR]) != 2:
113             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
114         if type(msg[ERR][0]) not in (int, long):
115             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
116         if type(msg[ERR][1]) != str:
117             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
118 #    else:
119 #        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
120     if TID not in msg:
121         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
122     if type(msg[TID]) != str:
123         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
124
125 class hostbroker(protocol.DatagramProtocol):
126     """The factory for the KRPC protocol.
127     
128     @type server: L{khashmir.Khashmir}
129     @ivar server: the main Khashmir program
130     @type stats: L{stats.StatsLogger}
131     @ivar stats: the statistics logger to save transport info
132     @type config: C{dictionary}
133     @ivar config: the configuration parameters for the DHT
134     @type connections: C{dictionary}
135     @ivar connections: all the connections that have ever been made to the
136         protocol, keys are IP address and port pairs, values are L{KRPC}
137         protocols for the addresses
138     @ivar protocol: the protocol to use to handle incoming connections
139         (added externally)
140     @type addr: (C{string}, C{int})
141     @ivar addr: the IP address and port of this node
142     """
143     
144     def __init__(self, server, stats, config):
145         """Initialize the factory.
146         
147         @type server: L{khashmir.Khashmir}
148         @param server: the main DHT program
149         @type stats: L{stats.StatsLogger}
150         @param stats: the statistics logger to save transport info
151         @type config: C{dictionary}
152         @param config: the configuration parameters for the DHT
153         """
154         self.server = server
155         self.stats = stats
156         self.config = config
157         # this should be changed to storage that drops old entries
158         self.connections = {}
159         
160     def datagramReceived(self, datagram, addr):
161         """Optionally create a new protocol object, and handle the new datagram.
162         
163         @type datagram: C{string}
164         @param datagram: the data received from the transport.
165         @type addr: (C{string}, C{int})
166         @param addr: source IP address and port of datagram.
167         """
168         c = self.connectionForAddr(addr)
169         c.datagramReceived(datagram, addr)
170         #if c.idle():
171         #    del self.connections[addr]
172
173     def connectionForAddr(self, addr):
174         """Get a protocol object for the source.
175         
176         @type addr: (C{string}, C{int})
177         @param addr: source IP address and port of datagram.
178         """
179         # Don't connect to ourself
180         if addr == self.addr:
181             raise KrcpError
182         
183         # Create a new protocol object if necessary
184         if not self.connections.has_key(addr):
185             conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
186             self.connections[addr] = conn
187         else:
188             conn = self.connections[addr]
189         return conn
190
191     def makeConnection(self, transport):
192         """Make a connection to a transport and save our address."""
193         protocol.DatagramProtocol.makeConnection(self, transport)
194         tup = transport.getHost()
195         self.addr = (tup.host, tup.port)
196         
197     def stopProtocol(self):
198         """Stop all the open connections."""
199         for conn in self.connections.values():
200             conn.stop()
201         protocol.DatagramProtocol.stopProtocol(self)
202
203 class KrpcRequest(Deferred):
204     """An outstanding request to a remote node.
205     
206     
207     """
208     
209     def __init__(self, protocol, newTID, method, data, initDelay):
210         Deferred.__init__(self)
211         self.protocol = protocol
212         self.tid = newTID
213         self.method = method
214         self.data = data
215         self.delay = initDelay
216         self.later = None
217         
218     def send(self):
219         self.later = reactor.callLater(self.delay, self.timeOut)
220         self.delay *= 2
221         self.protocol._sendData(self.method, self.data)
222
223     def timeOut(self):
224         """Call the deferred's errback if a timeout occurs."""
225         self.protocol._timeOut(self.tid, self.method)
226         
227     def callback(self, resp):
228         self.dropTimeOut()
229         Deferred.callback(self, resp)
230         
231     def errback(self, resp):
232         self.dropTimeOut()
233         Deferred.errback(self, resp)
234         
235     def dropTimeOut(self):
236         """Cancel the timeout call when a response is received."""
237         if self.later and self.later.active():
238             self.later.cancel()
239
240 class KRPC:
241     """The KRPC protocol implementation.
242     
243     @ivar transport: the transport to use for the protocol
244     @type factory: L{khashmir.Khashmir}
245     @ivar factory: the main Khashmir program
246     @type stats: L{stats.StatsLogger}
247     @ivar stats: the statistics logger to save transport info
248     @type addr: (C{string}, C{int})
249     @ivar addr: the IP address and port of the source node
250     @type noisy: C{boolean}
251     @ivar noisy: whether to log additional details of the protocol
252     @type tids: C{dictionary}
253     @ivar tids: the transaction IDs outstanding for requests, keys are the
254         transaction ID of the request, values are the deferreds to call with
255         the results
256     @type stopped: C{boolean}
257     @ivar stopped: whether the protocol has been stopped
258     """
259     
260     def __init__(self, addr, server, stats, transport, spew = False):
261         """Initialize the protocol.
262         
263         @type addr: (C{string}, C{int})
264         @param addr: the IP address and port of the source node
265         @type server: L{khashmir.Khashmir}
266         @param server: the main Khashmir program
267         @type stats: L{stats.StatsLogger}
268         @param stats: the statistics logger to save transport info
269         @param transport: the transport to use for the protocol
270         @type spew: C{boolean}
271         @param spew: whether to log additional details of the protocol
272             (optional, defaults to False)
273         """
274         self.transport = transport
275         self.factory = server
276         self.stats = stats
277         self.addr = addr
278         self.noisy = spew
279         self.tids = {}
280         self.stopped = False
281
282     def datagramReceived(self, data, addr):
283         """Process the new datagram.
284         
285         @type data: C{string}
286         @param data: the data received from the transport.
287         @type addr: (C{string}, C{int})
288         @param addr: source IP address and port of datagram.
289         """
290         self.stats.receivedBytes(len(data))
291         if self.stopped:
292             if self.noisy:
293                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
294
295         # Bdecode the message
296         try:
297             msg = bdecode(data)
298         except Exception, e:
299             if self.noisy:
300                 log.msg("krpc bdecode error: ")
301                 log.err(e)
302             return
303
304         # Make sure the remote node isn't trying anything funny
305         try:
306             verifyMessage(msg)
307         except Exception, e:
308             log.msg("krpc message verification error: ")
309             log.err(e)
310             return
311
312         if self.noisy:
313             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
314
315         # Process it based on its type
316         if msg[TYP]  == REQ:
317             ilen = len(data)
318             
319             # Requests are handled by the factory
320             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
321             msg[ARG]['_krpc_sender'] =  self.addr
322             if f and callable(f):
323                 self.stats.receivedAction(msg[REQ])
324                 try:
325                     ret = f(*(), **msg[ARG])
326                 except KrpcError, e:
327                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
328                     log.err(e)
329                     self.stats.errorAction(msg[REQ])
330                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
331                                               [e[0], e[1]])
332                 except TypeError, e:
333                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
334                     log.err(e)
335                     self.stats.errorAction(msg[REQ])
336                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
337                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
338                 except Exception, e:
339                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
340                     log.err(e)
341                     self.stats.errorAction(msg[REQ])
342                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
343                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
344                 else:
345                     olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
346             else:
347                 # Request for unknown method
348                 log.msg("ERROR: don't know about method %s" % msg[REQ])
349                 self.stats.receivedAction('unknown')
350                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
351                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
352             if self.noisy:
353                 log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
354                                                   ilen, msg[REQ], olen))
355         elif msg[TYP] == RSP:
356             # Responses get processed by their TID's deferred
357             if self.tids.has_key(msg[TID]):
358                 req = self.tids[msg[TID]]
359                 #       callback
360                 del(self.tids[msg[TID]])
361                 msg[RSP]['_krpc_sender'] = addr
362                 req.callback(msg[RSP])
363             else:
364                 # no tid, this transaction timed out already...
365                 if self.noisy:
366                     log.msg('timeout: %r' % msg[RSP]['id'])
367         elif msg[TYP] == ERR:
368             # Errors get processed by their TID's deferred's errback
369             if self.tids.has_key(msg[TID]):
370                 req = self.tids[msg[TID]]
371                 del(self.tids[msg[TID]])
372                 # callback
373                 req.errback(KrpcError(*msg[ERR]))
374             else:
375                 # day late and dollar short, just log it
376                 log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
377                 pass
378         else:
379             # Received an unknown message type
380             if self.noisy:
381                 log.msg("unknown message type: %r" % msg)
382             if msg[TID] in self.tids:
383                 req = self.tids[msg[TID]]
384                 del(self.tids[msg[TID]])
385                 # callback
386                 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
387                                       "Received an unknown message type: %r" % msg[TYP]))
388                 
389     def _sendResponse(self, request, addr, tid, msgType, response):
390         """Helper function for sending responses to nodes.
391
392         @param request: the name of the requested method
393         @type addr: (C{string}, C{int})
394         @param addr: source IP address and port of datagram.
395         @param tid: the transaction ID of the request
396         @param msgType: the type of message to respond with
397         @param response: the arguments for the response
398         """
399         if not response:
400             response = {}
401         
402         try:
403             # Create the response message
404             msg = {TID : tid, TYP : msgType, msgType : response}
405     
406             if self.noisy:
407                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
408     
409             out = bencode(msg)
410             
411             # Make sure its not too long
412             if len(out) > UDP_PACKET_LIMIT:
413                 # Can we remove some values to shorten it?
414                 if 'values' in response:
415                     # Save the original list of values
416                     orig_values = response['values']
417                     len_orig_values = len(bencode(orig_values))
418                     
419                     # Caclulate the maximum value length possible
420                     max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
421                     assert max_len_values > 0
422                     
423                     # Start with a calculation of how many values should be included
424                     # (assumes all values are the same length)
425                     per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
426                     num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
427     
428                     # Do a linear search for the actual maximum number possible
429                     bencoded_values = len(bencode(orig_values[:num_values]))
430                     while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
431                         bencoded_values += len(bencode(orig_values[num_values]))
432                         num_values += 1
433                     while bencoded_values > max_len_values and num_values > 0:
434                         num_values -= 1
435                         bencoded_values -= len(bencode(orig_values[num_values]))
436                     assert num_values > 0
437     
438                     # Encode the result
439                     response['values'] = orig_values[:num_values]
440                     out = bencode(msg)
441                     assert len(out) < UDP_PACKET_LIMIT
442                     log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
443                             (len(orig_values), num_values, len(out)))
444                 else:
445                     # Too long a response, send an error
446                     log.msg('Could not send response, too long: %d bytes' % len(out))
447                     self.stats.errorAction(request)
448                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
449                     out = bencode(msg)
450
451         except Exception, e:
452             # Unknown error, send an error message
453             self.stats.errorAction(request)
454             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
455             out = bencode(msg)
456                     
457         self.stats.sentBytes(len(out))
458         self.transport.write(out, addr)
459         return len(out)
460     
461     def sendRequest(self, method, args):
462         """Send a request to the remote node.
463         
464         @type method: C{string}
465         @param method: the method name to call on the remote node
466         @param args: the arguments to send to the remote node's method
467         """
468         if self.stopped:
469             raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
470
471         # Create the request message
472         newTID = newID()
473         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
474         if self.noisy:
475             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
476         data = bencode(msg)
477         
478         # Create the deferred and save it with the TID
479         req = KrpcRequest(self, newTID, method, data, 10)
480         self.tids[newTID] = req
481         
482         # Save the conclusion of the action
483         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
484                          callbackArgs = (method, ), errbackArgs = (method, ))
485
486         req.send()
487         return req
488     
489     def _sendData(self, method, data):
490         # Save some stats
491         self.stats.sentAction(method)
492         self.stats.sentBytes(len(data))
493         
494         self.transport.write(data, self.addr)
495         
496     def _timeOut(self, badTID, method):
497         """Call the deferred's errback if a timeout occurs."""
498         if badTID in self.tids:
499             req = self.tids[badTID]
500             del(self.tids[badTID])
501             self.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
502                                                         (method, self.addr)))
503         
504     def stop(self):
505         """Timeout all pending requests."""
506         for req in self.tids.values():
507             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
508                                   'connection has been stopped while waiting for response'))
509         self.tids = {}
510         self.stopped = True
511
512 #{ For testing the KRPC protocol
513 def connectionForAddr(host, port):
514     return host
515     
516 class Receiver(protocol.Factory):
517     protocol = KRPC
518     def __init__(self):
519         self.buf = []
520     def krpc_store(self, msg, _krpc_sender):
521         self.buf += [msg]
522         return {}
523     def krpc_echo(self, msg, _krpc_sender):
524         return {'msg': msg}
525     def krpc_values(self, length, num, _krpc_sender):
526         return {'values': ['1'*length]*num}
527
528 def make(port):
529     from stats import StatsLogger
530     af = Receiver()
531     a = hostbroker(af, StatsLogger(None, None, {}), {'SPEW': False})
532     a.protocol = KRPC
533     p = reactor.listenUDP(port, a)
534     return af, a, p
535     
536 class KRPCTests(unittest.TestCase):
537     timeout = 2
538     
539     def setUp(self):
540         self.af, self.a, self.ap = make(1180)
541         self.bf, self.b, self.bp = make(1181)
542
543     def tearDown(self):
544         self.ap.stopListening()
545         self.bp.stopListening()
546
547     def bufEquals(self, result, value):
548         self.failUnlessEqual(self.bf.buf, value)
549
550     def testSimpleMessage(self):
551         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
552         d.addCallback(self.bufEquals, ["This is a test."])
553         return d
554
555     def testMessageBlast(self):
556         for i in range(100):
557             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
558         d.addCallback(self.bufEquals, ["This is a test."] * 100)
559         return d
560
561     def testEcho(self):
562         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
563         df.addCallback(self.gotMsg, "This is a test.")
564         return df
565
566     def gotMsg(self, dict, should_be):
567         _krpc_sender = dict['_krpc_sender']
568         self.failUnlessEqual(dict['msg'], should_be)
569
570     def testManyEcho(self):
571         for i in xrange(100):
572             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
573             df.addCallback(self.gotMsg, "This is a test.")
574         return df
575
576     def testMultiEcho(self):
577         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
578         df.addCallback(self.gotMsg, "This is a test.")
579
580         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
581         df.addCallback(self.gotMsg, "This is another test.")
582
583         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
584         df.addCallback(self.gotMsg, "This is yet another test.")
585         
586         return df
587
588     def testEchoReset(self):
589         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
590         df.addCallback(self.gotMsg, "This is a test.")
591
592         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
593         df.addCallback(self.gotMsg, "This is another test.")
594         df.addCallback(self.echoReset)
595         return df
596     
597     def echoReset(self, dict):
598         del(self.a.connections[('127.0.0.1', 1181)])
599         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
600         df.addCallback(self.gotMsg, "This is yet another test.")
601         return df
602
603     def testUnknownMeth(self):
604         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
605         df = self.failUnlessFailure(df, KrpcError)
606         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
607         return df
608
609     def testMalformedRequest(self):
610         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
611         df = self.failUnlessFailure(df, KrpcError)
612         df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
613         return df
614
615     def gotErr(self, value, should_be, *errorTypes):
616         self.failUnlessEqual(value[0], should_be)
617         if errorTypes:
618             self.flushLoggedErrors(*errorTypes)
619         
620     def testLongPackets(self):
621         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
622         df.addCallback(self.gotLongRsp)
623         return df
624
625     def gotLongRsp(self, dict):
626         # Not quite accurate, but good enough
627         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
628