New TODO for better total number of nodes calculation.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
1
2 """The KRPC communication protocol implementation.
3
4 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
5     UDP packet without fragmentation
6
7 @var KRPC_ERROR: the code for a generic error
8 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
9 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
10 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
11 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
12 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
13 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
14
15 @var KRPC_ERROR_INTERNAL: the code for an internal error
16 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
17 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
18 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
19
20 @var TID: the identifier for the transaction ID
21 @var REQ: the identifier for a request packet
22 @var RSP: the identifier for a response packet
23 @var TYP: the identifier for the type of packet
24 @var ARG: the identifier for the argument to the request
25 @var ERR: the identifier for an error packet
26
27 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
28     KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
29     KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
30     KRPC_ERROR_RESPONSE_TOO_LONG
31 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
32     KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
33 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
34
35 """
36
37 from bencode import bencode, bdecode
38 from datetime import datetime, timedelta
39 from math import ceil
40
41 from twisted.internet import defer
42 from twisted.internet import protocol, reactor
43 from twisted.python import log
44 from twisted.trial import unittest
45
46 from khash import newID
47
48 UDP_PACKET_LIMIT = 1472
49
50 # Remote node errors
51 KRPC_ERROR = 200
52 KRPC_ERROR_SERVER_ERROR = 201
53 KRPC_ERROR_MALFORMED_PACKET = 202
54 KRPC_ERROR_METHOD_UNKNOWN = 203
55 KRPC_ERROR_MALFORMED_REQUEST = 204
56 KRPC_ERROR_INVALID_TOKEN = 205
57 KRPC_ERROR_RESPONSE_TOO_LONG = 206
58
59 # Local errors
60 KRPC_ERROR_INTERNAL = 100
61 KRPC_ERROR_RECEIVED_UNKNOWN = 101
62 KRPC_ERROR_TIMEOUT = 102
63 KRPC_ERROR_PROTOCOL_STOPPED = 103
64
65 # commands
66 TID = 't'
67 REQ = 'q'
68 RSP = 'r'
69 TYP = 'y'
70 ARG = 'a'
71 ERR = 'e'
72
73 class KrpcError(Exception):
74     """An error occurred in the KRPC protocol."""
75     pass
76
77 def verifyMessage(msg):
78     """Check received message for corruption and errors.
79     
80     @type msg: C{dictionary}
81     @param msg: the dictionary of information received on the connection
82     @raise KrpcError: if the message is corrupt
83     """
84     
85     if type(msg) != dict:
86         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
87     if TYP not in msg:
88         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
89     if msg[TYP] == REQ:
90         if REQ not in msg:
91             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
92         if type(msg[REQ]) != str:
93             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
94         if ARG not in msg:
95             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
96         if type(msg[ARG]) != dict:
97             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
98     elif msg[TYP] == RSP:
99         if RSP not in msg:
100             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
101         if type(msg[RSP]) != dict:
102             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
103     elif msg[TYP] == ERR:
104         if ERR not in msg:
105             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
106         if type(msg[ERR]) != list:
107             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
108         if len(msg[ERR]) != 2:
109             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
110         if type(msg[ERR][0]) not in (int, long):
111             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
112         if type(msg[ERR][1]) != str:
113             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
114 #    else:
115 #        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
116     if TID not in msg:
117         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
118     if type(msg[TID]) != str:
119         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
120
121 class hostbroker(protocol.DatagramProtocol):
122     """The factory for the KRPC protocol.
123     
124     @type server: L{khashmir.Khashmir}
125     @ivar server: the main Khashmir program
126     @type stats: L{stats.StatsLogger}
127     @ivar stats: the statistics logger to save transport info
128     @type config: C{dictionary}
129     @ivar config: the configuration parameters for the DHT
130     @type connections: C{dictionary}
131     @ivar connections: all the connections that have ever been made to the
132         protocol, keys are IP address and port pairs, values are L{KRPC}
133         protocols for the addresses
134     @ivar protocol: the protocol to use to handle incoming connections
135         (added externally)
136     @type addr: (C{string}, C{int})
137     @ivar addr: the IP address and port of this node
138     """
139     
140     def __init__(self, server, stats, config):
141         """Initialize the factory.
142         
143         @type server: L{khashmir.Khashmir}
144         @param server: the main DHT program
145         @type stats: L{stats.StatsLogger}
146         @param stats: the statistics logger to save transport info
147         @type config: C{dictionary}
148         @param config: the configuration parameters for the DHT
149         """
150         self.server = server
151         self.stats = stats
152         self.config = config
153         # this should be changed to storage that drops old entries
154         self.connections = {}
155         
156     def datagramReceived(self, datagram, addr):
157         """Optionally create a new protocol object, and handle the new datagram.
158         
159         @type datagram: C{string}
160         @param datagram: the data received from the transport.
161         @type addr: (C{string}, C{int})
162         @param addr: source IP address and port of datagram.
163         """
164         c = self.connectionForAddr(addr)
165         c.datagramReceived(datagram, addr)
166         #if c.idle():
167         #    del self.connections[addr]
168
169     def connectionForAddr(self, addr):
170         """Get a protocol object for the source.
171         
172         @type addr: (C{string}, C{int})
173         @param addr: source IP address and port of datagram.
174         """
175         # Don't connect to ourself
176         if addr == self.addr:
177             raise KrcpError
178         
179         # Create a new protocol object if necessary
180         if not self.connections.has_key(addr):
181             conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
182             self.connections[addr] = conn
183         else:
184             conn = self.connections[addr]
185         return conn
186
187     def makeConnection(self, transport):
188         """Make a connection to a transport and save our address."""
189         protocol.DatagramProtocol.makeConnection(self, transport)
190         tup = transport.getHost()
191         self.addr = (tup.host, tup.port)
192         
193     def stopProtocol(self):
194         """Stop all the open connections."""
195         for conn in self.connections.values():
196             conn.stop()
197         protocol.DatagramProtocol.stopProtocol(self)
198
199 class KrpcRequest(defer.Deferred):
200     """An outstanding request to a remote node.
201     
202     @type protocol: L{KRPC}
203     @ivar protocol: the protocol to send data with
204     @ivar tid: the transaction ID of the request
205     @type method: C{string}
206     @ivar method: the name of the method to call on the remote node
207     @type data: C{string}
208     @ivar data: the message to send to the remote node
209     @type config: C{dictionary}
210     @ivar config: the configuration parameters for the DHT
211     @type delay: C{int}
212     @ivar delay: the last timeout delay sent
213     @type start: C{datetime}
214     @ivar start: the time to request was started at
215     @type laterNextTimeout: L{twisted.internet.interfaces.IDelayedCall}
216     @ivar laterNextTimeout: the pending call to timeout the last sent request
217     @type laterFinalTimeout: L{twisted.internet.interfaces.IDelayedCall}
218     @ivar laterFinalTimeout: the pending call to timeout the entire request
219     """
220     
221     def __init__(self, protocol, newTID, method, data, config):
222         """Initialize the request, and send it out.
223         
224         @type protocol: L{KRPC}
225         @param protocol: the protocol to send data with
226         @param newTID: the transaction ID of the request
227         @type method: C{string}
228         @param method: the name of the method to call on the remote node
229         @type data: C{string}
230         @param data: the message to send to the remote node
231         @type config: C{dictionary}
232         @param config: the configuration parameters for the DHT
233         """
234         defer.Deferred.__init__(self)
235         self.protocol = protocol
236         self.tid = newTID
237         self.method = method
238         self.data = data
239         self.config = config
240         self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
241         self.start = datetime.now()
242         self.laterNextTimeout = None
243         self.laterFinalTimeout = reactor.callLater(self.config.get('KRPC_TIMEOUT', 9), self.finalTimeout)
244         reactor.callLater(0, self.send)
245         
246     def send(self):
247         """Send the request to the remote node."""
248         assert not self.laterNextTimeout, 'There is already a pending request'
249         self.laterNextTimeout = reactor.callLater(self.delay, self.nextTimeout)
250         try:
251             self.protocol.sendData(self.method, self.data)
252         except:
253             log.err()
254
255     def nextTimeout(self):
256         """Check for a unrecoverable timeout, otherwise resend."""
257         self.laterNextTimeout = None
258         if datetime.now() - self.start > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 9)):
259             self.finalTimeout()
260         elif self.protocol.stopped:
261             log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
262         else:
263             self.delay *= 2
264             log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
265             reactor.callLater(0, self.send)
266         
267     def finalTimeout(self):
268         """Timeout the request after an unrecoverable timeout."""
269         self.dropTimeOut()
270         delay = datetime.now() - self.start
271         log.msg('%r timed out after %0.2f sec' %
272                 (self.tid, delay.seconds + delay.microseconds/1000000.0))
273         self.protocol.timeOut(self.tid, self.method)
274         
275     def callback(self, resp):
276         self.dropTimeOut()
277         defer.Deferred.callback(self, resp)
278         
279     def errback(self, resp):
280         self.dropTimeOut()
281         defer.Deferred.errback(self, resp)
282         
283     def dropTimeOut(self):
284         """Cancel the timeout call when a response is received."""
285         if self.laterFinalTimeout and self.laterFinalTimeout.active():
286             self.laterFinalTimeout.cancel()
287         self.laterFinalTimeout = None
288         if self.laterNextTimeout and self.laterNextTimeout.active():
289             self.laterNextTimeout.cancel()
290         self.laterNextTimeout = None
291
292 class KRPC:
293     """The KRPC protocol implementation.
294     
295     @ivar transport: the transport to use for the protocol
296     @type factory: L{khashmir.Khashmir}
297     @ivar factory: the main Khashmir program
298     @type stats: L{stats.StatsLogger}
299     @ivar stats: the statistics logger to save transport info
300     @type addr: (C{string}, C{int})
301     @ivar addr: the IP address and port of the source node
302     @type config: C{dictionary}
303     @ivar config: the configuration parameters for the DHT
304     @type tids: C{dictionary}
305     @ivar tids: the transaction IDs outstanding for requests, keys are the
306         transaction ID of the request, values are the deferreds to call with
307         the results
308     @type stopped: C{boolean}
309     @ivar stopped: whether the protocol has been stopped
310     """
311     
312     def __init__(self, addr, server, stats, transport, config = {}):
313         """Initialize the protocol.
314         
315         @type addr: (C{string}, C{int})
316         @param addr: the IP address and port of the source node
317         @type server: L{khashmir.Khashmir}
318         @param server: the main Khashmir program
319         @type stats: L{stats.StatsLogger}
320         @param stats: the statistics logger to save transport info
321         @param transport: the transport to use for the protocol
322         @type config: C{dictionary}
323         @param config: the configuration parameters for the DHT
324             (optional, defaults to using defaults)
325         """
326         self.transport = transport
327         self.factory = server
328         self.stats = stats
329         self.addr = addr
330         self.config = config
331         self.tids = {}
332         self.stopped = False
333
334     def datagramReceived(self, data, addr):
335         """Process the new datagram.
336         
337         @type data: C{string}
338         @param data: the data received from the transport.
339         @type addr: (C{string}, C{int})
340         @param addr: source IP address and port of datagram.
341         """
342         self.stats.receivedBytes(len(data))
343         if self.stopped:
344             if self.config.get('SPEW', False):
345                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
346
347         # Bdecode the message
348         try:
349             msg = bdecode(data)
350         except Exception, e:
351             if self.config.get('SPEW', False):
352                 log.msg("krpc bdecode error: ")
353                 log.err(e)
354             return
355
356         # Make sure the remote node isn't trying anything funny
357         try:
358             verifyMessage(msg)
359         except Exception, e:
360             log.msg("krpc message verification error: ")
361             log.err(e)
362             return
363
364         if self.config.get('SPEW', False):
365             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
366
367         # Process it based on its type
368         if msg[TYP]  == REQ:
369             ilen = len(data)
370             
371             # Requests are handled by the factory
372             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
373             msg[ARG]['_krpc_sender'] =  self.addr
374             if f and callable(f):
375                 self.stats.receivedAction(msg[REQ])
376                 try:
377                     ret = f(*(), **msg[ARG])
378                 except KrpcError, e:
379                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
380                     if e[0] != KRPC_ERROR_INVALID_TOKEN:
381                         log.err(e)
382                     self.stats.errorAction(msg[REQ])
383                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
384                                               [e[0], e[1]])
385                 except TypeError, e:
386                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
387                     log.err(e)
388                     self.stats.errorAction(msg[REQ])
389                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
390                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
391                 except Exception, e:
392                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
393                     log.err(e)
394                     self.stats.errorAction(msg[REQ])
395                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
396                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
397                 else:
398                     olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
399             else:
400                 # Request for unknown method
401                 log.msg("ERROR: don't know about method %s" % msg[REQ])
402                 self.stats.receivedAction('unknown')
403                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
404                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
405
406             log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
407         elif msg[TYP] == RSP:
408             # Responses get processed by their TID's deferred
409             if self.tids.has_key(msg[TID]):
410                 req = self.tids[msg[TID]]
411                 #       callback
412                 del(self.tids[msg[TID]])
413                 msg[RSP]['_krpc_sender'] = addr
414                 req.callback(msg[RSP])
415             else:
416                 # no tid, this transaction was finished already...
417                 if self.config.get('SPEW', False):
418                     log.msg('received response from %r for completed request: %r' %
419                             (msg[RSP]['id'], msg[TID]))
420         elif msg[TYP] == ERR:
421             # Errors get processed by their TID's deferred's errback
422             if self.tids.has_key(msg[TID]):
423                 req = self.tids[msg[TID]]
424                 del(self.tids[msg[TID]])
425                 # callback
426                 req.errback(KrpcError(*msg[ERR]))
427             else:
428                 # no tid, this transaction was finished already...
429                 log.msg('received an error %r from %r for completed request: %r' %
430                         (msg[ERR], msg[RSP]['id'], msg[TID]))
431         else:
432             # Received an unknown message type
433             if self.config.get('SPEW', False):
434                 log.msg("unknown message type: %r" % msg)
435             if msg[TID] in self.tids:
436                 req = self.tids[msg[TID]]
437                 del(self.tids[msg[TID]])
438                 # callback
439                 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
440                                       "Received an unknown message type: %r" % msg[TYP]))
441                 
442     def _sendResponse(self, request, addr, tid, msgType, response):
443         """Helper function for sending responses to nodes.
444
445         @param request: the name of the requested method
446         @type addr: (C{string}, C{int})
447         @param addr: source IP address and port of datagram.
448         @param tid: the transaction ID of the request
449         @param msgType: the type of message to respond with
450         @param response: the arguments for the response
451         """
452         if not response:
453             response = {}
454         
455         try:
456             # Create the response message
457             msg = {TID : tid, TYP : msgType, msgType : response}
458     
459             if self.config.get('SPEW', False):
460                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
461     
462             out = bencode(msg)
463             
464             # Make sure its not too long
465             if len(out) > UDP_PACKET_LIMIT:
466                 # Can we remove some values to shorten it?
467                 if 'values' in response:
468                     # Save the original list of values
469                     orig_values = response['values']
470                     len_orig_values = len(bencode(orig_values))
471                     
472                     # Caclulate the maximum value length possible
473                     max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
474                     assert max_len_values > 0
475                     
476                     # Start with a calculation of how many values should be included
477                     # (assumes all values are the same length)
478                     per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
479                     num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
480     
481                     # Do a linear search for the actual maximum number possible
482                     bencoded_values = len(bencode(orig_values[:num_values]))
483                     while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
484                         bencoded_values += len(bencode(orig_values[num_values]))
485                         num_values += 1
486                     while bencoded_values > max_len_values and num_values > 0:
487                         num_values -= 1
488                         bencoded_values -= len(bencode(orig_values[num_values]))
489                     assert num_values > 0
490     
491                     # Encode the result
492                     response['values'] = orig_values[:num_values]
493                     out = bencode(msg)
494                     assert len(out) < UDP_PACKET_LIMIT
495                     log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
496                             (len(orig_values), num_values, len(out)))
497                 else:
498                     # Too long a response, send an error
499                     log.msg('Could not send response, too long: %d bytes' % len(out))
500                     self.stats.errorAction(request)
501                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
502                     out = bencode(msg)
503
504         except Exception, e:
505             # Unknown error, send an error message
506             self.stats.errorAction(request)
507             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
508             out = bencode(msg)
509                     
510         self.stats.sentBytes(len(out))
511         self.transport.write(out, addr)
512         return len(out)
513     
514     def sendRequest(self, method, args):
515         """Send a request to the remote node.
516         
517         @type method: C{string}
518         @param method: the method name to call on the remote node
519         @param args: the arguments to send to the remote node's method
520         """
521         if self.stopped:
522             return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
523                                         "cannot send, connection has been stopped"))
524
525         # Create the request message
526         newTID = newID()
527         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
528         if self.config.get('SPEW', False):
529             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
530         data = bencode(msg)
531         
532         # Create the request object and save it with the TID
533         req = KrpcRequest(self, newTID, method, data, self.config)
534         self.tids[newTID] = req
535         
536         # Save the conclusion of the action
537         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
538                          callbackArgs = (method, datetime.now()),
539                          errbackArgs = (method, datetime.now()))
540
541         return req
542     
543     def sendData(self, method, data):
544         """Write a request to the transport and save the stats.
545         
546         @type method: C{string}
547         @param method: the name of the method to call on the remote node
548         @type data: C{string}
549         @param data: the message to send to the remote node
550         """
551         self.transport.write(data, self.addr)
552         self.stats.sentAction(method)
553         self.stats.sentBytes(len(data))
554         
555     def timeOut(self, badTID, method):
556         """Call the deferred's errback if a timeout occurs.
557         
558         @param badTID: the transaction ID of the request
559         @type method: C{string}
560         @param method: the name of the method that timed out on the remote node
561         """
562         if badTID in self.tids:
563             req = self.tids[badTID]
564             del(self.tids[badTID])
565             req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
566                                                       (method, self.addr)))
567         else:
568             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
569         
570     def stop(self):
571         """Cancel all pending requests."""
572         for req in self.tids.values():
573             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
574                                   'connection has been stopped while waiting for response'))
575         self.tids = {}
576         self.stopped = True
577
578 #{ For testing the KRPC protocol
579 def connectionForAddr(host, port):
580     return host
581     
582 class Receiver(protocol.Factory):
583     protocol = KRPC
584     def __init__(self):
585         self.buf = []
586     def krpc_store(self, msg, _krpc_sender):
587         self.buf += [msg]
588         return {}
589     def krpc_echo(self, msg, _krpc_sender):
590         return {'msg': msg}
591     def krpc_values(self, length, num, _krpc_sender):
592         return {'values': ['1'*length]*num}
593
594 def make(port):
595     from stats import StatsLogger
596     af = Receiver()
597     a = hostbroker(af, StatsLogger(None, None),
598                    {'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
599     a.protocol = KRPC
600     p = reactor.listenUDP(port, a)
601     return af, a, p
602     
603 class KRPCTests(unittest.TestCase):
604     timeout = 2
605     
606     def setUp(self):
607         self.af, self.a, self.ap = make(1180)
608         self.bf, self.b, self.bp = make(1181)
609
610     def tearDown(self):
611         self.ap.stopListening()
612         self.bp.stopListening()
613
614     def bufEquals(self, result, value):
615         self.failUnlessEqual(self.bf.buf, value)
616
617     def testSimpleMessage(self):
618         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
619         d.addCallback(self.bufEquals, ["This is a test."])
620         return d
621
622     def testMessageBlast(self):
623         for i in range(100):
624             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
625         d.addCallback(self.bufEquals, ["This is a test."] * 100)
626         return d
627
628     def testEcho(self):
629         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
630         df.addCallback(self.gotMsg, "This is a test.")
631         return df
632
633     def gotMsg(self, dict, should_be):
634         _krpc_sender = dict['_krpc_sender']
635         self.failUnlessEqual(dict['msg'], should_be)
636
637     def testManyEcho(self):
638         for i in xrange(100):
639             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
640             df.addCallback(self.gotMsg, "This is a test.")
641         return df
642
643     def testMultiEcho(self):
644         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
645         df.addCallback(self.gotMsg, "This is a test.")
646
647         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
648         df.addCallback(self.gotMsg, "This is another test.")
649
650         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
651         df.addCallback(self.gotMsg, "This is yet another test.")
652         
653         return df
654
655     def testEchoReset(self):
656         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
657         df.addCallback(self.gotMsg, "This is a test.")
658
659         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
660         df.addCallback(self.gotMsg, "This is another test.")
661         df.addCallback(self.echoReset)
662         return df
663     
664     def echoReset(self, dict):
665         del(self.a.connections[('127.0.0.1', 1181)])
666         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
667         df.addCallback(self.gotMsg, "This is yet another test.")
668         return df
669
670     def testUnknownMeth(self):
671         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
672         df = self.failUnlessFailure(df, KrpcError)
673         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
674         return df
675
676     def testMalformedRequest(self):
677         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
678         df = self.failUnlessFailure(df, KrpcError)
679         df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
680         return df
681
682     def gotErr(self, value, should_be, *errorTypes):
683         self.failUnlessEqual(value[0], should_be)
684         if errorTypes:
685             self.flushLoggedErrors(*errorTypes)
686         
687     def testLongPackets(self):
688         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
689         df.addCallback(self.gotLongRsp)
690         return df
691
692     def gotLongRsp(self, dict):
693         # Not quite accurate, but good enough
694         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
695