Disable the deprecation warning for the khashmir tests.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from bencode import bencode, bdecode
5 from time import asctime
6 import sys
7 from traceback import format_exception
8
9 from twisted.internet.defer import Deferred
10 from twisted.internet import protocol, reactor
11 from twisted.trial import unittest
12
13 KRPC_TIMEOUT = 20
14
15 KRPC_ERROR = 1
16 KRPC_ERROR_METHOD_UNKNOWN = 2
17 KRPC_ERROR_RECEIVED_UNKNOWN = 3
18 KRPC_ERROR_TIMEOUT = 4
19
20 # commands
21 TID = 't'
22 REQ = 'q'
23 RSP = 'r'
24 TYP = 'y'
25 ARG = 'a'
26 ERR = 'e'
27
28 class hostbroker(protocol.DatagramProtocol):       
29     def __init__(self, server):
30         self.server = server
31         # this should be changed to storage that drops old entries
32         self.connections = {}
33         
34     def datagramReceived(self, datagram, addr):
35         #print `addr`, `datagram`
36         #if addr != self.addr:
37         c = self.connectionForAddr(addr)
38         c.datagramReceived(datagram, addr)
39         #if c.idle():
40         #    del self.connections[addr]
41
42     def connectionForAddr(self, addr):
43         if addr == self.addr:
44             raise Exception
45         if not self.connections.has_key(addr):
46             conn = self.protocol(addr, self.server, self.transport)
47             self.connections[addr] = conn
48         else:
49             conn = self.connections[addr]
50         return conn
51
52     def makeConnection(self, transport):
53         protocol.DatagramProtocol.makeConnection(self, transport)
54         tup = transport.getHost()
55         self.addr = (tup.host, tup.port)
56
57 ## connection
58 class KRPC:
59     noisy = 0
60     def __init__(self, addr, server, transport):
61         self.transport = transport
62         self.factory = server
63         self.addr = addr
64         self.tids = {}
65         self.mtid = 0
66
67     def datagramReceived(self, str, addr):
68         # bdecode
69         try:
70             msg = bdecode(str)
71         except Exception, e:
72             if self.noisy:
73                 print "response decode error: " + `e`
74         else:
75             #if self.noisy:
76             #    print msg
77             # look at msg type
78             if msg[TYP]  == REQ:
79                 ilen = len(str)
80                 # if request
81                 #       tell factory to handle
82                 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
83                 msg[ARG]['_krpc_sender'] =  self.addr
84                 if f and callable(f):
85                     try:
86                         ret = apply(f, (), msg[ARG])
87                     except Exception, e:
88                         ## send error
89                         out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
90                         olen = len(out)
91                         self.transport.write(out, addr)
92                     else:
93                         if ret:
94                             #   make response
95                             out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
96                         else:
97                             out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
98                         #       send response
99                         olen = len(out)
100                         self.transport.write(out, addr)
101
102                 else:
103                     if self.noisy:
104                         print "don't know about method %s" % msg[REQ]
105                     # unknown method
106                     out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
107                     olen = len(out)
108                     self.transport.write(out, addr)
109                 if self.noisy:
110                     print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port, 
111                                                     ilen, msg[REQ], olen)
112             elif msg[TYP] == RSP:
113                 # if response
114                 #       lookup tid
115                 if self.tids.has_key(msg[TID]):
116                     df = self.tids[msg[TID]]
117                     #   callback
118                     del(self.tids[msg[TID]])
119                     df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
120                 else:
121                     print 'timeout ' + `msg[RSP]['id']`
122                     # no tid, this transaction timed out already...
123             elif msg[TYP] == ERR:
124                 # if error
125                 #       lookup tid
126                 if self.tids.has_key(msg[TID]):
127                     df = self.tids[msg[TID]]
128                     #   callback
129                     df.errback(msg[ERR])
130                     del(self.tids[msg[TID]])
131                 else:
132                     # day late and dollar short
133                     pass
134             else:
135                 print "unknown message type " + `msg`
136                 # unknown message type
137                 df = self.tids[msg[TID]]
138                 #       callback
139                 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
140                 del(self.tids[msg[TID]])
141                 
142     def sendRequest(self, method, args):
143         # make message
144         # send it
145         msg = {TID : chr(self.mtid), TYP : REQ,  REQ : method, ARG : args}
146         self.mtid = (self.mtid + 1) % 256
147         str = bencode(msg)
148         d = Deferred()
149         self.tids[msg[TID]] = d
150         def timeOut(tids = self.tids, id = msg[TID]):
151             if tids.has_key(id):
152                 df = tids[id]
153                 del(tids[id])
154                 print ">>>>>> KRPC_ERROR_TIMEOUT"
155                 df.errback(KRPC_ERROR_TIMEOUT)
156         later = reactor.callLater(KRPC_TIMEOUT, timeOut)
157         def dropTimeOut(dict, later_call = later):
158             if later_call.active():
159                 later_call.cancel()
160             return dict
161         d.addBoth(dropTimeOut)
162         self.transport.write(str, self.addr)
163         return d
164  
165 def connectionForAddr(host, port):
166     return host
167     
168 class Receiver(protocol.Factory):
169     protocol = KRPC
170     def __init__(self):
171         self.buf = []
172     def krpc_store(self, msg, _krpc_sender):
173         self.buf += [msg]
174     def krpc_echo(self, msg, _krpc_sender):
175         return msg
176
177 def make(port):
178     af = Receiver()
179     a = hostbroker(af)
180     a.protocol = KRPC
181     p = reactor.listenUDP(port, a)
182     return af, a, p
183     
184 class KRPCTests(unittest.TestCase):
185     def setUp(self):
186         KRPC.noisy = 0
187         self.af, self.a, self.ap = make(1180)
188         self.bf, self.b, self.bp = make(1181)
189
190     def tearDown(self):
191         self.ap.stopListening()
192         self.bp.stopListening()
193
194     def bufEquals(self, result, value):
195         self.assertEqual(self.bf.buf, value)
196
197     def testSimpleMessage(self):
198         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
199         d.addCallback(self.bufEquals, ["This is a test."])
200         return d
201
202     def testMessageBlast(self):
203         for i in range(100):
204             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
205         d.addCallback(self.bufEquals, ["This is a test."] * 100)
206         return d
207
208     def testEcho(self):
209         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
210         df.addCallback(self.gotMsg, "This is a test.")
211         return df
212
213     def gotMsg(self, dict, should_be):
214         _krpc_sender = dict['_krpc_sender']
215         msg = dict['rsp']
216         self.assertEqual(msg, should_be)
217
218     def testManyEcho(self):
219         for i in xrange(100):
220             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
221             df.addCallback(self.gotMsg, "This is a test.")
222         return df
223
224     def testMultiEcho(self):
225         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
226         df.addCallback(self.gotMsg, "This is a test.")
227
228         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
229         df.addCallback(self.gotMsg, "This is another test.")
230
231         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
232         df.addCallback(self.gotMsg, "This is yet another test.")
233         
234         return df
235
236     def testEchoReset(self):
237         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
238         df.addCallback(self.gotMsg, "This is a test.")
239
240         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
241         df.addCallback(self.gotMsg, "This is another test.")
242         df.addCallback(self.echoReset)
243         return df
244     
245     def echoReset(self, dict):
246         del(self.a.connections[('127.0.0.1', 1181)])
247         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
248         df.addCallback(self.gotMsg, "This is yet another test.")
249         return df
250
251     def testUnknownMeth(self):
252         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
253         df.addErrback(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
254         return df
255
256     def gotErr(self, err, should_be):
257         self.assertEqual(err.value, should_be)