Retransmit DHT requests before timeout occurs is complete.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The KRPC communication protocol implementation.
5
6 @var KRPC_INITIAL_DELAY: the number of seconds after which to try resending
7     the request, the resends will wait twice as long each time
8 @var KRPC_TIMEOUT: the number of seconds after which requests timeout
9 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
10     UDP packet without fragmentation
11
12 @var KRPC_ERROR: the code for a generic error
13 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
14 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
15 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
16 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
17 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
18 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
19
20 @var KRPC_ERROR_INTERNAL: the code for an internal error
21 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
22 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
23 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
24
25 @var TID: the identifier for the transaction ID
26 @var REQ: the identifier for a request packet
27 @var RSP: the identifier for a response packet
28 @var TYP: the identifier for the type of packet
29 @var ARG: the identifier for the argument to the request
30 @var ERR: the identifier for an error packet
31
32 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
33     KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
34     KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
35     KRPC_ERROR_RESPONSE_TOO_LONG
36 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
37     KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
38 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
39
40 """
41
42 from bencode import bencode, bdecode
43 from datetime import datetime, timedelta
44 from math import ceil
45
46 from twisted.internet import defer
47 from twisted.internet import protocol, reactor
48 from twisted.python import log
49 from twisted.trial import unittest
50
51 from khash import newID
52
53 KRPC_INITIAL_DELAY = 2
54 KRPC_TIMEOUT = 14
55 UDP_PACKET_LIMIT = 1472
56
57 # Remote node errors
58 KRPC_ERROR = 200
59 KRPC_ERROR_SERVER_ERROR = 201
60 KRPC_ERROR_MALFORMED_PACKET = 202
61 KRPC_ERROR_METHOD_UNKNOWN = 203
62 KRPC_ERROR_MALFORMED_REQUEST = 204
63 KRPC_ERROR_INVALID_TOKEN = 205
64 KRPC_ERROR_RESPONSE_TOO_LONG = 206
65
66 # Local errors
67 KRPC_ERROR_INTERNAL = 100
68 KRPC_ERROR_RECEIVED_UNKNOWN = 101
69 KRPC_ERROR_TIMEOUT = 102
70 KRPC_ERROR_PROTOCOL_STOPPED = 103
71
72 # commands
73 TID = 't'
74 REQ = 'q'
75 RSP = 'r'
76 TYP = 'y'
77 ARG = 'a'
78 ERR = 'e'
79
80 class KrpcError(Exception):
81     """An error occurred in the KRPC protocol."""
82     pass
83
84 def verifyMessage(msg):
85     """Check received message for corruption and errors.
86     
87     @type msg: C{dictionary}
88     @param msg: the dictionary of information received on the connection
89     @raise KrpcError: if the message is corrupt
90     """
91     
92     if type(msg) != dict:
93         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
94     if TYP not in msg:
95         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
96     if msg[TYP] == REQ:
97         if REQ not in msg:
98             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
99         if type(msg[REQ]) != str:
100             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
101         if ARG not in msg:
102             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
103         if type(msg[ARG]) != dict:
104             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
105     elif msg[TYP] == RSP:
106         if RSP not in msg:
107             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
108         if type(msg[RSP]) != dict:
109             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
110     elif msg[TYP] == ERR:
111         if ERR not in msg:
112             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
113         if type(msg[ERR]) != list:
114             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
115         if len(msg[ERR]) != 2:
116             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
117         if type(msg[ERR][0]) not in (int, long):
118             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
119         if type(msg[ERR][1]) != str:
120             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
121 #    else:
122 #        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
123     if TID not in msg:
124         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
125     if type(msg[TID]) != str:
126         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
127
128 class hostbroker(protocol.DatagramProtocol):
129     """The factory for the KRPC protocol.
130     
131     @type server: L{khashmir.Khashmir}
132     @ivar server: the main Khashmir program
133     @type stats: L{stats.StatsLogger}
134     @ivar stats: the statistics logger to save transport info
135     @type config: C{dictionary}
136     @ivar config: the configuration parameters for the DHT
137     @type connections: C{dictionary}
138     @ivar connections: all the connections that have ever been made to the
139         protocol, keys are IP address and port pairs, values are L{KRPC}
140         protocols for the addresses
141     @ivar protocol: the protocol to use to handle incoming connections
142         (added externally)
143     @type addr: (C{string}, C{int})
144     @ivar addr: the IP address and port of this node
145     """
146     
147     def __init__(self, server, stats, config):
148         """Initialize the factory.
149         
150         @type server: L{khashmir.Khashmir}
151         @param server: the main DHT program
152         @type stats: L{stats.StatsLogger}
153         @param stats: the statistics logger to save transport info
154         @type config: C{dictionary}
155         @param config: the configuration parameters for the DHT
156         """
157         self.server = server
158         self.stats = stats
159         self.config = config
160         # this should be changed to storage that drops old entries
161         self.connections = {}
162         
163     def datagramReceived(self, datagram, addr):
164         """Optionally create a new protocol object, and handle the new datagram.
165         
166         @type datagram: C{string}
167         @param datagram: the data received from the transport.
168         @type addr: (C{string}, C{int})
169         @param addr: source IP address and port of datagram.
170         """
171         c = self.connectionForAddr(addr)
172         c.datagramReceived(datagram, addr)
173         #if c.idle():
174         #    del self.connections[addr]
175
176     def connectionForAddr(self, addr):
177         """Get a protocol object for the source.
178         
179         @type addr: (C{string}, C{int})
180         @param addr: source IP address and port of datagram.
181         """
182         # Don't connect to ourself
183         if addr == self.addr:
184             raise KrcpError
185         
186         # Create a new protocol object if necessary
187         if not self.connections.has_key(addr):
188             conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
189             self.connections[addr] = conn
190         else:
191             conn = self.connections[addr]
192         return conn
193
194     def makeConnection(self, transport):
195         """Make a connection to a transport and save our address."""
196         protocol.DatagramProtocol.makeConnection(self, transport)
197         tup = transport.getHost()
198         self.addr = (tup.host, tup.port)
199         
200     def stopProtocol(self):
201         """Stop all the open connections."""
202         for conn in self.connections.values():
203             conn.stop()
204         protocol.DatagramProtocol.stopProtocol(self)
205
206 class KrpcRequest(defer.Deferred):
207     """An outstanding request to a remote node.
208     
209     @type protocol: L{KRPC}
210     @ivar protocol: the protocol to send data with
211     @ivar tid: the transaction ID of the request
212     @type method: C{string}
213     @ivar method: the name of the method to call on the remote node
214     @type data: C{string}
215     @ivar data: the message to send to the remote node
216     @type delay: C{int}
217     @ivar delay: the last timeout delay sent
218     @type start: C{datetime}
219     @ivar start: the time to request was started at
220     @type later: L{twisted.internet.interfaces.IDelayedCall}
221     @ivar later: the pending call to timeout the last sent request
222     """
223     
224     def __init__(self, protocol, newTID, method, data):
225         """Initialize the request, and send it out.
226         
227         @type protocol: L{KRPC}
228         @param protocol: the protocol to send data with
229         @param newTID: the transaction ID of the request
230         @type method: C{string}
231         @param method: the name of the method to call on the remote node
232         @type data: C{string}
233         @param data: the message to send to the remote node
234         """
235         defer.Deferred.__init__(self)
236         self.protocol = protocol
237         self.tid = newTID
238         self.method = method
239         self.data = data
240         self.delay = KRPC_INITIAL_DELAY
241         self.start = datetime.now()
242         self.later = None
243         self.send()
244         
245     def send(self):
246         """Send the request to the remote node."""
247         assert not self.later, 'There is already a pending request'
248         self.later = reactor.callLater(self.delay, self.timeOut)
249         self.protocol.sendData(self.method, self.data)
250
251     def timeOut(self):
252         """Check for a unrecoverable timeout, otherwise resend."""
253         self.later = None
254         delay = datetime.now() - self.start
255         if delay > timedelta(seconds = KRPC_TIMEOUT):
256             log.msg('%r timed out after %0.2f sec' %
257                     (self.tid, delay.seconds + delay.microseconds/1000000.0))
258             self.protocol.timeOut(self.tid, self.method)
259         elif self.protocol.stopped:
260             log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
261         else:
262             self.delay *= 2
263             log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
264             self.send()
265         
266     def callback(self, resp):
267         self.dropTimeOut()
268         defer.Deferred.callback(self, resp)
269         
270     def errback(self, resp):
271         self.dropTimeOut()
272         defer.Deferred.errback(self, resp)
273         
274     def dropTimeOut(self):
275         """Cancel the timeout call when a response is received."""
276         if self.later and self.later.active():
277             self.later.cancel()
278         self.later = None
279
280 class KRPC:
281     """The KRPC protocol implementation.
282     
283     @ivar transport: the transport to use for the protocol
284     @type factory: L{khashmir.Khashmir}
285     @ivar factory: the main Khashmir program
286     @type stats: L{stats.StatsLogger}
287     @ivar stats: the statistics logger to save transport info
288     @type addr: (C{string}, C{int})
289     @ivar addr: the IP address and port of the source node
290     @type noisy: C{boolean}
291     @ivar noisy: whether to log additional details of the protocol
292     @type tids: C{dictionary}
293     @ivar tids: the transaction IDs outstanding for requests, keys are the
294         transaction ID of the request, values are the deferreds to call with
295         the results
296     @type stopped: C{boolean}
297     @ivar stopped: whether the protocol has been stopped
298     """
299     
300     def __init__(self, addr, server, stats, transport, spew = False):
301         """Initialize the protocol.
302         
303         @type addr: (C{string}, C{int})
304         @param addr: the IP address and port of the source node
305         @type server: L{khashmir.Khashmir}
306         @param server: the main Khashmir program
307         @type stats: L{stats.StatsLogger}
308         @param stats: the statistics logger to save transport info
309         @param transport: the transport to use for the protocol
310         @type spew: C{boolean}
311         @param spew: whether to log additional details of the protocol
312             (optional, defaults to False)
313         """
314         self.transport = transport
315         self.factory = server
316         self.stats = stats
317         self.addr = addr
318         self.noisy = spew
319         self.tids = {}
320         self.stopped = False
321
322     def datagramReceived(self, data, addr):
323         """Process the new datagram.
324         
325         @type data: C{string}
326         @param data: the data received from the transport.
327         @type addr: (C{string}, C{int})
328         @param addr: source IP address and port of datagram.
329         """
330         self.stats.receivedBytes(len(data))
331         if self.stopped:
332             if self.noisy:
333                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
334
335         # Bdecode the message
336         try:
337             msg = bdecode(data)
338         except Exception, e:
339             if self.noisy:
340                 log.msg("krpc bdecode error: ")
341                 log.err(e)
342             return
343
344         # Make sure the remote node isn't trying anything funny
345         try:
346             verifyMessage(msg)
347         except Exception, e:
348             log.msg("krpc message verification error: ")
349             log.err(e)
350             return
351
352         if self.noisy:
353             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
354
355         # Process it based on its type
356         if msg[TYP]  == REQ:
357             ilen = len(data)
358             
359             # Requests are handled by the factory
360             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
361             msg[ARG]['_krpc_sender'] =  self.addr
362             if f and callable(f):
363                 self.stats.receivedAction(msg[REQ])
364                 try:
365                     ret = f(*(), **msg[ARG])
366                 except KrpcError, e:
367                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
368                     log.err(e)
369                     self.stats.errorAction(msg[REQ])
370                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
371                                               [e[0], e[1]])
372                 except TypeError, e:
373                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
374                     log.err(e)
375                     self.stats.errorAction(msg[REQ])
376                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
377                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
378                 except Exception, e:
379                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
380                     log.err(e)
381                     self.stats.errorAction(msg[REQ])
382                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
383                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
384                 else:
385                     olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
386             else:
387                 # Request for unknown method
388                 log.msg("ERROR: don't know about method %s" % msg[REQ])
389                 self.stats.receivedAction('unknown')
390                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
391                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
392             if self.noisy:
393                 log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
394                                                   ilen, msg[REQ], olen))
395         elif msg[TYP] == RSP:
396             # Responses get processed by their TID's deferred
397             if self.tids.has_key(msg[TID]):
398                 req = self.tids[msg[TID]]
399                 #       callback
400                 del(self.tids[msg[TID]])
401                 msg[RSP]['_krpc_sender'] = addr
402                 req.callback(msg[RSP])
403             else:
404                 # no tid, this transaction was finished already...
405                 if self.noisy:
406                     log.msg('received response from %r for completed request: %r' %
407                             (msg[RSP]['id'], msg[TID]))
408         elif msg[TYP] == ERR:
409             # Errors get processed by their TID's deferred's errback
410             if self.tids.has_key(msg[TID]):
411                 req = self.tids[msg[TID]]
412                 del(self.tids[msg[TID]])
413                 # callback
414                 req.errback(KrpcError(*msg[ERR]))
415             else:
416                 # no tid, this transaction was finished already...
417                 log.msg('received an error %r from %r for completed request: %r' %
418                         (msg[ERR], msg[RSP]['id'], msg[TID]))
419         else:
420             # Received an unknown message type
421             if self.noisy:
422                 log.msg("unknown message type: %r" % msg)
423             if msg[TID] in self.tids:
424                 req = self.tids[msg[TID]]
425                 del(self.tids[msg[TID]])
426                 # callback
427                 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
428                                       "Received an unknown message type: %r" % msg[TYP]))
429                 
430     def _sendResponse(self, request, addr, tid, msgType, response):
431         """Helper function for sending responses to nodes.
432
433         @param request: the name of the requested method
434         @type addr: (C{string}, C{int})
435         @param addr: source IP address and port of datagram.
436         @param tid: the transaction ID of the request
437         @param msgType: the type of message to respond with
438         @param response: the arguments for the response
439         """
440         if not response:
441             response = {}
442         
443         try:
444             # Create the response message
445             msg = {TID : tid, TYP : msgType, msgType : response}
446     
447             if self.noisy:
448                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
449     
450             out = bencode(msg)
451             
452             # Make sure its not too long
453             if len(out) > UDP_PACKET_LIMIT:
454                 # Can we remove some values to shorten it?
455                 if 'values' in response:
456                     # Save the original list of values
457                     orig_values = response['values']
458                     len_orig_values = len(bencode(orig_values))
459                     
460                     # Caclulate the maximum value length possible
461                     max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
462                     assert max_len_values > 0
463                     
464                     # Start with a calculation of how many values should be included
465                     # (assumes all values are the same length)
466                     per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
467                     num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
468     
469                     # Do a linear search for the actual maximum number possible
470                     bencoded_values = len(bencode(orig_values[:num_values]))
471                     while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
472                         bencoded_values += len(bencode(orig_values[num_values]))
473                         num_values += 1
474                     while bencoded_values > max_len_values and num_values > 0:
475                         num_values -= 1
476                         bencoded_values -= len(bencode(orig_values[num_values]))
477                     assert num_values > 0
478     
479                     # Encode the result
480                     response['values'] = orig_values[:num_values]
481                     out = bencode(msg)
482                     assert len(out) < UDP_PACKET_LIMIT
483                     log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
484                             (len(orig_values), num_values, len(out)))
485                 else:
486                     # Too long a response, send an error
487                     log.msg('Could not send response, too long: %d bytes' % len(out))
488                     self.stats.errorAction(request)
489                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
490                     out = bencode(msg)
491
492         except Exception, e:
493             # Unknown error, send an error message
494             self.stats.errorAction(request)
495             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
496             out = bencode(msg)
497                     
498         self.stats.sentBytes(len(out))
499         self.transport.write(out, addr)
500         return len(out)
501     
502     def sendRequest(self, method, args):
503         """Send a request to the remote node.
504         
505         @type method: C{string}
506         @param method: the method name to call on the remote node
507         @param args: the arguments to send to the remote node's method
508         """
509         if self.stopped:
510             return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
511                                         "cannot send, connection has been stopped"))
512
513         # Create the request message
514         newTID = newID()
515         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
516         if self.noisy:
517             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
518         data = bencode(msg)
519         
520         # Create the request object and save it with the TID
521         req = KrpcRequest(self, newTID, method, data)
522         self.tids[newTID] = req
523         
524         # Save the conclusion of the action
525         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
526                          callbackArgs = (method, ), errbackArgs = (method, ))
527
528         return req
529     
530     def sendData(self, method, data):
531         """Write a request to the transport and save the stats.
532         
533         @type method: C{string}
534         @param method: the name of the method to call on the remote node
535         @type data: C{string}
536         @param data: the message to send to the remote node
537         """
538         self.stats.sentAction(method)
539         self.stats.sentBytes(len(data))
540         
541         self.transport.write(data, self.addr)
542         
543     def timeOut(self, badTID, method):
544         """Call the deferred's errback if a timeout occurs.
545         
546         @param badTID: the transaction ID of the request
547         @type method: C{string}
548         @param method: the name of the method that timed out on the remote node
549         """
550         if badTID in self.tids:
551             req = self.tids[badTID]
552             del(self.tids[badTID])
553             req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
554                                                       (method, self.addr)))
555         else:
556             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
557         
558     def stop(self):
559         """Cancel all pending requests."""
560         for req in self.tids.values():
561             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
562                                   'connection has been stopped while waiting for response'))
563         self.tids = {}
564         self.stopped = True
565
566 #{ For testing the KRPC protocol
567 def connectionForAddr(host, port):
568     return host
569     
570 class Receiver(protocol.Factory):
571     protocol = KRPC
572     def __init__(self):
573         self.buf = []
574     def krpc_store(self, msg, _krpc_sender):
575         self.buf += [msg]
576         return {}
577     def krpc_echo(self, msg, _krpc_sender):
578         return {'msg': msg}
579     def krpc_values(self, length, num, _krpc_sender):
580         return {'values': ['1'*length]*num}
581
582 def make(port):
583     from stats import StatsLogger
584     af = Receiver()
585     a = hostbroker(af, StatsLogger(None, None, {}), {'SPEW': False})
586     a.protocol = KRPC
587     p = reactor.listenUDP(port, a)
588     return af, a, p
589     
590 class KRPCTests(unittest.TestCase):
591     timeout = 2
592     
593     def setUp(self):
594         self.af, self.a, self.ap = make(1180)
595         self.bf, self.b, self.bp = make(1181)
596
597     def tearDown(self):
598         self.ap.stopListening()
599         self.bp.stopListening()
600
601     def bufEquals(self, result, value):
602         self.failUnlessEqual(self.bf.buf, value)
603
604     def testSimpleMessage(self):
605         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
606         d.addCallback(self.bufEquals, ["This is a test."])
607         return d
608
609     def testMessageBlast(self):
610         for i in range(100):
611             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
612         d.addCallback(self.bufEquals, ["This is a test."] * 100)
613         return d
614
615     def testEcho(self):
616         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
617         df.addCallback(self.gotMsg, "This is a test.")
618         return df
619
620     def gotMsg(self, dict, should_be):
621         _krpc_sender = dict['_krpc_sender']
622         self.failUnlessEqual(dict['msg'], should_be)
623
624     def testManyEcho(self):
625         for i in xrange(100):
626             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
627             df.addCallback(self.gotMsg, "This is a test.")
628         return df
629
630     def testMultiEcho(self):
631         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
632         df.addCallback(self.gotMsg, "This is a test.")
633
634         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
635         df.addCallback(self.gotMsg, "This is another test.")
636
637         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
638         df.addCallback(self.gotMsg, "This is yet another test.")
639         
640         return df
641
642     def testEchoReset(self):
643         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
644         df.addCallback(self.gotMsg, "This is a test.")
645
646         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
647         df.addCallback(self.gotMsg, "This is another test.")
648         df.addCallback(self.echoReset)
649         return df
650     
651     def echoReset(self, dict):
652         del(self.a.connections[('127.0.0.1', 1181)])
653         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
654         df.addCallback(self.gotMsg, "This is yet another test.")
655         return df
656
657     def testUnknownMeth(self):
658         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
659         df = self.failUnlessFailure(df, KrpcError)
660         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
661         return df
662
663     def testMalformedRequest(self):
664         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
665         df = self.failUnlessFailure(df, KrpcError)
666         df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
667         return df
668
669     def gotErr(self, value, should_be, *errorTypes):
670         self.failUnlessEqual(value[0], should_be)
671         if errorTypes:
672             self.flushLoggedErrors(*errorTypes)
673         
674     def testLongPackets(self):
675         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
676         df.addCallback(self.gotLongRsp)
677         return df
678
679     def gotLongRsp(self, dict):
680         # Not quite accurate, but good enough
681         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
682