Add an optimization to the HTTP client protocol so that pieplining begins sooner.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The KRPC communication protocol implementation.
5
6 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
7     UDP packet without fragmentation
8
9 @var KRPC_ERROR: the code for a generic error
10 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
11 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
12 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
13 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
14 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
15 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
16
17 @var KRPC_ERROR_INTERNAL: the code for an internal error
18 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
19 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
20 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
21
22 @var TID: the identifier for the transaction ID
23 @var REQ: the identifier for a request packet
24 @var RSP: the identifier for a response packet
25 @var TYP: the identifier for the type of packet
26 @var ARG: the identifier for the argument to the request
27 @var ERR: the identifier for an error packet
28
29 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
30     KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
31     KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
32     KRPC_ERROR_RESPONSE_TOO_LONG
33 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
34     KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
35 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
36
37 """
38
39 from bencode import bencode, bdecode
40 from datetime import datetime, timedelta
41 from math import ceil
42
43 from twisted.internet import defer
44 from twisted.internet import protocol, reactor
45 from twisted.python import log
46 from twisted.trial import unittest
47
48 from khash import newID
49
50 UDP_PACKET_LIMIT = 1472
51
52 # Remote node errors
53 KRPC_ERROR = 200
54 KRPC_ERROR_SERVER_ERROR = 201
55 KRPC_ERROR_MALFORMED_PACKET = 202
56 KRPC_ERROR_METHOD_UNKNOWN = 203
57 KRPC_ERROR_MALFORMED_REQUEST = 204
58 KRPC_ERROR_INVALID_TOKEN = 205
59 KRPC_ERROR_RESPONSE_TOO_LONG = 206
60
61 # Local errors
62 KRPC_ERROR_INTERNAL = 100
63 KRPC_ERROR_RECEIVED_UNKNOWN = 101
64 KRPC_ERROR_TIMEOUT = 102
65 KRPC_ERROR_PROTOCOL_STOPPED = 103
66
67 # commands
68 TID = 't'
69 REQ = 'q'
70 RSP = 'r'
71 TYP = 'y'
72 ARG = 'a'
73 ERR = 'e'
74
75 class KrpcError(Exception):
76     """An error occurred in the KRPC protocol."""
77     pass
78
79 def verifyMessage(msg):
80     """Check received message for corruption and errors.
81     
82     @type msg: C{dictionary}
83     @param msg: the dictionary of information received on the connection
84     @raise KrpcError: if the message is corrupt
85     """
86     
87     if type(msg) != dict:
88         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
89     if TYP not in msg:
90         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
91     if msg[TYP] == REQ:
92         if REQ not in msg:
93             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
94         if type(msg[REQ]) != str:
95             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
96         if ARG not in msg:
97             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
98         if type(msg[ARG]) != dict:
99             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
100     elif msg[TYP] == RSP:
101         if RSP not in msg:
102             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
103         if type(msg[RSP]) != dict:
104             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
105     elif msg[TYP] == ERR:
106         if ERR not in msg:
107             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
108         if type(msg[ERR]) != list:
109             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
110         if len(msg[ERR]) != 2:
111             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
112         if type(msg[ERR][0]) not in (int, long):
113             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
114         if type(msg[ERR][1]) != str:
115             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
116 #    else:
117 #        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
118     if TID not in msg:
119         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
120     if type(msg[TID]) != str:
121         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
122
123 class hostbroker(protocol.DatagramProtocol):
124     """The factory for the KRPC protocol.
125     
126     @type server: L{khashmir.Khashmir}
127     @ivar server: the main Khashmir program
128     @type stats: L{stats.StatsLogger}
129     @ivar stats: the statistics logger to save transport info
130     @type config: C{dictionary}
131     @ivar config: the configuration parameters for the DHT
132     @type connections: C{dictionary}
133     @ivar connections: all the connections that have ever been made to the
134         protocol, keys are IP address and port pairs, values are L{KRPC}
135         protocols for the addresses
136     @ivar protocol: the protocol to use to handle incoming connections
137         (added externally)
138     @type addr: (C{string}, C{int})
139     @ivar addr: the IP address and port of this node
140     """
141     
142     def __init__(self, server, stats, config):
143         """Initialize the factory.
144         
145         @type server: L{khashmir.Khashmir}
146         @param server: the main DHT program
147         @type stats: L{stats.StatsLogger}
148         @param stats: the statistics logger to save transport info
149         @type config: C{dictionary}
150         @param config: the configuration parameters for the DHT
151         """
152         self.server = server
153         self.stats = stats
154         self.config = config
155         # this should be changed to storage that drops old entries
156         self.connections = {}
157         
158     def datagramReceived(self, datagram, addr):
159         """Optionally create a new protocol object, and handle the new datagram.
160         
161         @type datagram: C{string}
162         @param datagram: the data received from the transport.
163         @type addr: (C{string}, C{int})
164         @param addr: source IP address and port of datagram.
165         """
166         c = self.connectionForAddr(addr)
167         c.datagramReceived(datagram, addr)
168         #if c.idle():
169         #    del self.connections[addr]
170
171     def connectionForAddr(self, addr):
172         """Get a protocol object for the source.
173         
174         @type addr: (C{string}, C{int})
175         @param addr: source IP address and port of datagram.
176         """
177         # Don't connect to ourself
178         if addr == self.addr:
179             raise KrcpError
180         
181         # Create a new protocol object if necessary
182         if not self.connections.has_key(addr):
183             conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
184             self.connections[addr] = conn
185         else:
186             conn = self.connections[addr]
187         return conn
188
189     def makeConnection(self, transport):
190         """Make a connection to a transport and save our address."""
191         protocol.DatagramProtocol.makeConnection(self, transport)
192         tup = transport.getHost()
193         self.addr = (tup.host, tup.port)
194         
195     def stopProtocol(self):
196         """Stop all the open connections."""
197         for conn in self.connections.values():
198             conn.stop()
199         protocol.DatagramProtocol.stopProtocol(self)
200
201 class KrpcRequest(defer.Deferred):
202     """An outstanding request to a remote node.
203     
204     @type protocol: L{KRPC}
205     @ivar protocol: the protocol to send data with
206     @ivar tid: the transaction ID of the request
207     @type method: C{string}
208     @ivar method: the name of the method to call on the remote node
209     @type data: C{string}
210     @ivar data: the message to send to the remote node
211     @type config: C{dictionary}
212     @ivar config: the configuration parameters for the DHT
213     @type delay: C{int}
214     @ivar delay: the last timeout delay sent
215     @type start: C{datetime}
216     @ivar start: the time to request was started at
217     @type later: L{twisted.internet.interfaces.IDelayedCall}
218     @ivar later: the pending call to timeout the last sent request
219     """
220     
221     def __init__(self, protocol, newTID, method, data, config):
222         """Initialize the request, and send it out.
223         
224         @type protocol: L{KRPC}
225         @param protocol: the protocol to send data with
226         @param newTID: the transaction ID of the request
227         @type method: C{string}
228         @param method: the name of the method to call on the remote node
229         @type data: C{string}
230         @param data: the message to send to the remote node
231         @type config: C{dictionary}
232         @param config: the configuration parameters for the DHT
233         """
234         defer.Deferred.__init__(self)
235         self.protocol = protocol
236         self.tid = newTID
237         self.method = method
238         self.data = data
239         self.config = config
240         self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
241         self.start = datetime.now()
242         self.later = None
243         reactor.callLater(0, self.send)
244         
245     def send(self):
246         """Send the request to the remote node."""
247         assert not self.later, 'There is already a pending request'
248         self.later = reactor.callLater(self.delay, self.timeOut)
249         try:
250             self.protocol.sendData(self.method, self.data)
251         except:
252             log.err()
253
254     def timeOut(self):
255         """Check for a unrecoverable timeout, otherwise resend."""
256         self.later = None
257         delay = datetime.now() - self.start
258         if delay > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 14)):
259             log.msg('%r timed out after %0.2f sec' %
260                     (self.tid, delay.seconds + delay.microseconds/1000000.0))
261             self.protocol.timeOut(self.tid, self.method)
262         elif self.protocol.stopped:
263             log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
264         else:
265             self.delay *= 2
266             log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
267             reactor.callLater(0, self.send)
268         
269     def callback(self, resp):
270         self.dropTimeOut()
271         defer.Deferred.callback(self, resp)
272         
273     def errback(self, resp):
274         self.dropTimeOut()
275         defer.Deferred.errback(self, resp)
276         
277     def dropTimeOut(self):
278         """Cancel the timeout call when a response is received."""
279         if self.later and self.later.active():
280             self.later.cancel()
281         self.later = None
282
283 class KRPC:
284     """The KRPC protocol implementation.
285     
286     @ivar transport: the transport to use for the protocol
287     @type factory: L{khashmir.Khashmir}
288     @ivar factory: the main Khashmir program
289     @type stats: L{stats.StatsLogger}
290     @ivar stats: the statistics logger to save transport info
291     @type addr: (C{string}, C{int})
292     @ivar addr: the IP address and port of the source node
293     @type config: C{dictionary}
294     @ivar config: the configuration parameters for the DHT
295     @type tids: C{dictionary}
296     @ivar tids: the transaction IDs outstanding for requests, keys are the
297         transaction ID of the request, values are the deferreds to call with
298         the results
299     @type stopped: C{boolean}
300     @ivar stopped: whether the protocol has been stopped
301     """
302     
303     def __init__(self, addr, server, stats, transport, config = {}):
304         """Initialize the protocol.
305         
306         @type addr: (C{string}, C{int})
307         @param addr: the IP address and port of the source node
308         @type server: L{khashmir.Khashmir}
309         @param server: the main Khashmir program
310         @type stats: L{stats.StatsLogger}
311         @param stats: the statistics logger to save transport info
312         @param transport: the transport to use for the protocol
313         @type config: C{dictionary}
314         @param config: the configuration parameters for the DHT
315             (optional, defaults to using defaults)
316         """
317         self.transport = transport
318         self.factory = server
319         self.stats = stats
320         self.addr = addr
321         self.config = config
322         self.tids = {}
323         self.stopped = False
324
325     def datagramReceived(self, data, addr):
326         """Process the new datagram.
327         
328         @type data: C{string}
329         @param data: the data received from the transport.
330         @type addr: (C{string}, C{int})
331         @param addr: source IP address and port of datagram.
332         """
333         self.stats.receivedBytes(len(data))
334         if self.stopped:
335             if self.config.get('SPEW', False):
336                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
337
338         # Bdecode the message
339         try:
340             msg = bdecode(data)
341         except Exception, e:
342             if self.config.get('SPEW', False):
343                 log.msg("krpc bdecode error: ")
344                 log.err(e)
345             return
346
347         # Make sure the remote node isn't trying anything funny
348         try:
349             verifyMessage(msg)
350         except Exception, e:
351             log.msg("krpc message verification error: ")
352             log.err(e)
353             return
354
355         if self.config.get('SPEW', False):
356             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
357
358         # Process it based on its type
359         if msg[TYP]  == REQ:
360             ilen = len(data)
361             
362             # Requests are handled by the factory
363             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
364             msg[ARG]['_krpc_sender'] =  self.addr
365             if f and callable(f):
366                 self.stats.receivedAction(msg[REQ])
367                 try:
368                     ret = f(*(), **msg[ARG])
369                 except KrpcError, e:
370                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
371                     log.err(e)
372                     self.stats.errorAction(msg[REQ])
373                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
374                                               [e[0], e[1]])
375                 except TypeError, e:
376                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
377                     log.err(e)
378                     self.stats.errorAction(msg[REQ])
379                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
380                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
381                 except Exception, e:
382                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
383                     log.err(e)
384                     self.stats.errorAction(msg[REQ])
385                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
386                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
387                 else:
388                     olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
389             else:
390                 # Request for unknown method
391                 log.msg("ERROR: don't know about method %s" % msg[REQ])
392                 self.stats.receivedAction('unknown')
393                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
394                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
395
396             log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
397         elif msg[TYP] == RSP:
398             # Responses get processed by their TID's deferred
399             if self.tids.has_key(msg[TID]):
400                 req = self.tids[msg[TID]]
401                 #       callback
402                 del(self.tids[msg[TID]])
403                 msg[RSP]['_krpc_sender'] = addr
404                 req.callback(msg[RSP])
405             else:
406                 # no tid, this transaction was finished already...
407                 if self.config.get('SPEW', False):
408                     log.msg('received response from %r for completed request: %r' %
409                             (msg[RSP]['id'], msg[TID]))
410         elif msg[TYP] == ERR:
411             # Errors get processed by their TID's deferred's errback
412             if self.tids.has_key(msg[TID]):
413                 req = self.tids[msg[TID]]
414                 del(self.tids[msg[TID]])
415                 # callback
416                 req.errback(KrpcError(*msg[ERR]))
417             else:
418                 # no tid, this transaction was finished already...
419                 log.msg('received an error %r from %r for completed request: %r' %
420                         (msg[ERR], msg[RSP]['id'], msg[TID]))
421         else:
422             # Received an unknown message type
423             if self.config.get('SPEW', False):
424                 log.msg("unknown message type: %r" % msg)
425             if msg[TID] in self.tids:
426                 req = self.tids[msg[TID]]
427                 del(self.tids[msg[TID]])
428                 # callback
429                 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
430                                       "Received an unknown message type: %r" % msg[TYP]))
431                 
432     def _sendResponse(self, request, addr, tid, msgType, response):
433         """Helper function for sending responses to nodes.
434
435         @param request: the name of the requested method
436         @type addr: (C{string}, C{int})
437         @param addr: source IP address and port of datagram.
438         @param tid: the transaction ID of the request
439         @param msgType: the type of message to respond with
440         @param response: the arguments for the response
441         """
442         if not response:
443             response = {}
444         
445         try:
446             # Create the response message
447             msg = {TID : tid, TYP : msgType, msgType : response}
448     
449             if self.config.get('SPEW', False):
450                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
451     
452             out = bencode(msg)
453             
454             # Make sure its not too long
455             if len(out) > UDP_PACKET_LIMIT:
456                 # Can we remove some values to shorten it?
457                 if 'values' in response:
458                     # Save the original list of values
459                     orig_values = response['values']
460                     len_orig_values = len(bencode(orig_values))
461                     
462                     # Caclulate the maximum value length possible
463                     max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
464                     assert max_len_values > 0
465                     
466                     # Start with a calculation of how many values should be included
467                     # (assumes all values are the same length)
468                     per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
469                     num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
470     
471                     # Do a linear search for the actual maximum number possible
472                     bencoded_values = len(bencode(orig_values[:num_values]))
473                     while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
474                         bencoded_values += len(bencode(orig_values[num_values]))
475                         num_values += 1
476                     while bencoded_values > max_len_values and num_values > 0:
477                         num_values -= 1
478                         bencoded_values -= len(bencode(orig_values[num_values]))
479                     assert num_values > 0
480     
481                     # Encode the result
482                     response['values'] = orig_values[:num_values]
483                     out = bencode(msg)
484                     assert len(out) < UDP_PACKET_LIMIT
485                     log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
486                             (len(orig_values), num_values, len(out)))
487                 else:
488                     # Too long a response, send an error
489                     log.msg('Could not send response, too long: %d bytes' % len(out))
490                     self.stats.errorAction(request)
491                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
492                     out = bencode(msg)
493
494         except Exception, e:
495             # Unknown error, send an error message
496             self.stats.errorAction(request)
497             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
498             out = bencode(msg)
499                     
500         self.stats.sentBytes(len(out))
501         self.transport.write(out, addr)
502         return len(out)
503     
504     def sendRequest(self, method, args):
505         """Send a request to the remote node.
506         
507         @type method: C{string}
508         @param method: the method name to call on the remote node
509         @param args: the arguments to send to the remote node's method
510         """
511         if self.stopped:
512             return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
513                                         "cannot send, connection has been stopped"))
514
515         # Create the request message
516         newTID = newID()
517         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
518         if self.config.get('SPEW', False):
519             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
520         data = bencode(msg)
521         
522         # Create the request object and save it with the TID
523         req = KrpcRequest(self, newTID, method, data, self.config)
524         self.tids[newTID] = req
525         
526         # Save the conclusion of the action
527         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
528                          callbackArgs = (method, datetime.now()),
529                          errbackArgs = (method, datetime.now()))
530
531         return req
532     
533     def sendData(self, method, data):
534         """Write a request to the transport and save the stats.
535         
536         @type method: C{string}
537         @param method: the name of the method to call on the remote node
538         @type data: C{string}
539         @param data: the message to send to the remote node
540         """
541         self.transport.write(data, self.addr)
542         self.stats.sentAction(method)
543         self.stats.sentBytes(len(data))
544         
545     def timeOut(self, badTID, method):
546         """Call the deferred's errback if a timeout occurs.
547         
548         @param badTID: the transaction ID of the request
549         @type method: C{string}
550         @param method: the name of the method that timed out on the remote node
551         """
552         if badTID in self.tids:
553             req = self.tids[badTID]
554             del(self.tids[badTID])
555             req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
556                                                       (method, self.addr)))
557         else:
558             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
559         
560     def stop(self):
561         """Cancel all pending requests."""
562         for req in self.tids.values():
563             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
564                                   'connection has been stopped while waiting for response'))
565         self.tids = {}
566         self.stopped = True
567
568 #{ For testing the KRPC protocol
569 def connectionForAddr(host, port):
570     return host
571     
572 class Receiver(protocol.Factory):
573     protocol = KRPC
574     def __init__(self):
575         self.buf = []
576     def krpc_store(self, msg, _krpc_sender):
577         self.buf += [msg]
578         return {}
579     def krpc_echo(self, msg, _krpc_sender):
580         return {'msg': msg}
581     def krpc_values(self, length, num, _krpc_sender):
582         return {'values': ['1'*length]*num}
583
584 def make(port):
585     from stats import StatsLogger
586     af = Receiver()
587     a = hostbroker(af, StatsLogger(None, None),
588                    {'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
589     a.protocol = KRPC
590     p = reactor.listenUDP(port, a)
591     return af, a, p
592     
593 class KRPCTests(unittest.TestCase):
594     timeout = 2
595     
596     def setUp(self):
597         self.af, self.a, self.ap = make(1180)
598         self.bf, self.b, self.bp = make(1181)
599
600     def tearDown(self):
601         self.ap.stopListening()
602         self.bp.stopListening()
603
604     def bufEquals(self, result, value):
605         self.failUnlessEqual(self.bf.buf, value)
606
607     def testSimpleMessage(self):
608         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
609         d.addCallback(self.bufEquals, ["This is a test."])
610         return d
611
612     def testMessageBlast(self):
613         for i in range(100):
614             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
615         d.addCallback(self.bufEquals, ["This is a test."] * 100)
616         return d
617
618     def testEcho(self):
619         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
620         df.addCallback(self.gotMsg, "This is a test.")
621         return df
622
623     def gotMsg(self, dict, should_be):
624         _krpc_sender = dict['_krpc_sender']
625         self.failUnlessEqual(dict['msg'], should_be)
626
627     def testManyEcho(self):
628         for i in xrange(100):
629             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
630             df.addCallback(self.gotMsg, "This is a test.")
631         return df
632
633     def testMultiEcho(self):
634         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
635         df.addCallback(self.gotMsg, "This is a test.")
636
637         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
638         df.addCallback(self.gotMsg, "This is another test.")
639
640         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
641         df.addCallback(self.gotMsg, "This is yet another test.")
642         
643         return df
644
645     def testEchoReset(self):
646         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
647         df.addCallback(self.gotMsg, "This is a test.")
648
649         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
650         df.addCallback(self.gotMsg, "This is another test.")
651         df.addCallback(self.echoReset)
652         return df
653     
654     def echoReset(self, dict):
655         del(self.a.connections[('127.0.0.1', 1181)])
656         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
657         df.addCallback(self.gotMsg, "This is yet another test.")
658         return df
659
660     def testUnknownMeth(self):
661         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
662         df = self.failUnlessFailure(df, KrpcError)
663         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
664         return df
665
666     def testMalformedRequest(self):
667         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
668         df = self.failUnlessFailure(df, KrpcError)
669         df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
670         return df
671
672     def gotErr(self, value, should_be, *errorTypes):
673         self.failUnlessEqual(value[0], should_be)
674         if errorTypes:
675             self.flushLoggedErrors(*errorTypes)
676         
677     def testLongPackets(self):
678         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
679         df.addCallback(self.gotLongRsp)
680         return df
681
682     def gotLongRsp(self, dict):
683         # Not quite accurate, but good enough
684         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
685