Added more debug printing to noisy krpc protocol.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from bencode import bencode, bdecode
5 from time import asctime
6 import sys
7 from traceback import format_exception
8
9 from twisted.internet.defer import Deferred
10 from twisted.internet import protocol, reactor
11 from twisted.trial import unittest
12
13 KRPC_TIMEOUT = 20
14
15 KRPC_ERROR = 1
16 KRPC_ERROR_METHOD_UNKNOWN = 2
17 KRPC_ERROR_RECEIVED_UNKNOWN = 3
18 KRPC_ERROR_TIMEOUT = 4
19
20 # commands
21 TID = 't'
22 REQ = 'q'
23 RSP = 'r'
24 TYP = 'y'
25 ARG = 'a'
26 ERR = 'e'
27
28 class ProtocolError(Exception):
29     pass
30
31 class hostbroker(protocol.DatagramProtocol):       
32     def __init__(self, server):
33         self.server = server
34         # this should be changed to storage that drops old entries
35         self.connections = {}
36         
37     def datagramReceived(self, datagram, addr):
38         #print `addr`, `datagram`
39         #if addr != self.addr:
40         c = self.connectionForAddr(addr)
41         c.datagramReceived(datagram, addr)
42         #if c.idle():
43         #    del self.connections[addr]
44
45     def connectionForAddr(self, addr):
46         if addr == self.addr:
47             raise Exception
48         if not self.connections.has_key(addr):
49             conn = self.protocol(addr, self.server, self.transport)
50             self.connections[addr] = conn
51         else:
52             conn = self.connections[addr]
53         return conn
54
55     def makeConnection(self, transport):
56         protocol.DatagramProtocol.makeConnection(self, transport)
57         tup = transport.getHost()
58         self.addr = (tup.host, tup.port)
59         
60     def stopProtocol(self):
61         for conn in self.connections.values():
62             conn.stop()
63         protocol.DatagramProtocol.stopProtocol(self)
64
65 ## connection
66 class KRPC:
67     noisy = 0
68     def __init__(self, addr, server, transport):
69         self.transport = transport
70         self.factory = server
71         self.addr = addr
72         self.tids = {}
73         self.mtid = 0
74         self.stopped = False
75
76     def datagramReceived(self, str, addr):
77         if self.stopped:
78             if self.noisy:
79                 print "stopped, dropping message from", addr, str
80         # bdecode
81         try:
82             msg = bdecode(str)
83         except Exception, e:
84             if self.noisy:
85                 print "response decode error: " + `e`
86         else:
87             if self.noisy:
88                 print self.factory.port, "received from", addr, self.addr, ":", msg
89             # look at msg type
90             if msg[TYP]  == REQ:
91                 ilen = len(str)
92                 # if request
93                 #       tell factory to handle
94                 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
95                 msg[ARG]['_krpc_sender'] =  self.addr
96                 if f and callable(f):
97                     try:
98                         ret = apply(f, (), msg[ARG])
99                     except Exception, e:
100                         ## send error
101                         out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
102                         olen = len(out)
103                         if self.noisy:
104                             print self.factory.port, "responding to", addr, self.addr, ":", out
105                         self.transport.write(out, addr)
106                     else:
107                         if ret:
108                             #   make response
109                             out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
110                         else:
111                             out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
112                         #       send response
113                         olen = len(out)
114                         if self.noisy:
115                             print self.factory.port, "responding to", addr, self.addr, ":", out
116                         self.transport.write(out, addr)
117
118                 else:
119                     if self.noisy:
120                         print "don't know about method %s" % msg[REQ]
121                     # unknown method
122                     out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
123                     olen = len(out)
124                     if self.noisy:
125                         print self.factory.port, "responding to", addr, self.addr, ":", out
126                     self.transport.write(out, addr)
127                 if self.noisy:
128                     print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port, 
129                                                     ilen, msg[REQ], olen)
130             elif msg[TYP] == RSP:
131                 # if response
132                 #       lookup tid
133                 if self.tids.has_key(msg[TID]):
134                     df = self.tids[msg[TID]]
135                     #   callback
136                     del(self.tids[msg[TID]])
137                     df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
138                 else:
139                     print 'timeout ' + `msg[RSP]['id']`
140                     # no tid, this transaction timed out already...
141             elif msg[TYP] == ERR:
142                 # if error
143                 #       lookup tid
144                 if self.tids.has_key(msg[TID]):
145                     df = self.tids[msg[TID]]
146                     #   callback
147                     df.errback(msg[ERR])
148                     del(self.tids[msg[TID]])
149                 else:
150                     # day late and dollar short
151                     pass
152             else:
153                 print "unknown message type " + `msg`
154                 # unknown message type
155                 df = self.tids[msg[TID]]
156                 #       callback
157                 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
158                 del(self.tids[msg[TID]])
159                 
160     def sendRequest(self, method, args):
161         if self.stopped:
162             raise ProtocolError, "connection has been stopped"
163         # make message
164         # send it
165         msg = {TID : chr(self.mtid), TYP : REQ,  REQ : method, ARG : args}
166         self.mtid = (self.mtid + 1) % 256
167         if self.noisy:
168             print self.factory.port, "sending to", self.addr, ":", msg
169         str = bencode(msg)
170         d = Deferred()
171         self.tids[msg[TID]] = d
172         def timeOut(tids = self.tids, id = msg[TID], msg = msg):
173             if tids.has_key(id):
174                 df = tids[id]
175                 del(tids[id])
176                 print ">>>>>> KRPC_ERROR_TIMEOUT"
177                 df.errback(ProtocolError('timeout waiting for %r' % msg))
178         later = reactor.callLater(KRPC_TIMEOUT, timeOut)
179         def dropTimeOut(dict, later_call = later):
180             if later_call.active():
181                 later_call.cancel()
182             return dict
183         d.addBoth(dropTimeOut)
184         self.transport.write(str, self.addr)
185         return d
186     
187     def stop(self):
188         """Timeout all pending requests."""
189         for df in self.tids.values():
190             df.errback(ProtocolError('connection has been closed'))
191         self.tids = {}
192         self.stopped = True
193  
194 def connectionForAddr(host, port):
195     return host
196     
197 class Receiver(protocol.Factory):
198     protocol = KRPC
199     def __init__(self):
200         self.buf = []
201     def krpc_store(self, msg, _krpc_sender):
202         self.buf += [msg]
203     def krpc_echo(self, msg, _krpc_sender):
204         return msg
205
206 def make(port):
207     af = Receiver()
208     a = hostbroker(af)
209     a.protocol = KRPC
210     p = reactor.listenUDP(port, a)
211     return af, a, p
212     
213 class KRPCTests(unittest.TestCase):
214     def setUp(self):
215         KRPC.noisy = 0
216         self.af, self.a, self.ap = make(1180)
217         self.bf, self.b, self.bp = make(1181)
218
219     def tearDown(self):
220         self.ap.stopListening()
221         self.bp.stopListening()
222
223     def bufEquals(self, result, value):
224         self.assertEqual(self.bf.buf, value)
225
226     def testSimpleMessage(self):
227         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
228         d.addCallback(self.bufEquals, ["This is a test."])
229         return d
230
231     def testMessageBlast(self):
232         for i in range(100):
233             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
234         d.addCallback(self.bufEquals, ["This is a test."] * 100)
235         return d
236
237     def testEcho(self):
238         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
239         df.addCallback(self.gotMsg, "This is a test.")
240         return df
241
242     def gotMsg(self, dict, should_be):
243         _krpc_sender = dict['_krpc_sender']
244         msg = dict['rsp']
245         self.assertEqual(msg, should_be)
246
247     def testManyEcho(self):
248         for i in xrange(100):
249             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
250             df.addCallback(self.gotMsg, "This is a test.")
251         return df
252
253     def testMultiEcho(self):
254         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
255         df.addCallback(self.gotMsg, "This is a test.")
256
257         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
258         df.addCallback(self.gotMsg, "This is another test.")
259
260         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
261         df.addCallback(self.gotMsg, "This is yet another test.")
262         
263         return df
264
265     def testEchoReset(self):
266         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
267         df.addCallback(self.gotMsg, "This is a test.")
268
269         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
270         df.addCallback(self.gotMsg, "This is another test.")
271         df.addCallback(self.echoReset)
272         return df
273     
274     def echoReset(self, dict):
275         del(self.a.connections[('127.0.0.1', 1181)])
276         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
277         df.addCallback(self.gotMsg, "This is yet another test.")
278         return df
279
280     def testUnknownMeth(self):
281         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
282         df.addErrback(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
283         return df
284
285     def gotErr(self, err, should_be):
286         self.assertEqual(err.value, should_be)