Change all print statements to log.msg calls.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from bencode import bencode, bdecode
5 from time import asctime
6 import sys
7 from traceback import format_exception
8
9 from twisted.internet.defer import Deferred
10 from twisted.internet import protocol, reactor
11 from twisted.python import log
12 from twisted.trial import unittest
13
14 from khash import newID
15
16 KRPC_TIMEOUT = 20
17
18 KRPC_ERROR = 1
19 KRPC_ERROR_METHOD_UNKNOWN = 2
20 KRPC_ERROR_RECEIVED_UNKNOWN = 3
21 KRPC_ERROR_TIMEOUT = 4
22
23 # commands
24 TID = 't'
25 REQ = 'q'
26 RSP = 'r'
27 TYP = 'y'
28 ARG = 'a'
29 ERR = 'e'
30
31 class ProtocolError(Exception):
32     pass
33
34 class hostbroker(protocol.DatagramProtocol):       
35     def __init__(self, server, config):
36         self.server = server
37         self.config = config
38         # this should be changed to storage that drops old entries
39         self.connections = {}
40         
41     def datagramReceived(self, datagram, addr):
42         #print `addr`, `datagram`
43         #if addr != self.addr:
44         c = self.connectionForAddr(addr)
45         c.datagramReceived(datagram, addr)
46         #if c.idle():
47         #    del self.connections[addr]
48
49     def connectionForAddr(self, addr):
50         if addr == self.addr:
51             raise Exception
52         if not self.connections.has_key(addr):
53             conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
54             self.connections[addr] = conn
55         else:
56             conn = self.connections[addr]
57         return conn
58
59     def makeConnection(self, transport):
60         protocol.DatagramProtocol.makeConnection(self, transport)
61         tup = transport.getHost()
62         self.addr = (tup.host, tup.port)
63         
64     def stopProtocol(self):
65         for conn in self.connections.values():
66             conn.stop()
67         protocol.DatagramProtocol.stopProtocol(self)
68
69 ## connection
70 class KRPC:
71     def __init__(self, addr, server, transport, spew = False):
72         self.transport = transport
73         self.factory = server
74         self.addr = addr
75         self.noisy = spew
76         self.tids = {}
77         self.stopped = False
78
79     def datagramReceived(self, str, addr):
80         if self.stopped:
81             if self.noisy:
82                 log.msg("stopped, dropping message from %r: %s" % (addr, str))
83         # bdecode
84         try:
85             msg = bdecode(str)
86         except Exception, e:
87             if self.noisy:
88                 log.msg("response decode error: ")
89                 log.err(e)
90         else:
91             if self.noisy:
92                 log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
93             # look at msg type
94             if msg[TYP]  == REQ:
95                 ilen = len(str)
96                 # if request
97                 #       tell factory to handle
98                 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
99                 msg[ARG]['_krpc_sender'] =  self.addr
100                 if f and callable(f):
101                     try:
102                         ret = f(*(), **msg[ARG])
103                     except Exception, e:
104                         olen = self._sendResponse(addr, msg[TID], ERR, `format_exception(type(e), e, sys.exc_info()[2])`)
105                     else:
106                         olen = self._sendResponse(addr, msg[TID], RSP, ret)
107                 else:
108                     if self.noisy:
109                         log.msg("don't know about method %s" % msg[REQ])
110                     # unknown method
111                     olen = self._sendResponse(addr, msg[TID], ERR, KRPC_ERROR_METHOD_UNKNOWN)
112                 if self.noisy:
113                     log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
114                                                       ilen, msg[REQ], olen))
115             elif msg[TYP] == RSP:
116                 # if response
117                 #       lookup tid
118                 if self.tids.has_key(msg[TID]):
119                     df = self.tids[msg[TID]]
120                     #   callback
121                     del(self.tids[msg[TID]])
122                     df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
123                 else:
124                     # no tid, this transaction timed out already...
125                     if self.noisy:
126                         log.msg('timeout: %r' % msg[RSP]['id'])
127             elif msg[TYP] == ERR:
128                 # if error
129                 #       lookup tid
130                 if self.tids.has_key(msg[TID]):
131                     df = self.tids[msg[TID]]
132                     #   callback
133                     df.errback(msg[ERR])
134                     del(self.tids[msg[TID]])
135                 else:
136                     # day late and dollar short
137                     pass
138             else:
139                 if self.noisy:
140                     log.msg("unknown message type: %r" % msg)
141                 # unknown message type
142                 df = self.tids[msg[TID]]
143                 #       callback
144                 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
145                 del(self.tids[msg[TID]])
146                 
147     def _sendResponse(self, addr, tid, msgType, response):
148         if not response:
149             response = {}
150             
151         msg = {TID : tid, TYP : msgType, msgType : response}
152
153         if self.noisy:
154             log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
155
156         out = bencode(msg)
157         self.transport.write(out, addr)
158         return len(out)
159     
160     def sendRequest(self, method, args):
161         if self.stopped:
162             raise ProtocolError, "connection has been stopped"
163         # make message
164         # send it
165         msg = {TID : newID(), TYP : REQ,  REQ : method, ARG : args}
166         if self.noisy:
167             log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
168         str = bencode(msg)
169         d = Deferred()
170         self.tids[msg[TID]] = d
171         def timeOut(tids = self.tids, id = msg[TID], msg = msg):
172             if tids.has_key(id):
173                 df = tids[id]
174                 del(tids[id])
175                 log.msg(">>>>>> KRPC_ERROR_TIMEOUT")
176                 df.errback(ProtocolError('timeout waiting for %r' % msg))
177         later = reactor.callLater(KRPC_TIMEOUT, timeOut)
178         def dropTimeOut(dict, later_call = later):
179             if later_call.active():
180                 later_call.cancel()
181             return dict
182         d.addBoth(dropTimeOut)
183         self.transport.write(str, self.addr)
184         return d
185     
186     def stop(self):
187         """Timeout all pending requests."""
188         for df in self.tids.values():
189             df.errback(ProtocolError('connection has been closed'))
190         self.tids = {}
191         self.stopped = True
192  
193 def connectionForAddr(host, port):
194     return host
195     
196 class Receiver(protocol.Factory):
197     protocol = KRPC
198     def __init__(self):
199         self.buf = []
200     def krpc_store(self, msg, _krpc_sender):
201         self.buf += [msg]
202     def krpc_echo(self, msg, _krpc_sender):
203         return msg
204
205 def make(port):
206     af = Receiver()
207     a = hostbroker(af, {'SPEW': False})
208     a.protocol = KRPC
209     p = reactor.listenUDP(port, a)
210     return af, a, p
211     
212 class KRPCTests(unittest.TestCase):
213     def setUp(self):
214         self.af, self.a, self.ap = make(1180)
215         self.bf, self.b, self.bp = make(1181)
216
217     def tearDown(self):
218         self.ap.stopListening()
219         self.bp.stopListening()
220
221     def bufEquals(self, result, value):
222         self.failUnlessEqual(self.bf.buf, value)
223
224     def testSimpleMessage(self):
225         d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
226         d.addCallback(self.bufEquals, ["This is a test."])
227         return d
228
229     def testMessageBlast(self):
230         for i in range(100):
231             d = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('store', {'msg' : "This is a test."})
232         d.addCallback(self.bufEquals, ["This is a test."] * 100)
233         return d
234
235     def testEcho(self):
236         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
237         df.addCallback(self.gotMsg, "This is a test.")
238         return df
239
240     def gotMsg(self, dict, should_be):
241         _krpc_sender = dict['_krpc_sender']
242         msg = dict['rsp']
243         self.failUnlessEqual(msg, should_be)
244
245     def testManyEcho(self):
246         for i in xrange(100):
247             df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
248             df.addCallback(self.gotMsg, "This is a test.")
249         return df
250
251     def testMultiEcho(self):
252         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
253         df.addCallback(self.gotMsg, "This is a test.")
254
255         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
256         df.addCallback(self.gotMsg, "This is another test.")
257
258         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
259         df.addCallback(self.gotMsg, "This is yet another test.")
260         
261         return df
262
263     def testEchoReset(self):
264         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is a test."})
265         df.addCallback(self.gotMsg, "This is a test.")
266
267         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is another test."})
268         df.addCallback(self.gotMsg, "This is another test.")
269         df.addCallback(self.echoReset)
270         return df
271     
272     def echoReset(self, dict):
273         del(self.a.connections[('127.0.0.1', 1181)])
274         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('echo', {'msg' : "This is yet another test."})
275         df.addCallback(self.gotMsg, "This is yet another test.")
276         return df
277
278     def testUnknownMeth(self):
279         df = self.a.connectionForAddr(('127.0.0.1', 1181)).sendRequest('blahblah', {'msg' : "This is a test."})
280         df.addErrback(self.gotErr, KRPC_ERROR_METHOD_UNKNOWN)
281         return df
282
283     def gotErr(self, err, should_be):
284         self.failUnlessEqual(err.value, should_be)