Added .gitattributes
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
1
2 """The KRPC communication protocol implementation.
3
4 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
5     UDP packet without fragmentation
6
7 @var KRPC_ERROR: the code for a generic error
8 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
9 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
10 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
11 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
12 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
13 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
14
15 @var KRPC_ERROR_INTERNAL: the code for an internal error
16 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
17 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
18 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
19
20 @var TID: the identifier for the transaction ID
21 @var REQ: the identifier for a request packet
22 @var RSP: the identifier for a response packet
23 @var TYP: the identifier for the type of packet
24 @var ARG: the identifier for the argument to the request
25 @var ERR: the identifier for an error packet
26
27 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
28     KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
29     KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
30     KRPC_ERROR_RESPONSE_TOO_LONG
31 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
32     KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
33 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
34
35 """
36
37 from bencode import bencode, bdecode
38 from datetime import datetime, timedelta
39 from math import ceil
40
41 from twisted.internet import defer
42 from twisted.internet import protocol, reactor
43 from twisted.python import log
44 from twisted.trial import unittest
45
46 from khash import newID
47
48 UDP_PACKET_LIMIT = 1472
49
50 # Remote node errors
51 KRPC_ERROR = 200
52 KRPC_ERROR_SERVER_ERROR = 201
53 KRPC_ERROR_MALFORMED_PACKET = 202
54 KRPC_ERROR_METHOD_UNKNOWN = 203
55 KRPC_ERROR_MALFORMED_REQUEST = 204
56 KRPC_ERROR_INVALID_TOKEN = 205
57 KRPC_ERROR_RESPONSE_TOO_LONG = 206
58
59 # Local errors
60 KRPC_ERROR_INTERNAL = 100
61 KRPC_ERROR_RECEIVED_UNKNOWN = 101
62 KRPC_ERROR_TIMEOUT = 102
63 KRPC_ERROR_PROTOCOL_STOPPED = 103
64
65 # commands
66 TID = 't'
67 REQ = 'q'
68 RSP = 'r'
69 TYP = 'y'
70 ARG = 'a'
71 ERR = 'e'
72
73 class KrpcError(Exception):
74     """An error occurred in the KRPC protocol."""
75     pass
76
77 def verifyMessage(msg):
78     """Check received message for corruption and errors.
79     
80     @type msg: C{dictionary}
81     @param msg: the dictionary of information received on the connection
82     @raise KrpcError: if the message is corrupt
83     """
84     
85     if type(msg) != dict:
86         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
87     if TYP not in msg:
88         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
89     if msg[TYP] == REQ:
90         if REQ not in msg:
91             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
92         if type(msg[REQ]) != str:
93             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
94         if ARG not in msg:
95             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
96         if type(msg[ARG]) != dict:
97             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
98     elif msg[TYP] == RSP:
99         if RSP not in msg:
100             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
101         if type(msg[RSP]) != dict:
102             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
103 #        if 'nodes' in msg[RSP] and type(msg[RSP]['nodes']) != list:
104 #            raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "wrong type of node, this is not bittorrent")
105     elif msg[TYP] == ERR:
106         if ERR not in msg:
107             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
108         if type(msg[ERR]) != list:
109             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
110         if len(msg[ERR]) != 2:
111             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
112         if type(msg[ERR][0]) not in (int, long):
113             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
114         if type(msg[ERR][1]) != str:
115             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
116 #    else:
117 #        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
118     if TID not in msg:
119         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
120     if type(msg[TID]) != str:
121         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
122     if len(msg[TID]) != 20:
123         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "wrong type of node, this is not bittorrent")
124
125 class hostbroker(protocol.DatagramProtocol):
126     """The factory for the KRPC protocol.
127     
128     @type server: L{khashmir.Khashmir}
129     @ivar server: the main Khashmir program
130     @type stats: L{stats.StatsLogger}
131     @ivar stats: the statistics logger to save transport info
132     @type config: C{dictionary}
133     @ivar config: the configuration parameters for the DHT
134     @type connections: C{dictionary}
135     @ivar connections: all the connections that have ever been made to the
136         protocol, keys are IP address and port pairs, values are L{KRPC}
137         protocols for the addresses
138     @ivar protocol: the protocol to use to handle incoming connections
139         (added externally)
140     @type addr: (C{string}, C{int})
141     @ivar addr: the IP address and port of this node
142     """
143     
144     def __init__(self, server, stats, config):
145         """Initialize the factory.
146         
147         @type server: L{khashmir.Khashmir}
148         @param server: the main DHT program
149         @type stats: L{stats.StatsLogger}
150         @param stats: the statistics logger to save transport info
151         @type config: C{dictionary}
152         @param config: the configuration parameters for the DHT
153         """
154         self.server = server
155         self.stats = stats
156         self.config = config
157         # this should be changed to storage that drops old entries
158         self.connections = {}
159         
160     def datagramReceived(self, datagram, addr):
161         """Optionally create a new protocol object, and handle the new datagram.
162         
163         @type datagram: C{string}
164         @param datagram: the data received from the transport.
165         @type addr: (C{string}, C{int})
166         @param addr: source IP address and port of datagram.
167         """
168         c = self.connectionForAddr(addr)
169         c.datagramReceived(datagram, addr)
170         #if c.idle():
171         #    del self.connections[addr]
172
173     def connectionForAddr(self, addr):
174         """Get a protocol object for the source.
175         
176         @type addr: (C{string}, C{int})
177         @param addr: source IP address and port of datagram.
178         """
179         # Don't connect to ourself
180         if addr == self.addr:
181             raise KrcpError
182         
183         # Create a new protocol object if necessary
184         if not self.connections.has_key(addr):
185             conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
186             self.connections[addr] = conn
187         else:
188             conn = self.connections[addr]
189         return conn
190
191     def makeConnection(self, transport):
192         """Make a connection to a transport and save our address."""
193         protocol.DatagramProtocol.makeConnection(self, transport)
194         tup = transport.getHost()
195         self.addr = (tup.host, tup.port)
196         
197     def stopProtocol(self):
198         """Stop all the open connections."""
199         for conn in self.connections.values():
200             conn.stop()
201         protocol.DatagramProtocol.stopProtocol(self)
202
203 class KrpcRequest(defer.Deferred):
204     """An outstanding request to a remote node.
205     
206     @type protocol: L{KRPC}
207     @ivar protocol: the protocol to send data with
208     @ivar tid: the transaction ID of the request
209     @type method: C{string}
210     @ivar method: the name of the method to call on the remote node
211     @type data: C{string}
212     @ivar data: the message to send to the remote node
213     @type config: C{dictionary}
214     @ivar config: the configuration parameters for the DHT
215     @type delay: C{int}
216     @ivar delay: the last timeout delay sent
217     @type start: C{datetime}
218     @ivar start: the time to request was started at
219     @type laterNextTimeout: L{twisted.internet.interfaces.IDelayedCall}
220     @ivar laterNextTimeout: the pending call to timeout the last sent request
221     @type laterFinalTimeout: L{twisted.internet.interfaces.IDelayedCall}
222     @ivar laterFinalTimeout: the pending call to timeout the entire request
223     """
224     
225     def __init__(self, protocol, newTID, method, data, config):
226         """Initialize the request, and send it out.
227         
228         @type protocol: L{KRPC}
229         @param protocol: the protocol to send data with
230         @param newTID: the transaction ID of the request
231         @type method: C{string}
232         @param method: the name of the method to call on the remote node
233         @type data: C{string}
234         @param data: the message to send to the remote node
235         @type config: C{dictionary}
236         @param config: the configuration parameters for the DHT
237         """
238         defer.Deferred.__init__(self)
239         self.protocol = protocol
240         self.tid = newTID
241         self.method = method
242         self.data = data
243         self.config = config
244         self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
245         self.start = datetime.now()
246         self.laterNextTimeout = None
247         self.laterFinalTimeout = reactor.callLater(self.config.get('KRPC_TIMEOUT', 9), self.finalTimeout)
248         reactor.callLater(0, self.send)
249         
250     def send(self):
251         """Send the request to the remote node."""
252         assert not self.laterNextTimeout, 'There is already a pending request'
253         self.laterNextTimeout = reactor.callLater(self.delay, self.nextTimeout)
254         try:
255             self.protocol.sendData(self.method, self.data)
256         except:
257             log.err()
258
259     def nextTimeout(self):
260         """Check for a unrecoverable timeout, otherwise resend."""
261         self.laterNextTimeout = None
262         if datetime.now() - self.start > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 9)):
263             self.finalTimeout()
264         elif self.protocol.stopped:
265             log.msg('Timeout but can not resend %r, protocol has been stopped' % self.tid)
266         else:
267             self.delay *= 2
268             log.msg('Trying to resend %r now with delay %d sec' % (self.tid, self.delay))
269             reactor.callLater(0, self.send)
270         
271     def finalTimeout(self):
272         """Timeout the request after an unrecoverable timeout."""
273         self.dropTimeOut()
274         delay = datetime.now() - self.start
275         log.msg('%r timed out after %0.2f sec' %
276                 (self.tid, delay.seconds + delay.microseconds/1000000.0))
277         self.protocol.timeOut(self.tid, self.method)
278         
279     def callback(self, resp):
280         self.dropTimeOut()
281         defer.Deferred.callback(self, resp)
282         
283     def errback(self, resp):
284         self.dropTimeOut()
285         defer.Deferred.errback(self, resp)
286         
287     def dropTimeOut(self):
288         """Cancel the timeout call when a response is received."""
289         if self.laterFinalTimeout and self.laterFinalTimeout.active():
290             self.laterFinalTimeout.cancel()
291         self.laterFinalTimeout = None
292         if self.laterNextTimeout and self.laterNextTimeout.active():
293             self.laterNextTimeout.cancel()
294         self.laterNextTimeout = None
295
296 class KRPC:
297     """The KRPC protocol implementation.
298     
299     @ivar transport: the transport to use for the protocol
300     @type factory: L{khashmir.Khashmir}
301     @ivar factory: the main Khashmir program
302     @type stats: L{stats.StatsLogger}
303     @ivar stats: the statistics logger to save transport info
304     @type addr: (C{string}, C{int})
305     @ivar addr: the IP address and port of the source node
306     @type config: C{dictionary}
307     @ivar config: the configuration parameters for the DHT
308     @type tids: C{dictionary}
309     @ivar tids: the transaction IDs outstanding for requests, keys are the
310         transaction ID of the request, values are the deferreds to call with
311         the results
312     @type stopped: C{boolean}
313     @ivar stopped: whether the protocol has been stopped
314     """
315     
316     def __init__(self, addr, server, stats, transport, config = {}):
317         """Initialize the protocol.
318         
319         @type addr: (C{string}, C{int})
320         @param addr: the IP address and port of the source node
321         @type server: L{khashmir.Khashmir}
322         @param server: the main Khashmir program
323         @type stats: L{stats.StatsLogger}
324         @param stats: the statistics logger to save transport info
325         @param transport: the transport to use for the protocol
326         @type config: C{dictionary}
327         @param config: the configuration parameters for the DHT
328             (optional, defaults to using defaults)
329         """
330         self.transport = transport
331         self.factory = server
332         self.stats = stats
333         self.addr = addr
334         self.config = config
335         self.tids = {}
336         self.stopped = False
337
338     def datagramReceived(self, data, addr):
339         """Process the new datagram.
340         
341         @type data: C{string}
342         @param data: the data received from the transport.
343         @type addr: (C{string}, C{int})
344         @param addr: source IP address and port of datagram.
345         """
346         self.stats.receivedBytes(len(data))
347         if self.stopped:
348             if self.config.get('SPEW', False):
349                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
350
351         # Bdecode the message
352         try:
353             msg = bdecode(data)
354         except Exception, e:
355             if self.config.get('SPEW', False):
356                 log.msg("krpc bdecode error from %r: " % (addr, ))
357                 log.err(e)
358             return
359
360         # Make sure the remote node isn't trying anything funny
361         try:
362             verifyMessage(msg)
363         except Exception, e:
364             log.msg("krpc message verification error from %r: %r" % (addr, e))
365             return
366
367         if self.config.get('SPEW', False):
368             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
369
370         # Process it based on its type
371         if msg[TYP]  == REQ:
372             ilen = len(data)
373             
374             # Requests are handled by the factory
375             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
376             msg[ARG]['_krpc_sender'] =  self.addr
377             if f and callable(f):
378                 self.stats.receivedAction(msg[REQ])
379                 try:
380                     ret = f(*(), **msg[ARG])
381                 except KrpcError, e:
382                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
383                     if e[0] != KRPC_ERROR_INVALID_TOKEN:
384                         log.err(e)
385                     else:
386                         log.msg('Node sent us an invalid token, not storing')
387                     self.stats.errorAction(msg[REQ])
388                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
389                                               [e[0], e[1]])
390                 except TypeError, e:
391                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
392                     log.err(e)
393                     self.stats.errorAction(msg[REQ])
394                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
395                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
396                 except Exception, e:
397                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
398                     log.err(e)
399                     self.stats.errorAction(msg[REQ])
400                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
401                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
402                 else:
403                     olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
404             else:
405                 # Request for unknown method
406                 log.msg("ERROR: don't know about method %s" % msg[REQ])
407                 self.stats.receivedAction('unknown')
408                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
409                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
410
411             log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
412         elif msg[TYP] == RSP:
413             # Responses get processed by their TID's deferred
414             if self.tids.has_key(msg[TID]):
415                 req = self.tids[msg[TID]]
416                 #       callback
417                 del(self.tids[msg[TID]])
418                 msg[RSP]['_krpc_sender'] = addr
419                 req.callback(msg[RSP])
420             else:
421                 # no tid, this transaction was finished already...
422                 if self.config.get('SPEW', False):
423                     log.msg('received response from %r for completed request: %r' %
424                             (msg[RSP]['id'], msg[TID]))
425         elif msg[TYP] == ERR:
426             # Errors get processed by their TID's deferred's errback
427             if self.tids.has_key(msg[TID]):
428                 req = self.tids[msg[TID]]
429                 del(self.tids[msg[TID]])
430                 # callback
431                 req.errback(KrpcError(*msg[ERR]))
432             else:
433                 # no tid, this transaction was finished already...
434                 log.msg('received an error %r from %r for completed request: %r' %
435                         (msg[ERR], addr, msg[TID]))
436         else:
437             # Received an unknown message type
438             if self.config.get('SPEW', False):
439                 log.msg("unknown message type: %r" % msg)
440             if msg[TID] in self.tids:
441                 req = self.tids[msg[TID]]
442                 del(self.tids[msg[TID]])
443                 # callback
444                 req.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
445                                       "Received an unknown message type: %r" % msg[TYP]))
446                 
447     def _sendResponse(self, request, addr, tid, msgType, response):
448         """Helper function for sending responses to nodes.
449
450         @param request: the name of the requested method
451         @type addr: (C{string}, C{int})
452         @param addr: source IP address and port of datagram.
453         @param tid: the transaction ID of the request
454         @param msgType: the type of message to respond with
455         @param response: the arguments for the response
456         """
457         if not response:
458             response = {}
459         
460         try:
461             # Create the response message
462             msg = {TID : tid, TYP : msgType, msgType : response}
463     
464             if self.config.get('SPEW', False):
465                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
466     
467             out = bencode(msg)
468             
469             # Make sure its not too long
470             if len(out) > UDP_PACKET_LIMIT:
471                 # Can we remove some values to shorten it?
472                 if 'values' in response:
473                     # Save the original list of values
474                     orig_values = response['values']
475                     len_orig_values = len(bencode(orig_values))
476                     
477                     # Caclulate the maximum value length possible
478                     max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
479                     assert max_len_values > 0
480                     
481                     # Start with a calculation of how many values should be included
482                     # (assumes all values are the same length)
483                     per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
484                     num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
485     
486                     # Do a linear search for the actual maximum number possible
487                     bencoded_values = len(bencode(orig_values[:num_values]))
488                     while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
489                         bencoded_values += len(bencode(orig_values[num_values]))
490                         num_values += 1
491                     while bencoded_values > max_len_values and num_values > 0:
492                         num_values -= 1
493                         bencoded_values -= len(bencode(orig_values[num_values]))
494                     assert num_values > 0
495     
496                     # Encode the result
497                     response['values'] = orig_values[:num_values]
498                     out = bencode(msg)
499                     assert len(out) < UDP_PACKET_LIMIT
500                     log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
501                             (len(orig_values), num_values, len(out)))
502                 else:
503                     # Too long a response, send an error
504                     log.msg('Could not send response, too long: %d bytes' % len(out))
505                     self.stats.errorAction(request)
506                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
507                     out = bencode(msg)
508
509         except Exception, e:
510             # Unknown error, send an error message
511             self.stats.errorAction(request)
512             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
513             out = bencode(msg)
514                     
515         self.stats.sentBytes(len(out))
516         self.transport.write(out, addr)
517         return len(out)
518     
519     def sendRequest(self, method, args):
520         """Send a request to the remote node.
521         
522         @type method: C{string}
523         @param method: the method name to call on the remote node
524         @param args: the arguments to send to the remote node's method
525         """
526         if self.stopped:
527             return defer.fail(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
528                                         "cannot send, connection has been stopped"))
529
530         # Create the request message
531         newTID = newID()
532         msg = {TID : newTID, TYP : REQ,  REQ : method, ARG : args}
533         if self.config.get('SPEW', False):
534             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
535         data = bencode(msg)
536         
537         # Create the request object and save it with the TID
538         req = KrpcRequest(self, newTID, method, data, self.config)
539         self.tids[newTID] = req
540         
541         # Save the conclusion of the action
542         req.addCallbacks(self.stats.responseAction, self.stats.failedAction,
543                          callbackArgs = (method, datetime.now()),
544                          errbackArgs = (method, datetime.now()))
545
546         return req
547     
548     def sendData(self, method, data):
549         """Write a request to the transport and save the stats.
550         
551         @type method: C{string}
552         @param method: the name of the method to call on the remote node
553         @type data: C{string}
554         @param data: the message to send to the remote node
555         """
556         self.transport.write(data, self.addr)
557         self.stats.sentAction(method)
558         self.stats.sentBytes(len(data))
559         
560     def timeOut(self, badTID, method):
561         """Call the deferred's errback if a timeout occurs.
562         
563         @param badTID: the transaction ID of the request
564         @type method: C{string}
565         @param method: the name of the method that timed out on the remote node
566         """
567         if badTID in self.tids:
568             req = self.tids[badTID]
569             del(self.tids[badTID])
570             req.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % 
571                                                       (method, self.addr)))
572         else:
573             log.msg('Received a timeout for an unknown request for %s from %r' % (method, self.addr))
574         
575     def stop(self):
576         """Cancel all pending requests."""
577         for req in self.tids.values():
578             req.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
579                                   'connection has been stopped while waiting for response'))
580         self.tids = {}
581         self.stopped = True
582
583 #{ For testing the KRPC protocol
584 def connectionForAddr(host, port):
585     return host
586     
587 class Receiver(protocol.Factory):
588     protocol = KRPC
589     def __init__(self):
590         self.buf = []
591     def krpc_store(self, msg, _krpc_sender):
592         self.buf += [msg]
593         return {}
594     def krpc_echo(self, msg, _krpc_sender):
595         return {'msg': msg}
596     def krpc_values(self, length, num, _krpc_sender):
597         return {'values': ['1'*length]*num}
598
599 def make(port):
600     from stats import StatsLogger
601     af = Receiver()
602     a = hostbroker(af, StatsLogger(None, None),
603                    {'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
604     a.protocol = KRPC
605     p = reactor.listenUDP(port, a)
606     return af, a, p
607     
608 class KRPCTests(unittest.TestCase):
609     timeout = 2
610     
611     def setUp(self):
612         self.af, self.a, self.ap = make(1180)
613         self.bf, self.b, self.bp = make(1181)
614
615     def tearDown(self):
616         self.ap.stopListening()
617         self.bp.stopListening()
618
619     def bufEquals(self, result, value):
620         self.failUnlessEqual(self.bf.buf, value)
621
622     def testSimpleMessage(self):
623         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
624         d.addCallback(self.bufEquals, ["This is a test."])
625         return d
626
627     def testMessageBlast(self):
628         for i in range(100):
629             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
630         d.addCallback(self.bufEquals, ["This is a test."] * 100)
631         return d
632
633     def testEcho(self):
634         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
635         df.addCallback(self.gotMsg, "This is a test.")
636         return df
637
638     def gotMsg(self, dict, should_be):
639         _krpc_sender = dict['_krpc_sender']
640         self.failUnlessEqual(dict['msg'], should_be)
641
642     def testManyEcho(self):
643         for i in xrange(100):
644             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
645             df.addCallback(self.gotMsg, "This is a test.")
646         return df
647
648     def testMultiEcho(self):
649         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
650         df.addCallback(self.gotMsg, "This is a test.")
651
652         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
653         df.addCallback(self.gotMsg, "This is another test.")
654
655         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
656         df.addCallback(self.gotMsg, "This is yet another test.")
657         
658         return df
659
660     def testEchoReset(self):
661         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
662         df.addCallback(self.gotMsg, "This is a test.")
663
664         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
665         df.addCallback(self.gotMsg, "This is another test.")
666         df.addCallback(self.echoReset)
667         return df
668     
669     def echoReset(self, dict):
670         del(self.a.connections[('127.0.0.1', 1181)])
671         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
672         df.addCallback(self.gotMsg, "This is yet another test.")
673         return df
674
675     def testUnknownMeth(self):
676         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
677         df = self.failUnlessFailure(df, KrpcError)
678         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
679         return df
680
681     def testMalformedRequest(self):
682         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
683         df = self.failUnlessFailure(df, KrpcError)
684         df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
685         return df
686
687     def gotErr(self, value, should_be, *errorTypes):
688         self.failUnlessEqual(value[0], should_be)
689         if errorTypes:
690             self.flushLoggedErrors(*errorTypes)
691         
692     def testLongPackets(self):
693         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
694         df.addCallback(self.gotLongRsp)
695         return df
696
697     def gotLongRsp(self, dict):
698         # Not quite accurate, but good enough
699         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
700