]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - krpc.py
efdc29ffe47bc2c9ba9fd66528444b04f56256dc
[quix0rs-apt-p2p.git] / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 import airhook
5 from twisted.internet.defer import Deferred
6 from twisted.protocols import basic
7 from bencode import bencode, bdecode
8 from twisted.internet import protocol
9
10 from twisted.internet import reactor
11 import time
12
13 import sys
14 from traceback import format_exception
15
16 import khash as hash
17
18 KRPC_TIMEOUT = 20
19
20 KRPC_ERROR = 1
21 KRPC_ERROR_METHOD_UNKNOWN = 2
22 KRPC_ERROR_RECEIVED_UNKNOWN = 3
23 KRPC_ERROR_TIMEOUT = 4
24
25 # commands
26 TID = 't'
27 REQ = 'q'
28 RSP = 'r'
29 TYP = 'y'
30 ARG = 'a'
31 ERR = 'e'
32
33 class hostbroker(protocol.DatagramProtocol):       
34     def __init__(self, server):
35         self.server = server
36         # this should be changed to storage that drops old entries
37         self.connections = {}
38         
39     def datagramReceived(self, datagram, addr):
40         #print `addr`, `datagram`
41         #if addr != self.addr:
42         c = self.connectionForAddr(addr)
43         c.datagramReceived(datagram, addr)
44         #if c.idle():
45         #    del self.connections[addr]
46
47     def connectionForAddr(self, addr):
48         if addr == self.addr:
49             raise Exception
50         if not self.connections.has_key(addr):
51             conn = self.protocol(addr, self.server, self.transport)
52             self.connections[addr] = conn
53         else:
54             conn = self.connections[addr]
55         return conn
56
57     def makeConnection(self, transport):
58         protocol.DatagramProtocol.makeConnection(self, transport)
59         tup = transport.getHost()
60         self.addr = (tup.host, tup.port)
61
62 ## connection
63 class KRPC:
64     noisy = 1
65     def __init__(self, addr, server, transport):
66         self.transport = transport
67         self.factory = server
68         self.addr = addr
69         self.tids = {}
70         self.mtid = 0
71
72     def datagramReceived(self, str, addr):
73         # bdecode
74         try:
75             msg = bdecode(str)
76         except Exception, e:
77             if self.noisy:
78                 print "response decode error: " + `e`
79             self.d.errback()
80         else:
81             #if self.noisy:
82             #    print msg
83             # look at msg type
84             if msg[TYP]  == REQ:
85                 ilen = len(str)
86                 # if request
87                 #       tell factory to handle
88                 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
89                 msg[ARG]['_krpc_sender'] =  self.addr
90                 if f and callable(f):
91                     try:
92                         ret = apply(f, (), msg[ARG])
93                     except Exception, e:
94                         ## send error
95                         out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
96                         olen = len(out)
97                         self.transport.write(out, addr)
98                     else:
99                         if ret:
100                             #   make response
101                             out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
102                         else:
103                             out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
104                         #       send response
105                         olen = len(out)
106                         self.transport.write(out, addr)
107
108                 else:
109                     if self.noisy:
110                         print "don't know about method %s" % msg[REQ]
111                     # unknown method
112                     out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
113                     olen = len(out)
114                     self.transport.write(out, addr)
115                 if self.noisy:
116                     print "%s %s >>> %s - %s %s %s" % (time.asctime(), addr, self.factory.node.port, 
117                                                     ilen, msg[REQ], olen)
118             elif msg[TYP] == RSP:
119                 # if response
120                 #       lookup tid
121                 if self.tids.has_key(msg[TID]):
122                     df = self.tids[msg[TID]]
123                     #   callback
124                     del(self.tids[msg[TID]])
125                     df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
126                 else:
127                     print 'timeout ' + `msg[RSP]['id']`
128                     # no tid, this transaction timed out already...
129             elif msg[TYP] == ERR:
130                 # if error
131                 #       lookup tid
132                 if self.tids.has_key(msg[TID]):
133                     df = self.tids[msg[TID]]
134                     #   callback
135                     df.errback(msg[ERR])
136                     del(self.tids[msg[TID]])
137                 else:
138                     # day late and dollar short
139                     pass
140             else:
141                 print "unknown message type " + `msg`
142                 # unknown message type
143                 df = self.tids[msg[TID]]
144                 #       callback
145                 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
146                 del(self.tids[msg[TID]])
147                 
148     def sendRequest(self, method, args):
149         # make message
150         # send it
151         msg = {TID : chr(self.mtid), TYP : REQ,  REQ : method, ARG : args}
152         self.mtid = (self.mtid + 1) % 256
153         str = bencode(msg)
154         d = Deferred()
155         self.tids[msg[TID]] = d
156         def timeOut(tids = self.tids, id = msg[TID]):
157             if tids.has_key(id):
158                 df = tids[id]
159                 del(tids[id])
160                 print ">>>>>> KRPC_ERROR_TIMEOUT"
161                 df.errback(KRPC_ERROR_TIMEOUT)
162         reactor.callLater(KRPC_TIMEOUT, timeOut)
163         self.transport.write(str, self.addr)
164         return d
165