]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_p2p_Khashmir/krpc.py
c3dcfb69a57dd04cf5ba2f7b2e69d0aeec80f62b
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
1
2 """The KRPC communication protocol implementation.
3
4 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
5     UDP packet without fragmentation
6
7 @var KRPC_ERROR: the code for a generic error
8 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
9 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
10 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
11 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
12 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
13 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
14
15 @var KRPC_ERROR_INTERNAL: the code for an internal error
16 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
17 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
18 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
19
20 @var TID: the identifier for the transaction ID
21 @var REQ: the identifier for a request packet
22 @var RSP: the identifier for a response packet
23 @var TYP: the identifier for the type of packet
24 @var ARG: the identifier for the argument to the request
25 @var ERR: the identifier for an error packet
26
27 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
28     KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
29     KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
30     KRPC_ERROR_RESPONSE_TOO_LONG
31 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
32     KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
33 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
34
35 """
36
37 from bencode import bencode, bdecode
38 from datetime import datetime, timedelta
39 from math import ceil
40
41 from twisted.internet import defer
42 from twisted.internet import protocol, reactor
43 from twisted.python import log
44 from twisted.trial import unittest
45
46 from khash import newID
47
48 UDP_PACKET_LIMIT = 1472
49
50 # Remote node errors
51 KRPC_ERROR = 200
52 KRPC_ERROR_SERVER_ERROR = 201
53 KRPC_ERROR_MALFORMED_PACKET = 202
54 KRPC_ERROR_METHOD_UNKNOWN = 203
55 KRPC_ERROR_MALFORMED_REQUEST = 204
56 KRPC_ERROR_INVALID_TOKEN = 205
57 KRPC_ERROR_RESPONSE_TOO_LONG = 206
58
59 # Local errors
60 KRPC_ERROR_INTERNAL = 100
61 KRPC_ERROR_RECEIVED_UNKNOWN = 101
62 KRPC_ERROR_TIMEOUT = 102
63 KRPC_ERROR_PROTOCOL_STOPPED = 103
64
65 # commands
66 TID = 't'
67 REQ = 'q'
68 RSP = 'r'
69 TYP = 'y'
70 ARG = 'a'
71 ERR = 'e'
72
73 class KrpcError(Exception):
74     """An error occurred in the KRPC protocol."""
75     pass
76
77 def verifyMessage(msg):
78     """Check received message for corruption and errors.
79     
80     @type msg: C{dictionary}
81     @param msg: the dictionary of information received on the connection
82     @raise KrpcError: if the message is corrupt
83     """
84     
85     if type(msg) != dict:
86         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
87     if TYP not in msg:
88         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
89     if msg[TYP] == REQ:
90         if REQ not in msg:
91             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
92         if type(msg[REQ]) != str:
93             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
94         if ARG not in msg:
95             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
96         if type(msg[ARG]) != dict:
97             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
98     elif msg[TYP] == RSP:
99         if RSP not in msg:
100             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
101         if type(msg[RSP]) != dict:
102             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
103     elif msg[TYP] == ERR:
104         if ERR not in msg:
105             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
106         if type(msg[ERR]) != list:
107             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
108         if len(msg[ERR]) != 2:
109             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
110         if type(msg[ERR][0]) not in (int, long):
111             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
112         if type(msg[ERR][1]) != str:
113             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
114 #    else:
115 #        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
116     if TID not in msg:
117         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
118     if type(msg[TID]) != str:
119         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
120
121 class hostbroker(protocol.DatagramProtocol):
122     """The factory for the KRPC protocol.
123     
124     @type server: L{khashmir.Khashmir}
125     @ivar server: the main Khashmir program
126     @type stats: L{stats.StatsLogger}
127     @ivar stats: the statistics logger to save transport info
128     @type config: C{dictionary}
129     @ivar config: the configuration parameters for the DHT
130     @type connections: C{dictionary}
131     @ivar connections: all the connections that have ever been made to the
132         protocol, keys are IP address and port pairs, values are L{KRPC}
133         protocols for the addresses
134     @ivar protocol: the protocol to use to handle incoming connections
135         (added externally)
136     @type addr: (C{string}, C{int})
137     @ivar addr: the IP address and port of this node
138     """
139     
140     def __init__(self, server, stats, config):
141         """Initialize the factory.
142         
143         @type server: L{khashmir.Khashmir}
144         @param server: the main DHT program
145         @type stats: L{stats.StatsLogger}
146         @param stats: the statistics logger to save transport info
147         @type config: C{dictionary}
148         @param config: the configuration parameters for the DHT
149         """
150         self.server = server
151         self.stats = stats
152         self.config = config
153         # this should be changed to storage that drops old entries
154         self.connections = {}
155         
156     def datagramReceived(self, datagram, addr):
157         """Optionally create a new protocol object, and handle the new datagram.
158         
159         @type datagram: C{string}
160         @param datagram: the data received from the transport.
161         @type addr: (C{string}, C{int})
162         @param addr: source IP address and port of datagram.
163         """
164         c = self.connectionForAddr(addr)
165         c.datagramReceived(datagram, addr)
166         #if c.idle():
167         #    del self.connections[addr]
168
169     def connectionForAddr(self, addr):
170         """Get a protocol object for the source.
171         
172         @type addr: (C{string}, C{int})
173         @param addr: source IP address and port of datagram.
174         """
175         # Don't connect to ourself
176         if addr == self.addr:
177             raise KrcpError
178         
179         # Create a new protocol object if necessary
180         if not self.connections.has_key(addr):
181             conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
182             self.connections[addr] = conn
183         else:
184             conn = self.connections[addr]
185         return conn
186
187     def makeConnection(self, transport):
188         """Make a connection to a transport and save our address."""
189         protocol.DatagramProtocol.makeConnection(self, transport)
190         tup = transport.getHost()
191         self.addr = (tup.host, tup.port)
192         
193     def stopProtocol(self):
194         """Stop all the open connections."""
195         for conn in self.connections.values():
196             conn.stop()
197         protocol.DatagramProtocol.stopProtocol(self)
198
199 class KrpcRequest(defer.Deferred):
200     """An outstanding request to a remote node.
201     
202     @type protocol: L{KRPC}
203     @ivar protocol: the protocol to send data with
204     @ivar tid: the transaction ID of the request
205     @type method: C{string}
206     @ivar method: the name of the method to call on the remote node
207     @type data: C{string}
208     @ivar data: the message to send to the remote node
209     @type config: C{dictionary}
210     @ivar config: the configuration parameters for the DHT
211     @type delay: C{int}
212     @ivar delay: the last timeout delay sent
213     @type start: C{datetime}
214     @ivar start: the time to request was started at
215     @type laterNextTimeout: L{twisted.internet.interfaces.IDelayedCall}
216     @ivar laterNextTimeout: the pending call to timeout the last sent request
217     @type laterFinalTimeout: L{twisted.internet.interfaces.IDelayedCall}
218     @ivar laterFinalTimeout: the pending call to timeout the entire request
219     """
220     
221     def __init__(self, protocol, newTID, method, data, config):
222         """Initialize the request, and send it out.
223         
224         @type protocol: L{KRPC}
225         @param protocol: the protocol to send data with
226         @param newTID: the transaction ID of the request
227         @type method: C{string}
228         @param method: the name of the method to call on the remote node
229         @type data: C{string}
230         @param data: the message to send to the remote node
231         @type config: C{dictionary}
232         @param config: the configuration parameters for the DHT
233         """
234         defer.Deferred.__init__(self)
235         self.protocol = protocol
236         self.tid = newTID
237         self.method = method
238         self.data = data
239         self.config = config
240         self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
241         self.start = datetime.now()
242         self.laterNextTimeout = None
243         self.laterFinalTimeout = reactor.callLater(self.config.get('KRPC_TIMEOUT', 9), self.finalTimeout)
244         reactor.callLater(0, self.send)
245         
246     def send(self):
247         """Send the request to the remote node."""
248         assert not self.laterNextTimeout, 'There is already a pending request'
249         self.laterNextTimeout = reactor.callLater(self.delay, self.nextTimeout)
250         try:
251             self.protocol.sendData(self.method, self.data)
252         except:
253             log.err()
254
255     def nextTimeout(self):
256         """Check for a unrecoverable timeout, otherwise resend."""
257         self.laterNextTimeout = None
258         if datetime.now() - self.start > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 9)):
259             self.finalTimeout()
260         elif self.protocol.stopped:
261             log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
262         else:
263             self.delay *= 2
264             log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
265             reactor.callLater(0, self.send)
266         
267     def finalTimeout(self):
268         """Timeout the request after an unrecoverable timeout."""
269         self.dropTimeOut()
270         delay = datetime.now() - self.start
271         log.msg('%r timed out after %0.2f sec' %
272                 (self.tid, delay.seconds + delay.microseconds/1000000.0))
273         self.protocol.timeOut(self.tid, self.method)
274         
275     def callback(self, resp):
276         self.dropTimeOut()
277         defer.Deferred.callback(self, resp)
278         
279     def errback(self, resp):
280         self.dropTimeOut()
281         defer.Deferred.errback(self, resp)
282         
283     def dropTimeOut(self):
284         """Cancel the timeout call when a response is received."""
285         if self.laterFinalTimeout and self.laterFinalTimeout.active():
286             self.laterFinalTimeout.cancel()
287         self.laterFinalTimeout = None
288         if self.laterNextTimeout and self.laterNextTimeout.active():
289             self.laterNextTimeout.cancel()
290         self.laterNextTimeout = None
291
292 class KRPC:
293     """The KRPC protocol implementation.
294     
295     @ivar transport: the transport to use for the protocol
296     @type factory: L{khashmir.Khashmir}
297     @ivar factory: the main Khashmir program
298     @type stats: L{stats.StatsLogger}
299     @ivar stats: the statistics logger to save transport info
300     @type addr: (C{string}, C{int})
301     @ivar addr: the IP address and port of the source node
302     @type config: C{dictionary}
303     @ivar config: the configuration parameters for the DHT
304     @type tids: C{dictionary}
305     @ivar tids: the transaction IDs outstanding for requests, keys are the
306         transaction ID of the request, values are the deferreds to call with
307         the results
308     @type stopped: C{boolean}
309     @ivar stopped: whether the protocol has been stopped
310     """
311     
312     def __init__(self, addr, server, stats, transport, config = {}):
313         """Initialize the protocol.
314         
315         @type addr: (C{string}, C{int})
316         @param addr: the IP address and port of the source node
317         @type server: L{khashmir.Khashmir}
318         @param server: the main Khashmir program
319         @type stats: L{stats.StatsLogger}
320         @param stats: the statistics logger to save transport info
321         @param transport: the transport to use for the protocol
322         @type config: C{dictionary}
323         @param config: the configuration parameters for the DHT
324             (optional, defaults to using defaults)
325         """
326         self.transport = transport
327         self.factory = server
328         self.stats = stats
329         self.addr = addr
330         self.config = config
331         self.tids = {}
332         self.stopped = False
333
334     def datagramReceived(self, data, addr):
335         """Process the new datagram.
336         
337         @type data: C{string}
338         @param data: the data received from the transport.
339         @type addr: (C{string}, C{int})
340         @param addr: source IP address and port of datagram.
341         """
342         self.stats.receivedBytes(len(data))
343         if self.stopped:
344             if self.config.get('SPEW', False):
345                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
346
347         # Bdecode the message
348         try:
349             msg = bdecode(data)
350         except Exception, e:
351             if self.config.get('SPEW', False):
352                 log.msg("krpc bdecode error: ")
353                 log.err(e)
354             return
355
356         # Make sure the remote node isn't trying anything funny
357         try:
358             verifyMessage(msg)
359         except Exception, e:
360             log.msg("krpc message verification error: ")
361             log.err(e)
362             return
363
364         if self.config.get('SPEW', False):
365             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
366
367         # Process it based on its type
368         if msg[TYP]  == REQ:
369             ilen = len(data)
370             
371             # Requests are handled by the factory
372             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
373             msg[ARG]['_krpc_sender'] =  self.addr
374             if f and callable(f):
375                 self.stats.receivedAction(msg[REQ])
376                 try:
377                     ret = f(*(), **msg[ARG])
378                 except KrpcError, e:
379                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
380                     if e[0] != KRPC_ERROR_INVALID_TOKEN:
381                         log.err(e)
382                     else:
383                         log.msg('Node sent us an invalid token, not storing')
384                     self.stats.errorAction(msg[REQ])
385                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
386                                               [e[0], e[1]])
387                 except TypeError, e:
388                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
389                     log.err(e)
390                     self.stats.errorAction(msg[REQ])
391                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
392                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
393                 except Exception, e:
394                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
395                     log.err(e)
396                     self.stats.errorAction(msg[REQ])
397                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
398                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
399                 else:
400                     olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
401             else:
402                 # Request for unknown method
403                 log.msg("ERROR: don't know about method %s" % msg[REQ])
404                 self.stats.receivedAction('unknown')
405                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
406                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
407
408             log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
409         elif msg[TYP] == RSP:
410             # Responses get processed by their TID's deferred
411             if self.tids.has_key(msg[TID]):
412                 req = self.tids[msg[TID]]
413                 #       callback
414                 del(self.tids[msg[TID]])
415                 msg[RSP]['_krpc_sender'] = addr
416                 req.callback(msg[RSP])
417             else:
418                 # no tid, this transaction was finished already...
419                 if self.config.get('SPEW', False):
420                     log.msg('received response from %r for completed request: %r' %
421                             (msg[RSP]['id'], msg[TID]))
422         elif msg[TYP] == ERR:
423             # Errors get processed by their TID's deferred's errback
424             if self.tids.has_key(msg[TID]):
425                 req = self.tids[msg[TID]]
426                 del(self.tids[msg[TID]])
427                 # callback
428                 req.errback(KrpcError(*msg[ERR]))
429             else:
430                 # no tid, this transaction was finished already...
431                 log.msg('received an error %r from %r for completed request: %r' %
432                         (msg[ERR], addr, msg[TID]))
433         else:
434             # Received an unknown message type
435             if self.config.get('SPEW', False):
436                 log.msg("unknown message type: %r" % msg)
437             if msg[TID] in self.tids:
438                 req = self.tids[msg[TID]]
439                 del(self.tids[msg[TID]])
440                 # callback
441                 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
442                                       "Received an unknown message type: %r" % msg[TYP]))
443                 
444     def _sendResponse(self, request, addr, tid, msgType, response):
445         """Helper function for sending responses to nodes.
446
447         @param request: the name of the requested method
448         @type addr: (C{string}, C{int})
449         @param addr: source IP address and port of datagram.
450         @param tid: the transaction ID of the request
451         @param msgType: the type of message to respond with
452         @param response: the arguments for the response
453         """
454         if not response:
455             response = {}
456         
457         try:
458             # Create the response message
459             msg = {TID : tid, TYP : msgType, msgType : response}
460     
461             if self.config.get('SPEW', False):
462                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
463     
464             out = bencode(msg)
465             
466             # Make sure its not too long
467             if len(out) > UDP_PACKET_LIMIT:
468                 # Can we remove some values to shorten it?
469                 if 'values' in response:
470                     # Save the original list of values
471                     orig_values = response['values']
472                     len_orig_values = len(bencode(orig_values))
473                     
474                     # Caclulate the maximum value length possible
475                     max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
476                     assert max_len_values > 0
477                     
478                     # Start with a calculation of how many values should be included
479                     # (assumes all values are the same length)
480                     per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
481                     num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
482     
483                     # Do a linear search for the actual maximum number possible
484                     bencoded_values = len(bencode(orig_values[:num_values]))
485                     while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
486                         bencoded_values += len(bencode(orig_values[num_values]))
487                         num_values += 1
488                     while bencoded_values > max_len_values and num_values > 0:
489                         num_values -= 1
490                         bencoded_values -= len(bencode(orig_values[num_values]))
491                     assert num_values > 0
492     
493                     # Encode the result
494                     response['values'] = orig_values[:num_values]
495                     out = bencode(msg)
496                     assert len(out) < UDP_PACKET_LIMIT
497                     log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
498                             (len(orig_values), num_values, len(out)))
499                 else:
500                     # Too long a response, send an error
501                     log.msg('Could not send response, too long: %d bytes' % len(out))
502                     self.stats.errorAction(request)
503                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
504                     out = bencode(msg)
505
506         except Exception, e:
507             # Unknown error, send an error message
508             self.stats.errorAction(request)
509             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
510             out = bencode(msg)
511                     
512         self.stats.sentBytes(len(out))
513         self.transport.write(out, addr)
514         return len(out)
515     
516     def sendRequest(self, method, args):
517         """Send a request to the remote node.
518         
519         @type method: C{string}
520         @param method: the method name to call on the remote node
521         @param args: the arguments to send to the remote node's method
522         """
523         if self.stopped:
524             return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
525                                         "cannot send, connection has been stopped"))
526
527         # Create the request message
528         newTID = newID()
529         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
530         if self.config.get('SPEW', False):
531             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
532         data = bencode(msg)
533         
534         # Create the request object and save it with the TID
535         req = KrpcRequest(self, newTID, method, data, self.config)
536         self.tids[newTID] = req
537         
538         # Save the conclusion of the action
539         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
540                          callbackArgs = (method, datetime.now()),
541                          errbackArgs = (method, datetime.now()))
542
543         return req
544     
545     def sendData(self, method, data):
546         """Write a request to the transport and save the stats.
547         
548         @type method: C{string}
549         @param method: the name of the method to call on the remote node
550         @type data: C{string}
551         @param data: the message to send to the remote node
552         """
553         self.transport.write(data, self.addr)
554         self.stats.sentAction(method)
555         self.stats.sentBytes(len(data))
556         
557     def timeOut(self, badTID, method):
558         """Call the deferred's errback if a timeout occurs.
559         
560         @param badTID: the transaction ID of the request
561         @type method: C{string}
562         @param method: the name of the method that timed out on the remote node
563         """
564         if badTID in self.tids:
565             req = self.tids[badTID]
566             del(self.tids[badTID])
567             req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
568                                                       (method, self.addr)))
569         else:
570             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
571         
572     def stop(self):
573         """Cancel all pending requests."""
574         for req in self.tids.values():
575             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
576                                   'connection has been stopped while waiting for response'))
577         self.tids = {}
578         self.stopped = True
579
580 #{ For testing the KRPC protocol
581 def connectionForAddr(host, port):
582     return host
583     
584 class Receiver(protocol.Factory):
585     protocol = KRPC
586     def __init__(self):
587         self.buf = []
588     def krpc_store(self, msg, _krpc_sender):
589         self.buf += [msg]
590         return {}
591     def krpc_echo(self, msg, _krpc_sender):
592         return {'msg': msg}
593     def krpc_values(self, length, num, _krpc_sender):
594         return {'values': ['1'*length]*num}
595
596 def make(port):
597     from stats import StatsLogger
598     af = Receiver()
599     a = hostbroker(af, StatsLogger(None, None),
600                    {'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
601     a.protocol = KRPC
602     p = reactor.listenUDP(port, a)
603     return af, a, p
604     
605 class KRPCTests(unittest.TestCase):
606     timeout = 2
607     
608     def setUp(self):
609         self.af, self.a, self.ap = make(1180)
610         self.bf, self.b, self.bp = make(1181)
611
612     def tearDown(self):
613         self.ap.stopListening()
614         self.bp.stopListening()
615
616     def bufEquals(self, result, value):
617         self.failUnlessEqual(self.bf.buf, value)
618
619     def testSimpleMessage(self):
620         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
621         d.addCallback(self.bufEquals, ["This is a test."])
622         return d
623
624     def testMessageBlast(self):
625         for i in range(100):
626             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
627         d.addCallback(self.bufEquals, ["This is a test."] * 100)
628         return d
629
630     def testEcho(self):
631         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
632         df.addCallback(self.gotMsg, "This is a test.")
633         return df
634
635     def gotMsg(self, dict, should_be):
636         _krpc_sender = dict['_krpc_sender']
637         self.failUnlessEqual(dict['msg'], should_be)
638
639     def testManyEcho(self):
640         for i in xrange(100):
641             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
642             df.addCallback(self.gotMsg, "This is a test.")
643         return df
644
645     def testMultiEcho(self):
646         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
647         df.addCallback(self.gotMsg, "This is a test.")
648
649         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
650         df.addCallback(self.gotMsg, "This is another test.")
651
652         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
653         df.addCallback(self.gotMsg, "This is yet another test.")
654         
655         return df
656
657     def testEchoReset(self):
658         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
659         df.addCallback(self.gotMsg, "This is a test.")
660
661         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
662         df.addCallback(self.gotMsg, "This is another test.")
663         df.addCallback(self.echoReset)
664         return df
665     
666     def echoReset(self, dict):
667         del(self.a.connections[('127.0.0.1', 1181)])
668         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
669         df.addCallback(self.gotMsg, "This is yet another test.")
670         return df
671
672     def testUnknownMeth(self):
673         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
674         df = self.failUnlessFailure(df, KrpcError)
675         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
676         return df
677
678     def testMalformedRequest(self):
679         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
680         df = self.failUnlessFailure(df, KrpcError)
681         df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
682         return df
683
684     def gotErr(self, value, should_be, *errorTypes):
685         self.failUnlessEqual(value[0], should_be)
686         if errorTypes:
687             self.flushLoggedErrors(*errorTypes)
688         
689     def testLongPackets(self):
690         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
691         df.addCallback(self.gotLongRsp)
692         return df
693
694     def gotLongRsp(self, dict):
695         # Not quite accurate, but good enough
696         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
697