]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p_Khashmir/krpc.py
Clean up the copyrights mentioned in the code.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
1
2 """The KRPC communication protocol implementation.
3
4 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
5     UDP packet without fragmentation
6
7 @var KRPC_ERROR: the code for a generic error
8 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
9 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
10 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
11 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
12 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
13 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
14
15 @var KRPC_ERROR_INTERNAL: the code for an internal error
16 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
17 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
18 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
19
20 @var TID: the identifier for the transaction ID
21 @var REQ: the identifier for a request packet
22 @var RSP: the identifier for a response packet
23 @var TYP: the identifier for the type of packet
24 @var ARG: the identifier for the argument to the request
25 @var ERR: the identifier for an error packet
26
27 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
28     KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
29     KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
30     KRPC_ERROR_RESPONSE_TOO_LONG
31 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
32     KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
33 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
34
35 """
36
37 from bencode import bencode, bdecode
38 from datetime import datetime, timedelta
39 from math import ceil
40
41 from twisted.internet import defer
42 from twisted.internet import protocol, reactor
43 from twisted.python import log
44 from twisted.trial import unittest
45
46 from khash import newID
47
48 UDP_PACKET_LIMIT = 1472
49
50 # Remote node errors
51 KRPC_ERROR = 200
52 KRPC_ERROR_SERVER_ERROR = 201
53 KRPC_ERROR_MALFORMED_PACKET = 202
54 KRPC_ERROR_METHOD_UNKNOWN = 203
55 KRPC_ERROR_MALFORMED_REQUEST = 204
56 KRPC_ERROR_INVALID_TOKEN = 205
57 KRPC_ERROR_RESPONSE_TOO_LONG = 206
58
59 # Local errors
60 KRPC_ERROR_INTERNAL = 100
61 KRPC_ERROR_RECEIVED_UNKNOWN = 101
62 KRPC_ERROR_TIMEOUT = 102
63 KRPC_ERROR_PROTOCOL_STOPPED = 103
64
65 # commands
66 TID = 't'
67 REQ = 'q'
68 RSP = 'r'
69 TYP = 'y'
70 ARG = 'a'
71 ERR = 'e'
72
73 class KrpcError(Exception):
74     """An error occurred in the KRPC protocol."""
75     pass
76
77 def verifyMessage(msg):
78     """Check received message for corruption and errors.
79     
80     @type msg: C{dictionary}
81     @param msg: the dictionary of information received on the connection
82     @raise KrpcError: if the message is corrupt
83     """
84     
85     if type(msg) != dict:
86         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
87     if TYP not in msg:
88         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
89     if msg[TYP] == REQ:
90         if REQ not in msg:
91             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
92         if type(msg[REQ]) != str:
93             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
94         if ARG not in msg:
95             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
96         if type(msg[ARG]) != dict:
97             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
98     elif msg[TYP] == RSP:
99         if RSP not in msg:
100             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
101         if type(msg[RSP]) != dict:
102             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
103     elif msg[TYP] == ERR:
104         if ERR not in msg:
105             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
106         if type(msg[ERR]) != list:
107             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
108         if len(msg[ERR]) != 2:
109             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
110         if type(msg[ERR][0]) not in (int, long):
111             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
112         if type(msg[ERR][1]) != str:
113             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
114 #    else:
115 #        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
116     if TID not in msg:
117         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
118     if type(msg[TID]) != str:
119         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
120
121 class hostbroker(protocol.DatagramProtocol):
122     """The factory for the KRPC protocol.
123     
124     @type server: L{khashmir.Khashmir}
125     @ivar server: the main Khashmir program
126     @type stats: L{stats.StatsLogger}
127     @ivar stats: the statistics logger to save transport info
128     @type config: C{dictionary}
129     @ivar config: the configuration parameters for the DHT
130     @type connections: C{dictionary}
131     @ivar connections: all the connections that have ever been made to the
132         protocol, keys are IP address and port pairs, values are L{KRPC}
133         protocols for the addresses
134     @ivar protocol: the protocol to use to handle incoming connections
135         (added externally)
136     @type addr: (C{string}, C{int})
137     @ivar addr: the IP address and port of this node
138     """
139     
140     def __init__(self, server, stats, config):
141         """Initialize the factory.
142         
143         @type server: L{khashmir.Khashmir}
144         @param server: the main DHT program
145         @type stats: L{stats.StatsLogger}
146         @param stats: the statistics logger to save transport info
147         @type config: C{dictionary}
148         @param config: the configuration parameters for the DHT
149         """
150         self.server = server
151         self.stats = stats
152         self.config = config
153         # this should be changed to storage that drops old entries
154         self.connections = {}
155         
156     def datagramReceived(self, datagram, addr):
157         """Optionally create a new protocol object, and handle the new datagram.
158         
159         @type datagram: C{string}
160         @param datagram: the data received from the transport.
161         @type addr: (C{string}, C{int})
162         @param addr: source IP address and port of datagram.
163         """
164         c = self.connectionForAddr(addr)
165         c.datagramReceived(datagram, addr)
166         #if c.idle():
167         #    del self.connections[addr]
168
169     def connectionForAddr(self, addr):
170         """Get a protocol object for the source.
171         
172         @type addr: (C{string}, C{int})
173         @param addr: source IP address and port of datagram.
174         """
175         # Don't connect to ourself
176         if addr == self.addr:
177             raise KrcpError
178         
179         # Create a new protocol object if necessary
180         if not self.connections.has_key(addr):
181             conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
182             self.connections[addr] = conn
183         else:
184             conn = self.connections[addr]
185         return conn
186
187     def makeConnection(self, transport):
188         """Make a connection to a transport and save our address."""
189         protocol.DatagramProtocol.makeConnection(self, transport)
190         tup = transport.getHost()
191         self.addr = (tup.host, tup.port)
192         
193     def stopProtocol(self):
194         """Stop all the open connections."""
195         for conn in self.connections.values():
196             conn.stop()
197         protocol.DatagramProtocol.stopProtocol(self)
198
199 class KrpcRequest(defer.Deferred):
200     """An outstanding request to a remote node.
201     
202     @type protocol: L{KRPC}
203     @ivar protocol: the protocol to send data with
204     @ivar tid: the transaction ID of the request
205     @type method: C{string}
206     @ivar method: the name of the method to call on the remote node
207     @type data: C{string}
208     @ivar data: the message to send to the remote node
209     @type config: C{dictionary}
210     @ivar config: the configuration parameters for the DHT
211     @type delay: C{int}
212     @ivar delay: the last timeout delay sent
213     @type start: C{datetime}
214     @ivar start: the time to request was started at
215     @type later: L{twisted.internet.interfaces.IDelayedCall}
216     @ivar later: the pending call to timeout the last sent request
217     """
218     
219     def __init__(self, protocol, newTID, method, data, config):
220         """Initialize the request, and send it out.
221         
222         @type protocol: L{KRPC}
223         @param protocol: the protocol to send data with
224         @param newTID: the transaction ID of the request
225         @type method: C{string}
226         @param method: the name of the method to call on the remote node
227         @type data: C{string}
228         @param data: the message to send to the remote node
229         @type config: C{dictionary}
230         @param config: the configuration parameters for the DHT
231         """
232         defer.Deferred.__init__(self)
233         self.protocol = protocol
234         self.tid = newTID
235         self.method = method
236         self.data = data
237         self.config = config
238         self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
239         self.start = datetime.now()
240         self.later = None
241         reactor.callLater(0, self.send)
242         
243     def send(self):
244         """Send the request to the remote node."""
245         assert not self.later, 'There is already a pending request'
246         self.later = reactor.callLater(self.delay, self.timeOut)
247         try:
248             self.protocol.sendData(self.method, self.data)
249         except:
250             log.err()
251
252     def timeOut(self):
253         """Check for a unrecoverable timeout, otherwise resend."""
254         self.later = None
255         delay = datetime.now() - self.start
256         if delay > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 14)):
257             log.msg('%r timed out after %0.2f sec' %
258                     (self.tid, delay.seconds + delay.microseconds/1000000.0))
259             self.protocol.timeOut(self.tid, self.method)
260         elif self.protocol.stopped:
261             log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
262         else:
263             self.delay *= 2
264             log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
265             reactor.callLater(0, self.send)
266         
267     def callback(self, resp):
268         self.dropTimeOut()
269         defer.Deferred.callback(self, resp)
270         
271     def errback(self, resp):
272         self.dropTimeOut()
273         defer.Deferred.errback(self, resp)
274         
275     def dropTimeOut(self):
276         """Cancel the timeout call when a response is received."""
277         if self.later and self.later.active():
278             self.later.cancel()
279         self.later = None
280
281 class KRPC:
282     """The KRPC protocol implementation.
283     
284     @ivar transport: the transport to use for the protocol
285     @type factory: L{khashmir.Khashmir}
286     @ivar factory: the main Khashmir program
287     @type stats: L{stats.StatsLogger}
288     @ivar stats: the statistics logger to save transport info
289     @type addr: (C{string}, C{int})
290     @ivar addr: the IP address and port of the source node
291     @type config: C{dictionary}
292     @ivar config: the configuration parameters for the DHT
293     @type tids: C{dictionary}
294     @ivar tids: the transaction IDs outstanding for requests, keys are the
295         transaction ID of the request, values are the deferreds to call with
296         the results
297     @type stopped: C{boolean}
298     @ivar stopped: whether the protocol has been stopped
299     """
300     
301     def __init__(self, addr, server, stats, transport, config = {}):
302         """Initialize the protocol.
303         
304         @type addr: (C{string}, C{int})
305         @param addr: the IP address and port of the source node
306         @type server: L{khashmir.Khashmir}
307         @param server: the main Khashmir program
308         @type stats: L{stats.StatsLogger}
309         @param stats: the statistics logger to save transport info
310         @param transport: the transport to use for the protocol
311         @type config: C{dictionary}
312         @param config: the configuration parameters for the DHT
313             (optional, defaults to using defaults)
314         """
315         self.transport = transport
316         self.factory = server
317         self.stats = stats
318         self.addr = addr
319         self.config = config
320         self.tids = {}
321         self.stopped = False
322
323     def datagramReceived(self, data, addr):
324         """Process the new datagram.
325         
326         @type data: C{string}
327         @param data: the data received from the transport.
328         @type addr: (C{string}, C{int})
329         @param addr: source IP address and port of datagram.
330         """
331         self.stats.receivedBytes(len(data))
332         if self.stopped:
333             if self.config.get('SPEW', False):
334                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
335
336         # Bdecode the message
337         try:
338             msg = bdecode(data)
339         except Exception, e:
340             if self.config.get('SPEW', False):
341                 log.msg("krpc bdecode error: ")
342                 log.err(e)
343             return
344
345         # Make sure the remote node isn't trying anything funny
346         try:
347             verifyMessage(msg)
348         except Exception, e:
349             log.msg("krpc message verification error: ")
350             log.err(e)
351             return
352
353         if self.config.get('SPEW', False):
354             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
355
356         # Process it based on its type
357         if msg[TYP]  == REQ:
358             ilen = len(data)
359             
360             # Requests are handled by the factory
361             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
362             msg[ARG]['_krpc_sender'] =  self.addr
363             if f and callable(f):
364                 self.stats.receivedAction(msg[REQ])
365                 try:
366                     ret = f(*(), **msg[ARG])
367                 except KrpcError, e:
368                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
369                     log.err(e)
370                     self.stats.errorAction(msg[REQ])
371                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
372                                               [e[0], e[1]])
373                 except TypeError, e:
374                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
375                     log.err(e)
376                     self.stats.errorAction(msg[REQ])
377                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
378                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
379                 except Exception, e:
380                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
381                     log.err(e)
382                     self.stats.errorAction(msg[REQ])
383                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
384                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
385                 else:
386                     olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
387             else:
388                 # Request for unknown method
389                 log.msg("ERROR: don't know about method %s" % msg[REQ])
390                 self.stats.receivedAction('unknown')
391                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
392                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
393
394             log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
395         elif msg[TYP] == RSP:
396             # Responses get processed by their TID's deferred
397             if self.tids.has_key(msg[TID]):
398                 req = self.tids[msg[TID]]
399                 #       callback
400                 del(self.tids[msg[TID]])
401                 msg[RSP]['_krpc_sender'] = addr
402                 req.callback(msg[RSP])
403             else:
404                 # no tid, this transaction was finished already...
405                 if self.config.get('SPEW', False):
406                     log.msg('received response from %r for completed request: %r' %
407                             (msg[RSP]['id'], msg[TID]))
408         elif msg[TYP] == ERR:
409             # Errors get processed by their TID's deferred's errback
410             if self.tids.has_key(msg[TID]):
411                 req = self.tids[msg[TID]]
412                 del(self.tids[msg[TID]])
413                 # callback
414                 req.errback(KrpcError(*msg[ERR]))
415             else:
416                 # no tid, this transaction was finished already...
417                 log.msg('received an error %r from %r for completed request: %r' %
418                         (msg[ERR], msg[RSP]['id'], msg[TID]))
419         else:
420             # Received an unknown message type
421             if self.config.get('SPEW', False):
422                 log.msg("unknown message type: %r" % msg)
423             if msg[TID] in self.tids:
424                 req = self.tids[msg[TID]]
425                 del(self.tids[msg[TID]])
426                 # callback
427                 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
428                                       "Received an unknown message type: %r" % msg[TYP]))
429                 
430     def _sendResponse(self, request, addr, tid, msgType, response):
431         """Helper function for sending responses to nodes.
432
433         @param request: the name of the requested method
434         @type addr: (C{string}, C{int})
435         @param addr: source IP address and port of datagram.
436         @param tid: the transaction ID of the request
437         @param msgType: the type of message to respond with
438         @param response: the arguments for the response
439         """
440         if not response:
441             response = {}
442         
443         try:
444             # Create the response message
445             msg = {TID : tid, TYP : msgType, msgType : response}
446     
447             if self.config.get('SPEW', False):
448                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
449     
450             out = bencode(msg)
451             
452             # Make sure its not too long
453             if len(out) > UDP_PACKET_LIMIT:
454                 # Can we remove some values to shorten it?
455                 if 'values' in response:
456                     # Save the original list of values
457                     orig_values = response['values']
458                     len_orig_values = len(bencode(orig_values))
459                     
460                     # Caclulate the maximum value length possible
461                     max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
462                     assert max_len_values > 0
463                     
464                     # Start with a calculation of how many values should be included
465                     # (assumes all values are the same length)
466                     per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
467                     num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
468     
469                     # Do a linear search for the actual maximum number possible
470                     bencoded_values = len(bencode(orig_values[:num_values]))
471                     while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
472                         bencoded_values += len(bencode(orig_values[num_values]))
473                         num_values += 1
474                     while bencoded_values > max_len_values and num_values > 0:
475                         num_values -= 1
476                         bencoded_values -= len(bencode(orig_values[num_values]))
477                     assert num_values > 0
478     
479                     # Encode the result
480                     response['values'] = orig_values[:num_values]
481                     out = bencode(msg)
482                     assert len(out) < UDP_PACKET_LIMIT
483                     log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
484                             (len(orig_values), num_values, len(out)))
485                 else:
486                     # Too long a response, send an error
487                     log.msg('Could not send response, too long: %d bytes' % len(out))
488                     self.stats.errorAction(request)
489                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
490                     out = bencode(msg)
491
492         except Exception, e:
493             # Unknown error, send an error message
494             self.stats.errorAction(request)
495             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
496             out = bencode(msg)
497                     
498         self.stats.sentBytes(len(out))
499         self.transport.write(out, addr)
500         return len(out)
501     
502     def sendRequest(self, method, args):
503         """Send a request to the remote node.
504         
505         @type method: C{string}
506         @param method: the method name to call on the remote node
507         @param args: the arguments to send to the remote node's method
508         """
509         if self.stopped:
510             return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
511                                         "cannot send, connection has been stopped"))
512
513         # Create the request message
514         newTID = newID()
515         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
516         if self.config.get('SPEW', False):
517             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
518         data = bencode(msg)
519         
520         # Create the request object and save it with the TID
521         req = KrpcRequest(self, newTID, method, data, self.config)
522         self.tids[newTID] = req
523         
524         # Save the conclusion of the action
525         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
526                          callbackArgs = (method, datetime.now()),
527                          errbackArgs = (method, datetime.now()))
528
529         return req
530     
531     def sendData(self, method, data):
532         """Write a request to the transport and save the stats.
533         
534         @type method: C{string}
535         @param method: the name of the method to call on the remote node
536         @type data: C{string}
537         @param data: the message to send to the remote node
538         """
539         self.transport.write(data, self.addr)
540         self.stats.sentAction(method)
541         self.stats.sentBytes(len(data))
542         
543     def timeOut(self, badTID, method):
544         """Call the deferred's errback if a timeout occurs.
545         
546         @param badTID: the transaction ID of the request
547         @type method: C{string}
548         @param method: the name of the method that timed out on the remote node
549         """
550         if badTID in self.tids:
551             req = self.tids[badTID]
552             del(self.tids[badTID])
553             req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
554                                                       (method, self.addr)))
555         else:
556             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
557         
558     def stop(self):
559         """Cancel all pending requests."""
560         for req in self.tids.values():
561             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
562                                   'connection has been stopped while waiting for response'))
563         self.tids = {}
564         self.stopped = True
565
566 #{ For testing the KRPC protocol
567 def connectionForAddr(host, port):
568     return host
569     
570 class Receiver(protocol.Factory):
571     protocol = KRPC
572     def __init__(self):
573         self.buf = []
574     def krpc_store(self, msg, _krpc_sender):
575         self.buf += [msg]
576         return {}
577     def krpc_echo(self, msg, _krpc_sender):
578         return {'msg': msg}
579     def krpc_values(self, length, num, _krpc_sender):
580         return {'values': ['1'*length]*num}
581
582 def make(port):
583     from stats import StatsLogger
584     af = Receiver()
585     a = hostbroker(af, StatsLogger(None, None),
586                    {'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
587     a.protocol = KRPC
588     p = reactor.listenUDP(port, a)
589     return af, a, p
590     
591 class KRPCTests(unittest.TestCase):
592     timeout = 2
593     
594     def setUp(self):
595         self.af, self.a, self.ap = make(1180)
596         self.bf, self.b, self.bp = make(1181)
597
598     def tearDown(self):
599         self.ap.stopListening()
600         self.bp.stopListening()
601
602     def bufEquals(self, result, value):
603         self.failUnlessEqual(self.bf.buf, value)
604
605     def testSimpleMessage(self):
606         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
607         d.addCallback(self.bufEquals, ["This is a test."])
608         return d
609
610     def testMessageBlast(self):
611         for i in range(100):
612             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
613         d.addCallback(self.bufEquals, ["This is a test."] * 100)
614         return d
615
616     def testEcho(self):
617         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
618         df.addCallback(self.gotMsg, "This is a test.")
619         return df
620
621     def gotMsg(self, dict, should_be):
622         _krpc_sender = dict['_krpc_sender']
623         self.failUnlessEqual(dict['msg'], should_be)
624
625     def testManyEcho(self):
626         for i in xrange(100):
627             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
628             df.addCallback(self.gotMsg, "This is a test.")
629         return df
630
631     def testMultiEcho(self):
632         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
633         df.addCallback(self.gotMsg, "This is a test.")
634
635         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
636         df.addCallback(self.gotMsg, "This is another test.")
637
638         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
639         df.addCallback(self.gotMsg, "This is yet another test.")
640         
641         return df
642
643     def testEchoReset(self):
644         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
645         df.addCallback(self.gotMsg, "This is a test.")
646
647         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
648         df.addCallback(self.gotMsg, "This is another test.")
649         df.addCallback(self.echoReset)
650         return df
651     
652     def echoReset(self, dict):
653         del(self.a.connections[('127.0.0.1', 1181)])
654         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
655         df.addCallback(self.gotMsg, "This is yet another test.")
656         return df
657
658     def testUnknownMeth(self):
659         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
660         df = self.failUnlessFailure(df, KrpcError)
661         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
662         return df
663
664     def testMalformedRequest(self):
665         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
666         df = self.failUnlessFailure(df, KrpcError)
667         df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
668         return df
669
670     def gotErr(self, value, should_be, *errorTypes):
671         self.failUnlessEqual(value[0], should_be)
672         if errorTypes:
673             self.flushLoggedErrors(*errorTypes)
674         
675     def testLongPackets(self):
676         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
677         df.addCallback(self.gotLongRsp)
678         return df
679
680     def gotLongRsp(self, dict):
681         # Not quite accurate, but good enough
682         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
683