47e7452f907e42baa755226e0b94db0abf7bd274
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 """The KRPC communication protocol implementation.
5
6 @var KRPC_TIMEOUT: the number of seconds after which requests timeout
7 @var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
8     UDP packet without fragmentation
9
10 @var KRPC_ERROR: the code for a generic error
11 @var KRPC_ERROR_SERVER_ERROR: the code for a server error
12 @var KRPC_ERROR_MALFORMED_PACKET: the code for a malformed packet error
13 @var KRPC_ERROR_METHOD_UNKNOWN: the code for a method unknown error
14 @var KRPC_ERROR_MALFORMED_REQUEST: the code for a malformed request error
15 @var KRPC_ERROR_INVALID_TOKEN: the code for an invalid token error
16 @var KRPC_ERROR_RESPONSE_TOO_LONG: the code for a response too long error
17
18 @var KRPC_ERROR_INTERNAL: the code for an internal error
19 @var KRPC_ERROR_RECEIVED_UNKNOWN: the code for an unknown message type error
20 @var KRPC_ERROR_TIMEOUT: the code for a timeout error
21 @var KRPC_ERROR_PROTOCOL_STOPPED: the code for a stopped protocol error
22
23 @var TID: the identifier for the transaction ID
24 @var REQ: the identifier for a request packet
25 @var RSP: the identifier for a response packet
26 @var TYP: the identifier for the type of packet
27 @var ARG: the identifier for the argument to the request
28 @var ERR: the identifier for an error packet
29
30 @group Remote node error codes: KRPC_ERROR, KRPC_ERROR_SERVER_ERROR,
31     KRPC_ERROR_MALFORMED_PACKET, KRPC_ERROR_METHOD_UNKNOWN,
32     KRPC_ERROR_MALFORMED_REQUEST, KRPC_ERROR_INVALID_TOKEN,
33     KRPC_ERROR_RESPONSE_TOO_LONG
34 @group Local node error codes: KRPC_ERROR_INTERNAL, KRPC_ERROR_RECEIVED_UNKNOWN,
35     KRPC_ERROR_TIMEOUT, KRPC_ERROR_PROTOCOL_STOPPED
36 @group Command identifiers: TID, REQ, RSP, TYP, ARG, ERR
37
38 """
39
40 from bencode import bencode, bdecode
41 from time import asctime
42 from math import ceil
43
44 from twisted.internet.defer import Deferred
45 from twisted.internet import protocol, reactor
46 from twisted.python import log
47 from twisted.trial import unittest
48
49 from khash import newID
50
51 KRPC_TIMEOUT = 20
52 UDP_PACKET_LIMIT = 1472
53
54 # Remote node errors
55 KRPC_ERROR = 200
56 KRPC_ERROR_SERVER_ERROR = 201
57 KRPC_ERROR_MALFORMED_PACKET = 202
58 KRPC_ERROR_METHOD_UNKNOWN = 203
59 KRPC_ERROR_MALFORMED_REQUEST = 204
60 KRPC_ERROR_INVALID_TOKEN = 205
61 KRPC_ERROR_RESPONSE_TOO_LONG = 206
62
63 # Local errors
64 KRPC_ERROR_INTERNAL = 100
65 KRPC_ERROR_RECEIVED_UNKNOWN = 101
66 KRPC_ERROR_TIMEOUT = 102
67 KRPC_ERROR_PROTOCOL_STOPPED = 103
68
69 # commands
70 TID = 't'
71 REQ = 'q'
72 RSP = 'r'
73 TYP = 'y'
74 ARG = 'a'
75 ERR = 'e'
76
77 class KrpcError(Exception):
78     """An error occurred in the KRPC protocol."""
79     pass
80
81 def verifyMessage(msg):
82     """Check received message for corruption and errors.
83     
84     @type msg: C{dictionary}
85     @param msg: the dictionary of information received on the connection
86     @raise KrpcError: if the message is corrupt
87     """
88     
89     if type(msg) != dict:
90         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "not a dictionary")
91     if TYP not in msg:
92         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no message type")
93     if msg[TYP] == REQ:
94         if REQ not in msg:
95             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type not specified")
96         if type(msg[REQ]) != str:
97             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "request type is not a string")
98         if ARG not in msg:
99             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no arguments for request")
100         if type(msg[ARG]) != dict:
101             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "arguments for request are not in a dictionary")
102     elif msg[TYP] == RSP:
103         if RSP not in msg:
104             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response not specified")
105         if type(msg[RSP]) != dict:
106             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "response is not a dictionary")
107     elif msg[TYP] == ERR:
108         if ERR not in msg:
109             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error not specified")
110         if type(msg[ERR]) != list:
111             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a list")
112         if len(msg[ERR]) != 2:
113             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error is not a 2-element list")
114         if type(msg[ERR][0]) not in (int, long):
115             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error number is not a number")
116         if type(msg[ERR][1]) != str:
117             raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "error string is not a string")
118 #    else:
119 #        raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "unknown message type")
120     if TID not in msg:
121         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "no transaction ID specified")
122     if type(msg[TID]) != str:
123         raise KrpcError, (KRPC_ERROR_MALFORMED_PACKET, "transaction id is not a string")
124
125 class hostbroker(protocol.DatagramProtocol):
126     """The factory for the KRPC protocol.
127     
128     @type server: L{khashmir.Khashmir}
129     @ivar server: the main Khashmir program
130     @type stats: L{stats.StatsLogger}
131     @ivar stats: the statistics logger to save transport info
132     @type config: C{dictionary}
133     @ivar config: the configuration parameters for the DHT
134     @type connections: C{dictionary}
135     @ivar connections: all the connections that have ever been made to the
136         protocol, keys are IP address and port pairs, values are L{KRPC}
137         protocols for the addresses
138     @ivar protocol: the protocol to use to handle incoming connections
139         (added externally)
140     @type addr: (C{string}, C{int})
141     @ivar addr: the IP address and port of this node
142     """
143     
144     def __init__(self, server, stats, config):
145         """Initialize the factory.
146         
147         @type server: L{khashmir.Khashmir}
148         @param server: the main DHT program
149         @type stats: L{stats.StatsLogger}
150         @param stats: the statistics logger to save transport info
151         @type config: C{dictionary}
152         @param config: the configuration parameters for the DHT
153         """
154         self.server = server
155         self.stats = stats
156         self.config = config
157         # this should be changed to storage that drops old entries
158         self.connections = {}
159         
160     def datagramReceived(self, datagram, addr):
161         """Optionally create a new protocol object, and handle the new datagram.
162         
163         @type datagram: C{string}
164         @param datagram: the data received from the transport.
165         @type addr: (C{string}, C{int})
166         @param addr: source IP address and port of datagram.
167         """
168         c = self.connectionForAddr(addr)
169         c.datagramReceived(datagram, addr)
170         #if c.idle():
171         #    del self.connections[addr]
172
173     def connectionForAddr(self, addr):
174         """Get a protocol object for the source.
175         
176         @type addr: (C{string}, C{int})
177         @param addr: source IP address and port of datagram.
178         """
179         # Don't connect to ourself
180         if addr == self.addr:
181             raise KrcpError
182         
183         # Create a new protocol object if necessary
184         if not self.connections.has_key(addr):
185             conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
186             self.connections[addr] = conn
187         else:
188             conn = self.connections[addr]
189         return conn
190
191     def makeConnection(self, transport):
192         """Make a connection to a transport and save our address."""
193         protocol.DatagramProtocol.makeConnection(self, transport)
194         tup = transport.getHost()
195         self.addr = (tup.host, tup.port)
196         
197     def stopProtocol(self):
198         """Stop all the open connections."""
199         for conn in self.connections.values():
200             conn.stop()
201         protocol.DatagramProtocol.stopProtocol(self)
202
203 class KRPC:
204     """The KRPC protocol implementation.
205     
206     @ivar transport: the transport to use for the protocol
207     @type factory: L{khashmir.Khashmir}
208     @ivar factory: the main Khashmir program
209     @type stats: L{stats.StatsLogger}
210     @ivar stats: the statistics logger to save transport info
211     @type addr: (C{string}, C{int})
212     @ivar addr: the IP address and port of the source node
213     @type noisy: C{boolean}
214     @ivar noisy: whether to log additional details of the protocol
215     @type tids: C{dictionary}
216     @ivar tids: the transaction IDs outstanding for requests, keys are the
217         transaction ID of the request, values are the deferreds to call with
218         the results
219     @type stopped: C{boolean}
220     @ivar stopped: whether the protocol has been stopped
221     """
222     
223     def __init__(self, addr, server, stats, transport, spew = False):
224         """Initialize the protocol.
225         
226         @type addr: (C{string}, C{int})
227         @param addr: the IP address and port of the source node
228         @type server: L{khashmir.Khashmir}
229         @param server: the main Khashmir program
230         @type stats: L{stats.StatsLogger}
231         @param stats: the statistics logger to save transport info
232         @param transport: the transport to use for the protocol
233         @type spew: C{boolean}
234         @param spew: whether to log additional details of the protocol
235             (optional, defaults to False)
236         """
237         self.transport = transport
238         self.factory = server
239         self.stats = stats
240         self.addr = addr
241         self.noisy = spew
242         self.tids = {}
243         self.stopped = False
244
245     def datagramReceived(self, data, addr):
246         """Process the new datagram.
247         
248         @type data: C{string}
249         @param data: the data received from the transport.
250         @type addr: (C{string}, C{int})
251         @param addr: source IP address and port of datagram.
252         """
253         self.stats.receivedBytes(len(data))
254         if self.stopped:
255             if self.noisy:
256                 log.msg("stopped, dropping message from %r: %s" % (addr, data))
257
258         # Bdecode the message
259         try:
260             msg = bdecode(data)
261         except Exception, e:
262             if self.noisy:
263                 log.msg("krpc bdecode error: ")
264                 log.err(e)
265             return
266
267         # Make sure the remote node isn't trying anything funny
268         try:
269             verifyMessage(msg)
270         except Exception, e:
271             log.msg("krpc message verification error: ")
272             log.err(e)
273             return
274
275         if self.noisy:
276             log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
277
278         # Process it based on its type
279         if msg[TYP]  == REQ:
280             ilen = len(data)
281             
282             # Requests are handled by the factory
283             f = getattr(self.factory ,"krpc_" + msg[REQ], None)
284             msg[ARG]['_krpc_sender'] =  self.addr
285             if f and callable(f):
286                 self.stats.receivedAction(msg[REQ])
287                 try:
288                     ret = f(*(), **msg[ARG])
289                 except KrpcError, e:
290                     log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
291                     log.err(e)
292                     self.stats.errorAction(msg[REQ])
293                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
294                                               [e[0], e[1]])
295                 except TypeError, e:
296                     log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
297                     log.err(e)
298                     self.stats.errorAction(msg[REQ])
299                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
300                                               [KRPC_ERROR_MALFORMED_REQUEST, str(e)])
301                 except Exception, e:
302                     log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
303                     log.err(e)
304                     self.stats.errorAction(msg[REQ])
305                     olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
306                                               [KRPC_ERROR_SERVER_ERROR, str(e)])
307                 else:
308                     olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
309             else:
310                 # Request for unknown method
311                 log.msg("ERROR: don't know about method %s" % msg[REQ])
312                 self.stats.receivedAction('unknown')
313                 olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
314                                           [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
315             if self.noisy:
316                 log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
317                                                   ilen, msg[REQ], olen))
318         elif msg[TYP] == RSP:
319             # Responses get processed by their TID's deferred
320             if self.tids.has_key(msg[TID]):
321                 df = self.tids[msg[TID]]
322                 #       callback
323                 del(self.tids[msg[TID]])
324                 msg[RSP]['_krpc_sender'] = addr
325                 df.callback(msg[RSP])
326             else:
327                 # no tid, this transaction timed out already...
328                 if self.noisy:
329                     log.msg('timeout: %r' % msg[RSP]['id'])
330         elif msg[TYP] == ERR:
331             # Errors get processed by their TID's deferred's errback
332             if self.tids.has_key(msg[TID]):
333                 df = self.tids[msg[TID]]
334                 del(self.tids[msg[TID]])
335                 # callback
336                 df.errback(KrpcError(*msg[ERR]))
337             else:
338                 # day late and dollar short, just log it
339                 log.msg("Got an error for an unknown request: %r" % (msg[ERR], ))
340                 pass
341         else:
342             # Received an unknown message type
343             if self.noisy:
344                 log.msg("unknown message type: %r" % msg)
345             if msg[TID] in self.tids:
346                 df = self.tids[msg[TID]]
347                 del(self.tids[msg[TID]])
348                 # callback
349                 df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
350                                      "Received an unknown message type: %r" % msg[TYP]))
351                 
352     def _sendResponse(self, request, addr, tid, msgType, response):
353         """Helper function for sending responses to nodes.
354
355         @param request: the name of the requested method
356         @type addr: (C{string}, C{int})
357         @param addr: source IP address and port of datagram.
358         @param tid: the transaction ID of the request
359         @param msgType: the type of message to respond with
360         @param response: the arguments for the response
361         """
362         if not response:
363             response = {}
364         
365         try:
366             # Create the response message
367             msg = {TID : tid, TYP : msgType, msgType : response}
368     
369             if self.noisy:
370                 log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
371     
372             out = bencode(msg)
373             
374             # Make sure its not too long
375             if len(out) > UDP_PACKET_LIMIT:
376                 # Can we remove some values to shorten it?
377                 if 'values' in response:
378                     # Save the original list of values
379                     orig_values = response['values']
380                     len_orig_values = len(bencode(orig_values))
381                     
382                     # Caclulate the maximum value length possible
383                     max_len_values = len_orig_values - (len(out) - UDP_PACKET_LIMIT)
384                     assert max_len_values > 0
385                     
386                     # Start with a calculation of how many values should be included
387                     # (assumes all values are the same length)
388                     per_value = (float(len_orig_values) - 2.0) / float(len(orig_values))
389                     num_values = len(orig_values) - int(ceil(float(len(out) - UDP_PACKET_LIMIT) / per_value))
390     
391                     # Do a linear search for the actual maximum number possible
392                     bencoded_values = len(bencode(orig_values[:num_values]))
393                     while bencoded_values < max_len_values and num_values + 1 < len(orig_values):
394                         bencoded_values += len(bencode(orig_values[num_values]))
395                         num_values += 1
396                     while bencoded_values > max_len_values and num_values > 0:
397                         num_values -= 1
398                         bencoded_values -= len(bencode(orig_values[num_values]))
399                     assert num_values > 0
400     
401                     # Encode the result
402                     response['values'] = orig_values[:num_values]
403                     out = bencode(msg)
404                     assert len(out) < UDP_PACKET_LIMIT
405                     log.msg('Shortened a long packet from %d to %d values, new packet length: %d' % 
406                             (len(orig_values), num_values, len(out)))
407                 else:
408                     # Too long a response, send an error
409                     log.msg('Could not send response, too long: %d bytes' % len(out))
410                     self.stats.errorAction(request)
411                     msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
412                     out = bencode(msg)
413
414         except Exception, e:
415             # Unknown error, send an error message
416             self.stats.errorAction(request)
417             msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
418             out = bencode(msg)
419                     
420         self.stats.sentBytes(len(out))
421         self.transport.write(out, addr)
422         return len(out)
423     
424     def sendRequest(self, method, args):
425         """Send a request to the remote node.
426         
427         @type method: C{string}
428         @param method: the methiod name to call on the remote node
429         @param args: the arguments to send to the remote node's method
430         """
431         if self.stopped:
432             raise KrpcError, (KRPC_ERROR_PROTOCOL_STOPPED, "cannot send, connection has been stopped")
433
434         # Create the request message
435         msg = {TID : newID(), TYP : REQ,  REQ : method, ARG : args}
436         if self.noisy:
437             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
438         data = bencode(msg)
439         
440         # Create the deferred and save it with the TID
441         d = Deferred()
442         self.tids[msg[TID]] = d
443         
444         # Save the conclusion of the action
445         d.addCallbacks(self.stats.responseAction, self.stats.failedAction,
446                        callbackArgs = (method, ), errbackArgs = (method, ))
447
448         # Schedule a later timeout call
449         def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
450             """Call the deferred's errback if a timeout occurs."""
451             if tids.has_key(id):
452                 df = tids[id]
453                 del(tids[id])
454                 df.errback(KrpcError(KRPC_ERROR_TIMEOUT, "timeout waiting for '%s' from %r" % (method, addr)))
455         later = reactor.callLater(KRPC_TIMEOUT, timeOut)
456         
457         # Cancel the timeout call if a response is received
458         def dropTimeOut(dict, later_call = later):
459             """Cancel the timeout call when a response is received."""
460             if later_call.active():
461                 later_call.cancel()
462             return dict
463         d.addBoth(dropTimeOut)
464
465         # Save some stats
466         self.stats.sentAction(method)
467         self.stats.sentBytes(len(data))
468         
469         self.transport.write(data, self.addr)
470         return d
471     
472     def stop(self):
473         """Timeout all pending requests."""
474         for df in self.tids.values():
475             df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
476                                  'connection has been stopped while waiting for response'))
477         self.tids = {}
478         self.stopped = True
479
480 #{ For testing the KRPC protocol
481 def connectionForAddr(host, port):
482     return host
483     
484 class Receiver(protocol.Factory):
485     protocol = KRPC
486     def __init__(self):
487         self.buf = []
488     def krpc_store(self, msg, _krpc_sender):
489         self.buf += [msg]
490         return {}
491     def krpc_echo(self, msg, _krpc_sender):
492         return {'msg': msg}
493     def krpc_values(self, length, num, _krpc_sender):
494         return {'values': ['1'*length]*num}
495
496 def make(port):
497     from stats import StatsLogger
498     af = Receiver()
499     a = hostbroker(af, StatsLogger(None, None, {}), {'SPEW': False})
500     a.protocol = KRPC
501     p = reactor.listenUDP(port, a)
502     return af, a, p
503     
504 class KRPCTests(unittest.TestCase):
505     timeout = 2
506     
507     def setUp(self):
508         self.af, self.a, self.ap = make(1180)
509         self.bf, self.b, self.bp = make(1181)
510
511     def tearDown(self):
512         self.ap.stopListening()
513         self.bp.stopListening()
514
515     def bufEquals(self, result, value):
516         self.failUnlessEqual(self.bf.buf, value)
517
518     def testSimpleMessage(self):
519         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
520         d.addCallback(self.bufEquals, ["This is a test."])
521         return d
522
523     def testMessageBlast(self):
524         for i in range(100):
525             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
526         d.addCallback(self.bufEquals, ["This is a test."] * 100)
527         return d
528
529     def testEcho(self):
530         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
531         df.addCallback(self.gotMsg, "This is a test.")
532         return df
533
534     def gotMsg(self, dict, should_be):
535         _krpc_sender = dict['_krpc_sender']
536         self.failUnlessEqual(dict['msg'], should_be)
537
538     def testManyEcho(self):
539         for i in xrange(100):
540             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
541             df.addCallback(self.gotMsg, "This is a test.")
542         return df
543
544     def testMultiEcho(self):
545         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
546         df.addCallback(self.gotMsg, "This is a test.")
547
548         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
549         df.addCallback(self.gotMsg, "This is another test.")
550
551         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
552         df.addCallback(self.gotMsg, "This is yet another test.")
553         
554         return df
555
556     def testEchoReset(self):
557         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
558         df.addCallback(self.gotMsg, "This is a test.")
559
560         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
561         df.addCallback(self.gotMsg, "This is another test.")
562         df.addCallback(self.echoReset)
563         return df
564     
565     def echoReset(self, dict):
566         del(self.a.connections[('127.0.0.1', 1181)])
567         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
568         df.addCallback(self.gotMsg, "This is yet another test.")
569         return df
570
571     def testUnknownMeth(self):
572         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
573         df = self.failUnlessFailure(df, KrpcError)
574         df.addBoth(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
575         return df
576
577     def testMalformedRequest(self):
578         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test.", 'foo': 'bar'})
579         df = self.failUnlessFailure(df, KrpcError)
580         df.addBoth(self.gotErr, KRPC_ERROR_MALFORMED_REQUEST, TypeError)
581         return df
582
583     def gotErr(self, value, should_be, *errorTypes):
584         self.failUnlessEqual(value[0], should_be)
585         if errorTypes:
586             self.flushLoggedErrors(*errorTypes)
587         
588     def testLongPackets(self):
589         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('values', {'length' : 1, 'num': 2000})
590         df.addCallback(self.gotLongRsp)
591         return df
592
593     def gotLongRsp(self, dict):
594         # Not quite accurate, but good enough
595         self.failUnless(len(bencode(dict))-10 < UDP_PACKET_LIMIT)
596