Make the DHT timeouts configuration parameters.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The KRPC communication protocol implementation.
5
6 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
7     UDP packet without fragmentation
8
9 @var KRPC_ERROR: the code for a generic error
10 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
11 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
12 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
13 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
14 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
15 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
16
17 @var KRPC_ERROR_INTERNAL: the code for an internal error
18 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
19 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
20 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
21
22 @var TID: the identifier for the transaction ID
23 @var REQ: the identifier for a request packet
24 @var RSP: the identifier for a response packet
25 @var TYP: the identifier for the type of packet
26 @var ARG: the identifier for the argument to the request
27 @var ERR: the identifier for an error packet
28
29 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
30     KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
31     KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
32     KRPC_ERROR_RESPONSE_TOO_LONG
33 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
34     KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
35 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
36
37 """
38
39 from bencode import bencode, bdecode
40 from datetime import datetime, timedelta
41 from math import ceil
42
43 from twisted.internet import defer
44 from twisted.internet import protocol, reactor
45 from twisted.python import log
46 from twisted.trial import unittest
47
48 from khash import newID
49
50 UDP_PACKET_LIMIT = 1472
51
52 # Remote node errors
53 KRPC_ERROR = 200
54 KRPC_ERROR_SERVER_ERROR = 201
55 KRPC_ERROR_MALFORMED_PACKET = 202
56 KRPC_ERROR_METHOD_UNKNOWN = 203
57 KRPC_ERROR_MALFORMED_REQUEST = 204
58 KRPC_ERROR_INVALID_TOKEN = 205
59 KRPC_ERROR_RESPONSE_TOO_LONG = 206
60
61 # Local errors
62 KRPC_ERROR_INTERNAL = 100
63 KRPC_ERROR_RECEIVED_UNKNOWN = 101
64 KRPC_ERROR_TIMEOUT = 102
65 KRPC_ERROR_PROTOCOL_STOPPED = 103
66
67 # commands
68 TID = 't'
69 REQ = 'q'
70 RSP = 'r'
71 TYP = 'y'
72 ARG = 'a'
73 ERR = 'e'
74
75 class KrpcError(Exception):
76     """An error occurred in the KRPC protocol."""
77     pass
78
79 def verifyMessage(msg):
80     """Check received message for corruption and errors.
81     
82     @type msg: C{dictionary}
83     @param msg: the dictionary of information received on the connection
84     @raise KrpcError: if the message is corrupt
85     """
86     
87     if type(msg) != dict:
88         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
89     if TYP not in msg:
90         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
91     if msg[TYP] == REQ:
92         if REQ not in msg:
93             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
94         if type(msg[REQ]) != str:
95             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
96         if ARG not in msg:
97             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
98         if type(msg[ARG]) != dict:
99             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
100     elif msg[TYP] == RSP:
101         if RSP not in msg:
102             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
103         if type(msg[RSP]) != dict:
104             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
105     elif msg[TYP] == ERR:
106         if ERR not in msg:
107             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
108         if type(msg[ERR]) != list:
109             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
110         if len(msg[ERR]) != 2:
111             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
112         if type(msg[ERR][0]) not in (int, long):
113             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
114         if type(msg[ERR][1]) != str:
115             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
116 #    else:
117 #        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
118     if TID not in msg:
119         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
120     if type(msg[TID]) != str:
121         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
122
123 class hostbroker(protocol.DatagramProtocol):
124     """The factory for the KRPC protocol.
125     
126     @type server: L{khashmir.Khashmir}
127     @ivar server: the main Khashmir program
128     @type stats: L{stats.StatsLogger}
129     @ivar stats: the statistics logger to save transport info
130     @type config: C{dictionary}
131     @ivar config: the configuration parameters for the DHT
132     @type connections: C{dictionary}
133     @ivar connections: all the connections that have ever been made to the
134         protocol, keys are IP address and port pairs, values are L{KRPC}
135         protocols for the addresses
136     @ivar protocol: the protocol to use to handle incoming connections
137         (added externally)
138     @type addr: (C{string}, C{int})
139     @ivar addr: the IP address and port of this node
140     """
141     
142     def __init__(self, server, stats, config):
143         """Initialize the factory.
144         
145         @type server: L{khashmir.Khashmir}
146         @param server: the main DHT program
147         @type stats: L{stats.StatsLogger}
148         @param stats: the statistics logger to save transport info
149         @type config: C{dictionary}
150         @param config: the configuration parameters for the DHT
151         """
152         self.server = server
153         self.stats = stats
154         self.config = config
155         # this should be changed to storage that drops old entries
156         self.connections = {}
157         
158     def datagramReceived(self, datagram, addr):
159         """Optionally create a new protocol object, and handle the new datagram.
160         
161         @type datagram: C{string}
162         @param datagram: the data received from the transport.
163         @type addr: (C{string}, C{int})
164         @param addr: source IP address and port of datagram.
165         """
166         c = self.connectionForAddr(addr)
167         c.datagramReceived(datagram, addr)
168         #if c.idle():
169         #    del self.connections[addr]
170
171     def connectionForAddr(self, addr):
172         """Get a protocol object for the source.
173         
174         @type addr: (C{string}, C{int})
175         @param addr: source IP address and port of datagram.
176         """
177         # Don't connect to ourself
178         if addr == self.addr:
179             raise KrcpError
180         
181         # Create a new protocol object if necessary
182         if not self.connections.has_key(addr):
183             conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
184             self.connections[addr] = conn
185         else:
186             conn = self.connections[addr]
187         return conn
188
189     def makeConnection(self, transport):
190         """Make a connection to a transport and save our address."""
191         protocol.DatagramProtocol.makeConnection(self, transport)
192         tup = transport.getHost()
193         self.addr = (tup.host, tup.port)
194         
195     def stopProtocol(self):
196         """Stop all the open connections."""
197         for conn in self.connections.values():
198             conn.stop()
199         protocol.DatagramProtocol.stopProtocol(self)
200
201 class KrpcRequest(defer.Deferred):
202     """An outstanding request to a remote node.
203     
204     @type protocol: L{KRPC}
205     @ivar protocol: the protocol to send data with
206     @ivar tid: the transaction ID of the request
207     @type method: C{string}
208     @ivar method: the name of the method to call on the remote node
209     @type data: C{string}
210     @ivar data: the message to send to the remote node
211     @type config: C{dictionary}
212     @ivar config: the configuration parameters for the DHT
213     @type delay: C{int}
214     @ivar delay: the last timeout delay sent
215     @type start: C{datetime}
216     @ivar start: the time to request was started at
217     @type later: L{twisted.internet.interfaces.IDelayedCall}
218     @ivar later: the pending call to timeout the last sent request
219     """
220     
221     def __init__(self, protocol, newTID, method, data, config):
222         """Initialize the request, and send it out.
223         
224         @type protocol: L{KRPC}
225         @param protocol: the protocol to send data with
226         @param newTID: the transaction ID of the request
227         @type method: C{string}
228         @param method: the name of the method to call on the remote node
229         @type data: C{string}
230         @param data: the message to send to the remote node
231         @type config: C{dictionary}
232         @param config: the configuration parameters for the DHT
233         """
234         defer.Deferred.__init__(self)
235         self.protocol = protocol
236         self.tid = newTID
237         self.method = method
238         self.data = data
239         self.config = config
240         self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
241         self.start = datetime.now()
242         self.later = None
243         self.send()
244         
245     def send(self):
246         """Send the request to the remote node."""
247         assert not self.later, 'There is already a pending request'
248         self.later = reactor.callLater(self.delay, self.timeOut)
249         self.protocol.sendData(self.method, self.data)
250
251     def timeOut(self):
252         """Check for a unrecoverable timeout, otherwise resend."""
253         self.later = None
254         delay = datetime.now() - self.start
255         if delay > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 14)):
256             log.msg('%r timed out after %0.2f sec' %
257                     (self.tid, delay.seconds + delay.microseconds/1000000.0))
258             self.protocol.timeOut(self.tid, self.method)
259         elif self.protocol.stopped:
260             log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
261         else:
262             self.delay *= 2
263             log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
264             self.send()
265         
266     def callback(self, resp):
267         self.dropTimeOut()
268         defer.Deferred.callback(self, resp)
269         
270     def errback(self, resp):
271         self.dropTimeOut()
272         defer.Deferred.errback(self, resp)
273         
274     def dropTimeOut(self):
275         """Cancel the timeout call when a response is received."""
276         if self.later and self.later.active():
277             self.later.cancel()
278         self.later = None
279
280 class KRPC:
281     """The KRPC protocol implementation.
282     
283     @ivar transport: the transport to use for the protocol
284     @type factory: L{khashmir.Khashmir}
285     @ivar factory: the main Khashmir program
286     @type stats: L{stats.StatsLogger}
287     @ivar stats: the statistics logger to save transport info
288     @type addr: (C{string}, C{int})
289     @ivar addr: the IP address and port of the source node
290     @type config: C{dictionary}
291     @ivar config: the configuration parameters for the DHT
292     @type tids: C{dictionary}
293     @ivar tids: the transaction IDs outstanding for requests, keys are the
294         transaction ID of the request, values are the deferreds to call with
295         the results
296     @type stopped: C{boolean}
297     @ivar stopped: whether the protocol has been stopped
298     """
299     
300     def __init__(self, addr, server, stats, transport, config = {}):
301         """Initialize the protocol.
302         
303         @type addr: (C{string}, C{int})
304         @param addr: the IP address and port of the source node
305         @type server: L{khashmir.Khashmir}
306         @param server: the main Khashmir program
307         @type stats: L{stats.StatsLogger}
308         @param stats: the statistics logger to save transport info
309         @param transport: the transport to use for the protocol
310         @type config: C{dictionary}
311         @param config: the configuration parameters for the DHT
312             (optional, defaults to using defaults)
313         """
314         self.transport = transport
315         self.factory = server
316         self.stats = stats
317         self.addr = addr
318         self.config = config
319         self.tids = {}
320         self.stopped = False
321
322     def datagramReceived(self, data, addr):
323         """Process the new datagram.
324         
325         @type data: C{string}
326         @param data: the data received from the transport.
327         @type addr: (C{string}, C{int})
328         @param addr: source IP address and port of datagram.
329         """
330         self.stats.receivedBytes(len(data))
331         if self.stopped:
332             if self.config.get('SPEW', False):
333                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
334
335         # Bdecode the message
336         try:
337             msg = bdecode(data)
338         except Exception, e:
339             if self.config.get('SPEW', False):
340                 log.msg("krpc bdecode error: ")
341                 log.err(e)
342             return
343
344         # Make sure the remote node isn't trying anything funny
345         try:
346             verifyMessage(msg)
347         except Exception, e:
348             log.msg("krpc message verification error: ")
349             log.err(e)
350             return
351
352         if self.config.get('SPEW', False):
353             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
354
355         # Process it based on its type
356         if msg[TYP]  == REQ:
357             ilen = len(data)
358             
359             # Requests are handled by the factory
360             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
361             msg[ARG]['_krpc_sender'] =  self.addr
362             if f and callable(f):
363                 self.stats.receivedAction(msg[REQ])
364                 try:
365                     ret = f(*(), **msg[ARG])
366                 except KrpcError, e:
367                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
368                     log.err(e)
369                     self.stats.errorAction(msg[REQ])
370                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
371                                               [e[0], e[1]])
372                 except TypeError, e:
373                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
374                     log.err(e)
375                     self.stats.errorAction(msg[REQ])
376                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
377                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
378                 except Exception, e:
379                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
380                     log.err(e)
381                     self.stats.errorAction(msg[REQ])
382                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
383                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
384                 else:
385                     olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
386             else:
387                 # Request for unknown method
388                 log.msg("ERROR: don't know about method %s" % msg[REQ])
389                 self.stats.receivedAction('unknown')
390                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
391                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
392
393             log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
394         elif msg[TYP] == RSP:
395             # Responses get processed by their TID's deferred
396             if self.tids.has_key(msg[TID]):
397                 req = self.tids[msg[TID]]
398                 #       callback
399                 del(self.tids[msg[TID]])
400                 msg[RSP]['_krpc_sender'] = addr
401                 req.callback(msg[RSP])
402             else:
403                 # no tid, this transaction was finished already...
404                 if self.config.get('SPEW', False):
405                     log.msg('received response from %r for completed request: %r' %
406                             (msg[RSP]['id'], msg[TID]))
407         elif msg[TYP] == ERR:
408             # Errors get processed by their TID's deferred's errback
409             if self.tids.has_key(msg[TID]):
410                 req = self.tids[msg[TID]]
411                 del(self.tids[msg[TID]])
412                 # callback
413                 req.errback(KrpcError(*msg[ERR]))
414             else:
415                 # no tid, this transaction was finished already...
416                 log.msg('received an error %r from %r for completed request: %r' %
417                         (msg[ERR], msg[RSP]['id'], msg[TID]))
418         else:
419             # Received an unknown message type
420             if self.config.get('SPEW', False):
421                 log.msg("unknown message type: %r" % msg)
422             if msg[TID] in self.tids:
423                 req = self.tids[msg[TID]]
424                 del(self.tids[msg[TID]])
425                 # callback
426                 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
427                                       "Received an unknown message type: %r" % msg[TYP]))
428                 
429     def _sendResponse(self, request, addr, tid, msgType, response):
430         """Helper function for sending responses to nodes.
431
432         @param request: the name of the requested method
433         @type addr: (C{string}, C{int})
434         @param addr: source IP address and port of datagram.
435         @param tid: the transaction ID of the request
436         @param msgType: the type of message to respond with
437         @param response: the arguments for the response
438         """
439         if not response:
440             response = {}
441         
442         try:
443             # Create the response message
444             msg = {TID : tid, TYP : msgType, msgType : response}
445     
446             if self.config.get('SPEW', False):
447                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
448     
449             out = bencode(msg)
450             
451             # Make sure its not too long
452             if len(out) > UDP_PACKET_LIMIT:
453                 # Can we remove some values to shorten it?
454                 if 'values' in response:
455                     # Save the original list of values
456                     orig_values = response['values']
457                     len_orig_values = len(bencode(orig_values))
458                     
459                     # Caclulate the maximum value length possible
460                     max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
461                     assert max_len_values > 0
462                     
463                     # Start with a calculation of how many values should be included
464                     # (assumes all values are the same length)
465                     per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
466                     num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
467     
468                     # Do a linear search for the actual maximum number possible
469                     bencoded_values = len(bencode(orig_values[:num_values]))
470                     while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
471                         bencoded_values += len(bencode(orig_values[num_values]))
472                         num_values += 1
473                     while bencoded_values > max_len_values and num_values > 0:
474                         num_values -= 1
475                         bencoded_values -= len(bencode(orig_values[num_values]))
476                     assert num_values > 0
477     
478                     # Encode the result
479                     response['values'] = orig_values[:num_values]
480                     out = bencode(msg)
481                     assert len(out) < UDP_PACKET_LIMIT
482                     log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
483                             (len(orig_values), num_values, len(out)))
484                 else:
485                     # Too long a response, send an error
486                     log.msg('Could not send response, too long: %d bytes' % len(out))
487                     self.stats.errorAction(request)
488                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
489                     out = bencode(msg)
490
491         except Exception, e:
492             # Unknown error, send an error message
493             self.stats.errorAction(request)
494             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
495             out = bencode(msg)
496                     
497         self.stats.sentBytes(len(out))
498         self.transport.write(out, addr)
499         return len(out)
500     
501     def sendRequest(self, method, args):
502         """Send a request to the remote node.
503         
504         @type method: C{string}
505         @param method: the method name to call on the remote node
506         @param args: the arguments to send to the remote node's method
507         """
508         if self.stopped:
509             return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
510                                         "cannot send, connection has been stopped"))
511
512         # Create the request message
513         newTID = newID()
514         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
515         if self.config.get('SPEW', False):
516             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
517         data = bencode(msg)
518         
519         # Create the request object and save it with the TID
520         req = KrpcRequest(self, newTID, method, data, self.config)
521         self.tids[newTID] = req
522         
523         # Save the conclusion of the action
524         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
525                          callbackArgs = (method, datetime.now()),
526                          errbackArgs = (method, datetime.now()))
527
528         return req
529     
530     def sendData(self, method, data):
531         """Write a request to the transport and save the stats.
532         
533         @type method: C{string}
534         @param method: the name of the method to call on the remote node
535         @type data: C{string}
536         @param data: the message to send to the remote node
537         """
538         self.stats.sentAction(method)
539         self.stats.sentBytes(len(data))
540         
541         self.transport.write(data, self.addr)
542         
543     def timeOut(self, badTID, method):
544         """Call the deferred's errback if a timeout occurs.
545         
546         @param badTID: the transaction ID of the request
547         @type method: C{string}
548         @param method: the name of the method that timed out on the remote node
549         """
550         if badTID in self.tids:
551             req = self.tids[badTID]
552             del(self.tids[badTID])
553             req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
554                                                       (method, self.addr)))
555         else:
556             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
557         
558     def stop(self):
559         """Cancel all pending requests."""
560         for req in self.tids.values():
561             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
562                                   'connection has been stopped while waiting for response'))
563         self.tids = {}
564         self.stopped = True
565
566 #{ For testing the KRPC protocol
567 def connectionForAddr(host, port):
568     return host
569     
570 class Receiver(protocol.Factory):
571     protocol = KRPC
572     def __init__(self):
573         self.buf = []
574     def krpc_store(self, msg, _krpc_sender):
575         self.buf += [msg]
576         return {}
577     def krpc_echo(self, msg, _krpc_sender):
578         return {'msg': msg}
579     def krpc_values(self, length, num, _krpc_sender):
580         return {'values': ['1'*length]*num}
581
582 def make(port):
583     from stats import StatsLogger
584     af = Receiver()
585     a = hostbroker(af, StatsLogger(None, None),
586                    {'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
587     a.protocol = KRPC
588     p = reactor.listenUDP(port, a)
589     return af, a, p
590     
591 class KRPCTests(unittest.TestCase):
592     timeout = 2
593     
594     def setUp(self):
595         self.af, self.a, self.ap = make(1180)
596         self.bf, self.b, self.bp = make(1181)
597
598     def tearDown(self):
599         self.ap.stopListening()
600         self.bp.stopListening()
601
602     def bufEquals(self, result, value):
603         self.failUnlessEqual(self.bf.buf, value)
604
605     def testSimpleMessage(self):
606         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
607         d.addCallback(self.bufEquals, ["This is a test."])
608         return d
609
610     def testMessageBlast(self):
611         for i in range(100):
612             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
613         d.addCallback(self.bufEquals, ["This is a test."] * 100)
614         return d
615
616     def testEcho(self):
617         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
618         df.addCallback(self.gotMsg, "This is a test.")
619         return df
620
621     def gotMsg(self, dict, should_be):
622         _krpc_sender = dict['_krpc_sender']
623         self.failUnlessEqual(dict['msg'], should_be)
624
625     def testManyEcho(self):
626         for i in xrange(100):
627             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
628             df.addCallback(self.gotMsg, "This is a test.")
629         return df
630
631     def testMultiEcho(self):
632         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
633         df.addCallback(self.gotMsg, "This is a test.")
634
635         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
636         df.addCallback(self.gotMsg, "This is another test.")
637
638         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
639         df.addCallback(self.gotMsg, "This is yet another test.")
640         
641         return df
642
643     def testEchoReset(self):
644         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
645         df.addCallback(self.gotMsg, "This is a test.")
646
647         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
648         df.addCallback(self.gotMsg, "This is another test.")
649         df.addCallback(self.echoReset)
650         return df
651     
652     def echoReset(self, dict):
653         del(self.a.connections[('127.0.0.1', 1181)])
654         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
655         df.addCallback(self.gotMsg, "This is yet another test.")
656         return df
657
658     def testUnknownMeth(self):
659         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
660         df = self.failUnlessFailure(df, KrpcError)
661         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
662         return df
663
664     def testMalformedRequest(self):
665         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
666         df = self.failUnlessFailure(df, KrpcError)
667         df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
668         return df
669
670     def gotErr(self, value, should_be, *errorTypes):
671         self.failUnlessEqual(value[0], should_be)
672         if errorTypes:
673             self.flushLoggedErrors(*errorTypes)
674         
675     def testLongPackets(self):
676         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
677         df.addCallback(self.gotLongRsp)
678         return df
679
680     def gotLongRsp(self, dict):
681         # Not quite accurate, but good enough
682         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
683