Krpc now works as before but with a new request infrastructure.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The KRPC communication protocol implementation.
5
6 @var KRPC_TIMEOUT: the number of seconds after which requests timeout
7 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
8     UDP packet without fragmentation
9
10 @var KRPC_ERROR: the code for a generic error
11 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
12 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
13 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
14 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
15 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
16 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
17
18 @var KRPC_ERROR_INTERNAL: the code for an internal error
19 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
20 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
21 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
22
23 @var TID: the identifier for the transaction ID
24 @var REQ: the identifier for a request packet
25 @var RSP: the identifier for a response packet
26 @var TYP: the identifier for the type of packet
27 @var ARG: the identifier for the argument to the request
28 @var ERR: the identifier for an error packet
29
30 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
31     KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
32     KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
33     KRPC_ERROR_RESPONSE_TOO_LONG
34 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
35     KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
36 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
37
38 """
39
40 from bencode import bencode, bdecode
41 from time import asctime
42 from math import ceil
43
44 from twisted.internet.defer import Deferred
45 from twisted.internet import protocol, reactor
46 from twisted.python import log
47 from twisted.trial import unittest
48
49 from khash import newID
50
51 KRPC_TIMEOUT = 20
52 UDP_PACKET_LIMIT = 1472
53
54 # Remote node errors
55 KRPC_ERROR = 200
56 KRPC_ERROR_SERVER_ERROR = 201
57 KRPC_ERROR_MALFORMED_PACKET = 202
58 KRPC_ERROR_METHOD_UNKNOWN = 203
59 KRPC_ERROR_MALFORMED_REQUEST = 204
60 KRPC_ERROR_INVALID_TOKEN = 205
61 KRPC_ERROR_RESPONSE_TOO_LONG = 206
62
63 # Local errors
64 KRPC_ERROR_INTERNAL = 100
65 KRPC_ERROR_RECEIVED_UNKNOWN = 101
66 KRPC_ERROR_TIMEOUT = 102
67 KRPC_ERROR_PROTOCOL_STOPPED = 103
68
69 # commands
70 TID = 't'
71 REQ = 'q'
72 RSP = 'r'
73 TYP = 'y'
74 ARG = 'a'
75 ERR = 'e'
76
77 class KrpcError(Exception):
78     """An error occurred in the KRPC protocol."""
79     pass
80
81 def verifyMessage(msg):
82     """Check received message for corruption and errors.
83     
84     @type msg: C{dictionary}
85     @param msg: the dictionary of information received on the connection
86     @raise KrpcError: if the message is corrupt
87     """
88     
89     if type(msg) != dict:
90         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
91     if TYP not in msg:
92         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
93     if msg[TYP] == REQ:
94         if REQ not in msg:
95             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
96         if type(msg[REQ]) != str:
97             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
98         if ARG not in msg:
99             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
100         if type(msg[ARG]) != dict:
101             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
102     elif msg[TYP] == RSP:
103         if RSP not in msg:
104             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
105         if type(msg[RSP]) != dict:
106             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
107     elif msg[TYP] == ERR:
108         if ERR not in msg:
109             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
110         if type(msg[ERR]) != list:
111             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
112         if len(msg[ERR]) != 2:
113             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
114         if type(msg[ERR][0]) not in (int, long):
115             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
116         if type(msg[ERR][1]) != str:
117             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
118 #    else:
119 #        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
120     if TID not in msg:
121         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
122     if type(msg[TID]) != str:
123         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
124
125 class hostbroker(protocol.DatagramProtocol):
126     """The factory for the KRPC protocol.
127     
128     @type server: L{khashmir.Khashmir}
129     @ivar server: the main Khashmir program
130     @type stats: L{stats.StatsLogger}
131     @ivar stats: the statistics logger to save transport info
132     @type config: C{dictionary}
133     @ivar config: the configuration parameters for the DHT
134     @type connections: C{dictionary}
135     @ivar connections: all the connections that have ever been made to the
136         protocol, keys are IP address and port pairs, values are L{KRPC}
137         protocols for the addresses
138     @ivar protocol: the protocol to use to handle incoming connections
139         (added externally)
140     @type addr: (C{string}, C{int})
141     @ivar addr: the IP address and port of this node
142     """
143     
144     def __init__(self, server, stats, config):
145         """Initialize the factory.
146         
147         @type server: L{khashmir.Khashmir}
148         @param server: the main DHT program
149         @type stats: L{stats.StatsLogger}
150         @param stats: the statistics logger to save transport info
151         @type config: C{dictionary}
152         @param config: the configuration parameters for the DHT
153         """
154         self.server = server
155         self.stats = stats
156         self.config = config
157         # this should be changed to storage that drops old entries
158         self.connections = {}
159         
160     def datagramReceived(self, datagram, addr):
161         """Optionally create a new protocol object, and handle the new datagram.
162         
163         @type datagram: C{string}
164         @param datagram: the data received from the transport.
165         @type addr: (C{string}, C{int})
166         @param addr: source IP address and port of datagram.
167         """
168         c = self.connectionForAddr(addr)
169         c.datagramReceived(datagram, addr)
170         #if c.idle():
171         #    del self.connections[addr]
172
173     def connectionForAddr(self, addr):
174         """Get a protocol object for the source.
175         
176         @type addr: (C{string}, C{int})
177         @param addr: source IP address and port of datagram.
178         """
179         # Don't connect to ourself
180         if addr == self.addr:
181             raise KrcpError
182         
183         # Create a new protocol object if necessary
184         if not self.connections.has_key(addr):
185             conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
186             self.connections[addr] = conn
187         else:
188             conn = self.connections[addr]
189         return conn
190
191     def makeConnection(self, transport):
192         """Make a connection to a transport and save our address."""
193         protocol.DatagramProtocol.makeConnection(self, transport)
194         tup = transport.getHost()
195         self.addr = (tup.host, tup.port)
196         
197     def stopProtocol(self):
198         """Stop all the open connections."""
199         for conn in self.connections.values():
200             conn.stop()
201         protocol.DatagramProtocol.stopProtocol(self)
202
203 class KrpcRequest(Deferred):
204     """An outstanding request to a remote node.
205     
206     
207     """
208     
209     def __init__(self, protocol, newTID, method, data, initDelay):
210         Deferred.__init__(self)
211         self.protocol = protocol
212         self.tid = newTID
213         self.method = method
214         self.data = data
215         self.delay = initDelay
216         self.later = None
217         
218     def send(self):
219         assert not self.later, 'There is already a pending request'
220         self.later = reactor.callLater(self.delay, self.timeOut)
221         self.delay *= 2
222         self.protocol.sendData(self.method, self.data)
223
224     def timeOut(self):
225         """Call the deferred's errback if a timeout occurs."""
226         self.later = None
227         self.protocol.timeOut(self.tid, self.method)
228         
229     def callback(self, resp):
230         self.dropTimeOut()
231         Deferred.callback(self, resp)
232         
233     def errback(self, resp):
234         self.dropTimeOut()
235         Deferred.errback(self, resp)
236         
237     def dropTimeOut(self):
238         """Cancel the timeout call when a response is received."""
239         if self.later and self.later.active():
240             self.later.cancel()
241             self.later = None
242
243 class KRPC:
244     """The KRPC protocol implementation.
245     
246     @ivar transport: the transport to use for the protocol
247     @type factory: L{khashmir.Khashmir}
248     @ivar factory: the main Khashmir program
249     @type stats: L{stats.StatsLogger}
250     @ivar stats: the statistics logger to save transport info
251     @type addr: (C{string}, C{int})
252     @ivar addr: the IP address and port of the source node
253     @type noisy: C{boolean}
254     @ivar noisy: whether to log additional details of the protocol
255     @type tids: C{dictionary}
256     @ivar tids: the transaction IDs outstanding for requests, keys are the
257         transaction ID of the request, values are the deferreds to call with
258         the results
259     @type stopped: C{boolean}
260     @ivar stopped: whether the protocol has been stopped
261     """
262     
263     def __init__(self, addr, server, stats, transport, spew = False):
264         """Initialize the protocol.
265         
266         @type addr: (C{string}, C{int})
267         @param addr: the IP address and port of the source node
268         @type server: L{khashmir.Khashmir}
269         @param server: the main Khashmir program
270         @type stats: L{stats.StatsLogger}
271         @param stats: the statistics logger to save transport info
272         @param transport: the transport to use for the protocol
273         @type spew: C{boolean}
274         @param spew: whether to log additional details of the protocol
275             (optional, defaults to False)
276         """
277         self.transport = transport
278         self.factory = server
279         self.stats = stats
280         self.addr = addr
281         self.noisy = spew
282         self.tids = {}
283         self.stopped = False
284
285     def datagramReceived(self, data, addr):
286         """Process the new datagram.
287         
288         @type data: C{string}
289         @param data: the data received from the transport.
290         @type addr: (C{string}, C{int})
291         @param addr: source IP address and port of datagram.
292         """
293         self.stats.receivedBytes(len(data))
294         if self.stopped:
295             if self.noisy:
296                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
297
298         # Bdecode the message
299         try:
300             msg = bdecode(data)
301         except Exception, e:
302             if self.noisy:
303                 log.msg("krpc bdecode error: ")
304                 log.err(e)
305             return
306
307         # Make sure the remote node isn't trying anything funny
308         try:
309             verifyMessage(msg)
310         except Exception, e:
311             log.msg("krpc message verification error: ")
312             log.err(e)
313             return
314
315         if self.noisy:
316             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
317
318         # Process it based on its type
319         if msg[TYP]  == REQ:
320             ilen = len(data)
321             
322             # Requests are handled by the factory
323             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
324             msg[ARG]['_krpc_sender'] =  self.addr
325             if f and callable(f):
326                 self.stats.receivedAction(msg[REQ])
327                 try:
328                     ret = f(*(), **msg[ARG])
329                 except KrpcError, e:
330                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
331                     log.err(e)
332                     self.stats.errorAction(msg[REQ])
333                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
334                                               [e[0], e[1]])
335                 except TypeError, e:
336                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
337                     log.err(e)
338                     self.stats.errorAction(msg[REQ])
339                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
340                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
341                 except Exception, e:
342                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
343                     log.err(e)
344                     self.stats.errorAction(msg[REQ])
345                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
346                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
347                 else:
348                     olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
349             else:
350                 # Request for unknown method
351                 log.msg("ERROR: don't know about method %s" % msg[REQ])
352                 self.stats.receivedAction('unknown')
353                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
354                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
355             if self.noisy:
356                 log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
357                                                   ilen, msg[REQ], olen))
358         elif msg[TYP] == RSP:
359             # Responses get processed by their TID's deferred
360             if self.tids.has_key(msg[TID]):
361                 req = self.tids[msg[TID]]
362                 #       callback
363                 del(self.tids[msg[TID]])
364                 msg[RSP]['_krpc_sender'] = addr
365                 req.callback(msg[RSP])
366             else:
367                 # no tid, this transaction timed out already...
368                 if self.noisy:
369                     log.msg('timeout: %r' % msg[RSP]['id'])
370         elif msg[TYP] == ERR:
371             # Errors get processed by their TID's deferred's errback
372             if self.tids.has_key(msg[TID]):
373                 req = self.tids[msg[TID]]
374                 del(self.tids[msg[TID]])
375                 # callback
376                 req.errback(KrpcError(*msg[ERR]))
377             else:
378                 # day late and dollar short, just log it
379                 log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
380                 pass
381         else:
382             # Received an unknown message type
383             if self.noisy:
384                 log.msg("unknown message type: %r" % msg)
385             if msg[TID] in self.tids:
386                 req = self.tids[msg[TID]]
387                 del(self.tids[msg[TID]])
388                 # callback
389                 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
390                                       "Received an unknown message type: %r" % msg[TYP]))
391                 
392     def _sendResponse(self, request, addr, tid, msgType, response):
393         """Helper function for sending responses to nodes.
394
395         @param request: the name of the requested method
396         @type addr: (C{string}, C{int})
397         @param addr: source IP address and port of datagram.
398         @param tid: the transaction ID of the request
399         @param msgType: the type of message to respond with
400         @param response: the arguments for the response
401         """
402         if not response:
403             response = {}
404         
405         try:
406             # Create the response message
407             msg = {TID : tid, TYP : msgType, msgType : response}
408     
409             if self.noisy:
410                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
411     
412             out = bencode(msg)
413             
414             # Make sure its not too long
415             if len(out) > UDP_PACKET_LIMIT:
416                 # Can we remove some values to shorten it?
417                 if 'values' in response:
418                     # Save the original list of values
419                     orig_values = response['values']
420                     len_orig_values = len(bencode(orig_values))
421                     
422                     # Caclulate the maximum value length possible
423                     max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
424                     assert max_len_values > 0
425                     
426                     # Start with a calculation of how many values should be included
427                     # (assumes all values are the same length)
428                     per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
429                     num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
430     
431                     # Do a linear search for the actual maximum number possible
432                     bencoded_values = len(bencode(orig_values[:num_values]))
433                     while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
434                         bencoded_values += len(bencode(orig_values[num_values]))
435                         num_values += 1
436                     while bencoded_values > max_len_values and num_values > 0:
437                         num_values -= 1
438                         bencoded_values -= len(bencode(orig_values[num_values]))
439                     assert num_values > 0
440     
441                     # Encode the result
442                     response['values'] = orig_values[:num_values]
443                     out = bencode(msg)
444                     assert len(out) < UDP_PACKET_LIMIT
445                     log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
446                             (len(orig_values), num_values, len(out)))
447                 else:
448                     # Too long a response, send an error
449                     log.msg('Could not send response, too long: %d bytes' % len(out))
450                     self.stats.errorAction(request)
451                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
452                     out = bencode(msg)
453
454         except Exception, e:
455             # Unknown error, send an error message
456             self.stats.errorAction(request)
457             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
458             out = bencode(msg)
459                     
460         self.stats.sentBytes(len(out))
461         self.transport.write(out, addr)
462         return len(out)
463     
464     def sendRequest(self, method, args):
465         """Send a request to the remote node.
466         
467         @type method: C{string}
468         @param method: the method name to call on the remote node
469         @param args: the arguments to send to the remote node's method
470         """
471         if self.stopped:
472             raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
473
474         # Create the request message
475         newTID = newID()
476         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
477         if self.noisy:
478             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
479         data = bencode(msg)
480         
481         # Create the deferred and save it with the TID
482         req = KrpcRequest(self, newTID, method, data, 10)
483         self.tids[newTID] = req
484         
485         # Save the conclusion of the action
486         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
487                          callbackArgs = (method, ), errbackArgs = (method, ))
488
489         req.send()
490         return req
491     
492     def sendData(self, method, data):
493         # Save some stats
494         self.stats.sentAction(method)
495         self.stats.sentBytes(len(data))
496         
497         self.transport.write(data, self.addr)
498         
499     def timeOut(self, badTID, method):
500         """Call the deferred's errback if a timeout occurs."""
501         if badTID in self.tids:
502             req = self.tids[badTID]
503             del(self.tids[badTID])
504             req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
505                                                       (method, self.addr)))
506         else:
507             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
508         
509     def stop(self):
510         """Timeout all pending requests."""
511         for req in self.tids.values():
512             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
513                                   'connection has been stopped while waiting for response'))
514         self.tids = {}
515         self.stopped = True
516
517 #{ For testing the KRPC protocol
518 def connectionForAddr(host, port):
519     return host
520     
521 class Receiver(protocol.Factory):
522     protocol = KRPC
523     def __init__(self):
524         self.buf = []
525     def krpc_store(self, msg, _krpc_sender):
526         self.buf += [msg]
527         return {}
528     def krpc_echo(self, msg, _krpc_sender):
529         return {'msg': msg}
530     def krpc_values(self, length, num, _krpc_sender):
531         return {'values': ['1'*length]*num}
532
533 def make(port):
534     from stats import StatsLogger
535     af = Receiver()
536     a = hostbroker(af, StatsLogger(None, None, {}), {'SPEW': False})
537     a.protocol = KRPC
538     p = reactor.listenUDP(port, a)
539     return af, a, p
540     
541 class KRPCTests(unittest.TestCase):
542     timeout = 2
543     
544     def setUp(self):
545         self.af, self.a, self.ap = make(1180)
546         self.bf, self.b, self.bp = make(1181)
547
548     def tearDown(self):
549         self.ap.stopListening()
550         self.bp.stopListening()
551
552     def bufEquals(self, result, value):
553         self.failUnlessEqual(self.bf.buf, value)
554
555     def testSimpleMessage(self):
556         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
557         d.addCallback(self.bufEquals, ["This is a test."])
558         return d
559
560     def testMessageBlast(self):
561         for i in range(100):
562             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
563         d.addCallback(self.bufEquals, ["This is a test."] * 100)
564         return d
565
566     def testEcho(self):
567         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
568         df.addCallback(self.gotMsg, "This is a test.")
569         return df
570
571     def gotMsg(self, dict, should_be):
572         _krpc_sender = dict['_krpc_sender']
573         self.failUnlessEqual(dict['msg'], should_be)
574
575     def testManyEcho(self):
576         for i in xrange(100):
577             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
578             df.addCallback(self.gotMsg, "This is a test.")
579         return df
580
581     def testMultiEcho(self):
582         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
583         df.addCallback(self.gotMsg, "This is a test.")
584
585         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
586         df.addCallback(self.gotMsg, "This is another test.")
587
588         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
589         df.addCallback(self.gotMsg, "This is yet another test.")
590         
591         return df
592
593     def testEchoReset(self):
594         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
595         df.addCallback(self.gotMsg, "This is a test.")
596
597         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
598         df.addCallback(self.gotMsg, "This is another test.")
599         df.addCallback(self.echoReset)
600         return df
601     
602     def echoReset(self, dict):
603         del(self.a.connections[('127.0.0.1', 1181)])
604         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
605         df.addCallback(self.gotMsg, "This is yet another test.")
606         return df
607
608     def testUnknownMeth(self):
609         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
610         df = self.failUnlessFailure(df, KrpcError)
611         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
612         return df
613
614     def testMalformedRequest(self):
615         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
616         df = self.failUnlessFailure(df, KrpcError)
617         df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
618         return df
619
620     def gotErr(self, value, should_be, *errorTypes):
621         self.failUnlessEqual(value[0], should_be)
622         if errorTypes:
623             self.flushLoggedErrors(*errorTypes)
624         
625     def testLongPackets(self):
626         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
627         df.addCallback(self.gotLongRsp)
628         return df
629
630     def gotLongRsp(self, dict):
631         # Not quite accurate, but good enough
632         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
633