b4dd6e48b7660cbd391fc034797604cc226aeb1c
[quix0rs-apt-p2p.git] / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from twisted.internet.defer import Deferred
5 from twisted.protocols import basic
6 from bencode import bencode, bdecode
7 from twisted.internet import protocol
8
9 from twisted.internet import reactor
10 import time
11
12 import sys
13 from traceback import format_exception
14
15 import khash as hash
16
# Delay handed to reactor.callLater() before an unanswered request is failed
# with KRPC_ERROR_TIMEOUT.
KRPC_TIMEOUT = 20

# Error codes.  KRPC_ERROR_METHOD_UNKNOWN is sent back to a peer that asked
# for a method the server does not provide; KRPC_ERROR_RECEIVED_UNKNOWN and
# KRPC_ERROR_TIMEOUT are delivered locally through a pending request's errback.
KRPC_ERROR = 1
KRPC_ERROR_METHOD_UNKNOWN = 2
KRPC_ERROR_RECEIVED_UNKNOWN = 3
KRPC_ERROR_TIMEOUT = 4

# Keys of the bencoded message dictionary.  REQ, RSP and ERR double as the
# message-type tags stored under TYP.
TID = "t"   # transaction id
REQ = "q"   # request: name of the method being called
RSP = "r"   # response: return value of the method
TYP = "y"   # message type (one of REQ, RSP, ERR)
ARG = "a"   # request: dictionary of keyword arguments
ERR = "e"   # error: error payload
31
class hostbroker(protocol.DatagramProtocol):
    """Datagram protocol that routes each packet to a per-peer connection.

    One connection object is created per remote address the first time that
    address is seen, by calling self.protocol(addr, server, transport), and
    is reused for every later datagram from the same address.

    NOTE(review): self.protocol is not assigned anywhere in this class; it is
    presumably set by whoever instantiates the broker -- confirm at the
    call site.
    """

    def __init__(self, server):
        self.server = server
        # this should be changed to storage that drops old entries;
        # nothing in this class ever removes a connection again
        self.connections = {}

    def datagramReceived(self, datagram, addr):
        """Pass a received datagram on to the connection object for addr."""
        self.connectionForAddr(addr).datagramReceived(datagram, addr)

    def connectionForAddr(self, addr):
        """Return the connection for addr, creating it on first use.

        Raises Exception when addr is this broker's own (host, port).
        """
        if addr == self.addr:
            raise Exception
        if addr not in self.connections:
            self.connections[addr] = self.protocol(addr, self.server,
                                                   self.transport)
        return self.connections[addr]

    def makeConnection(self, transport):
        """Attach the transport and remember our own (host, port) address."""
        protocol.DatagramProtocol.makeConnection(self, transport)
        own = transport.getHost()
        self.addr = (own.host, own.port)
60
61 ## connection
class KRPC:
    """KRPC endpoint for a single remote address.

    Messages are bencoded dictionaries carrying a transaction id (TID), a
    message type (TYP) and, depending on the type, a method name and keyword
    arguments (REQ/ARG), a return value (RSP) or an error payload (ERR).

    Incoming requests are dispatched to the ``krpc_<method>`` attribute of
    the factory; incoming responses and errors are matched by transaction id
    against the Deferreds handed out by sendRequest().
    """

    # when true, print decode errors and a one-line summary per request
    noisy = 1

    def __init__(self, addr, server, transport):
        self.transport = transport
        self.factory = server
        self.addr = addr
        # transaction id -> Deferred of the request still awaiting a reply
        self.tids = {}
        # next transaction id, cycles through 0..255
        self.mtid = 0

    def datagramReceived(self, str, addr):
        """Decode one datagram and dispatch it according to its type.

        The datagram comes straight off the network, so anything that does
        not decode to a well-formed message is reported and dropped instead
        of raising into the reactor.
        """
        try:
            msg = bdecode(str)
        except Exception:
            if self.noisy:
                print("response decode error: " + repr(sys.exc_info()[1]))
            return

        if not isinstance(msg, dict):
            # valid bencoding, but not a message at all
            print("unknown message type " + repr(msg))
            return

        kind = msg.get(TYP)
        if kind == REQ:
            self._handleRequest(msg, len(str), addr)
        elif kind == RSP and RSP in msg:
            self._handleResponse(msg, addr)
        elif kind == ERR and ERR in msg:
            self._handleError(msg)
        else:
            self._handleUnknown(msg)

    def _popPending(self, msg):
        """Remove and return the Deferred waiting on msg's tid, or None."""
        try:
            return self.tids.pop(msg.get(TID), None)
        except TypeError:
            # an unhashable tid (bencoded list/dict) can never match
            return None

    def _handleRequest(self, msg, ilen, addr):
        """Run the requested factory method and send its result or an error."""
        method = msg.get(REQ)
        args = msg.get(ARG)
        if (TID not in msg or not isinstance(method, str)
                or not isinstance(args, dict)):
            # without a tid, a method name and an argument dict there is
            # nothing to call and no way to address a reply
            if self.noisy:
                print("malformed request " + repr(msg))
            return
        tid = msg[TID]

        handler = getattr(self.factory, "krpc_" + method, None)
        args['_krpc_sender'] = self.addr
        if handler and callable(handler):
            try:
                ret = handler(**args)
            except Exception:
                # report the failure to the peer as a formatted traceback
                reply = {TID: tid, TYP: ERR,
                         ERR: repr(format_exception(*sys.exc_info()))}
            else:
                # a falsy return value is sent as an empty dictionary
                reply = {TID: tid, TYP: RSP, RSP: ret or {}}
        else:
            if self.noisy:
                print("don't know about method %s" % method)
            reply = {TID: tid, TYP: ERR, ERR: KRPC_ERROR_METHOD_UNKNOWN}

        out = bencode(reply)
        self.transport.write(out, addr)
        if self.noisy:
            print("%s %s >>> %s - %s %s %s" % (time.asctime(), addr,
                                               self.factory.node.port,
                                               ilen, method, len(out)))

    def _handleResponse(self, msg, addr):
        """Fire the callback of the request this response answers."""
        df = self._popPending(msg)
        if df is not None:
            df.callback({'rsp': msg[RSP], '_krpc_sender': addr})
            return
        # no tid, this transaction timed out already...
        rsp = msg[RSP]
        sender = None
        if isinstance(rsp, dict):
            sender = rsp.get('id')
        print('timeout ' + repr(sender))

    def _handleError(self, msg):
        """Fire the errback of the request this error message answers."""
        df = self._popPending(msg)
        if df is not None:
            # the tid is already forgotten, as in the response path, so the
            # errback chain never sees a half-finished transaction
            df.errback(msg[ERR])
        # otherwise: day late and dollar short

    def _handleUnknown(self, msg):
        """Fail the matching request, if any, for an unrecognised message."""
        print("unknown message type " + repr(msg))
        df = self._popPending(msg)
        if df is not None:
            df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)

    def sendRequest(self, method, args):
        """Send a request to the peer and return a Deferred for its reply.

        The Deferred fires with {'rsp': ..., '_krpc_sender': addr} on a
        response, or errs back with the peer's error payload, with
        KRPC_ERROR_RECEIVED_UNKNOWN, or with KRPC_ERROR_TIMEOUT when nothing
        arrives within KRPC_TIMEOUT.
        """
        tid = chr(self.mtid)
        self.mtid = (self.mtid + 1) % 256
        packet = bencode({TID: tid, TYP: REQ, REQ: method, ARG: args})
        d = Deferred()
        pending = self.tids
        pending[tid] = d

        def timeOut():
            # Transaction ids wrap after 256 requests, so the entry under
            # this tid may already belong to a newer request; only ever
            # expire the Deferred this timer was created for.
            if pending.get(tid) is d:
                del pending[tid]
            if not d.called:
                print(">>>>>> KRPC_ERROR_TIMEOUT")
                d.errback(KRPC_ERROR_TIMEOUT)

        reactor.callLater(KRPC_TIMEOUT, timeOut)
        self.transport.write(packet, self.addr)
        return d
163