]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - krpc.py
major cleanup, updated for twisted
[quix0rs-apt-p2p.git] / krpc.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 import airhook
5 from twisted.internet.defer import Deferred
6 from twisted.protocols import basic
7 from bencode import bencode, bdecode
8 from twisted.internet import protocol
9
10 from twisted.internet import reactor
11 import time
12
13 import khash as hash
14
# Delay handed to reactor.callLater() before an unanswered request is failed
# with KRPC_ERROR_TIMEOUT.
KRPC_TIMEOUT = 60

# Error codes.  METHOD_UNKNOWN is sent to the peer in the ERR field of a
# reply; RECEIVED_UNKNOWN and TIMEOUT are passed to the errback of the local
# Deferred that was waiting for the reply.
(KRPC_ERROR,
 KRPC_ERROR_METHOD_UNKNOWN,
 KRPC_ERROR_RECEIVED_UNKNOWN,
 KRPC_ERROR_TIMEOUT) = (1, 2, 3, 4)

# Keys of the bencoded message dictionary.  REQ, RSP and ERR double as the
# possible values stored under TYP (the message type).
TID = "tid"   # transaction id
REQ = "req"   # request: name of the method to call
RSP = "rsp"   # response: value returned by the method
TYP = "typ"   # message type: one of REQ, RSP, ERR
ARG = "arg"   # request: keyword arguments for the method
ERR = "err"   # error: error value
29
class hostbroker(protocol.DatagramProtocol):
    """Datagram protocol that fans packets out to per-address connections.

    Each remote address gets one connection object, built on first use by
    calling ``self.protocol(addr, server, transport)`` and cached in
    ``self.connections``.  ``self.protocol`` is not assigned anywhere in this
    class, so it has to be supplied from outside (subclass or attribute
    assignment) before the first datagram arrives.
    """

    def __init__(self, server):
        """Remember *server*; it is handed to every connection created."""
        self.server = server
        self.noisy = 0
        # this should be changed to storage that drops old entries
        self.connections = {}

    def datagramReceived(self, datagram, addr):
        """Pass *datagram* to the connection object associated with *addr*."""
        # Connections are never evicted here, even when idle.
        peer = self.connectionForAddr(addr)
        peer.datagramReceived(datagram)

    def connectionForAddr(self, addr):
        """Return the cached connection for *addr*, creating it if needed.

        Raises ``Exception`` when *addr* equals the local address recorded
        by makeConnection().
        """
        if addr == self.addr:
            raise Exception
        known = self.connections
        if addr in known:
            return known[addr]
        peer = self.protocol(addr, self.server, self.transport)
        known[addr] = peer
        return peer

    def makeConnection(self, transport):
        """Attach *transport* and record its local (host, port) as self.addr."""
        protocol.DatagramProtocol.makeConnection(self, transport)
        local = transport.getHost()
        self.addr = (local.host, local.port)
59
60 ## connection
class KRPC:
    """KRPC endpoint for one remote address on a shared datagram transport.

    Messages are bencoded dictionaries.  Incoming requests are dispatched to
    ``krpc_<method>`` attributes of the factory; incoming responses and
    errors are matched by transaction id against ``self.tids`` and delivered
    through the Deferred that sendRequest() returned.

    Datagrams are untrusted network input, so anything that cannot be
    decoded or is structurally incomplete is logged and dropped rather than
    allowed to raise out of datagramReceived().
    """
    noisy = 1

    def __init__(self, addr, server, transport):
        """Bind the connection to peer *addr*, handler object *server* and
        the *transport* used for writing replies and requests."""
        self.transport = transport
        self.factory = server
        self.addr = addr
        # outstanding requests: transaction id -> Deferred awaiting the reply
        self.tids = {}

    def datagramReceived(self, str):
        """Decode one datagram and dispatch it by message type.

        *str* is the raw datagram payload.  The parameter keeps its original
        name (which shadows the builtin) so existing callers are unaffected.
        Returns None.
        """
        try:
            msg = bdecode(str)
        except Exception as e:
            # A decode failure is not tied to any pending transaction (the
            # tid is inside the payload we could not read), so there is no
            # Deferred to fail: log and drop.
            if self.noisy:
                print("response decode error: " + repr(e))
            return
        if not isinstance(msg, dict) or TYP not in msg or TID not in msg:
            # Decodable but not a KRPC message; nothing can be routed.
            if self.noisy:
                print("malformed message " + repr(msg))
            return

        kind = msg[TYP]
        if kind == REQ:
            self._handleRequest(msg, len(str))
        elif kind == RSP:
            self._handleResponse(msg)
        elif kind == ERR:
            self._handleError(msg)
        else:
            self._handleUnknown(msg)

    def _popDeferred(self, tid):
        """Remove and return the Deferred waiting on *tid*, or None."""
        try:
            return self.tids.pop(tid, None)
        except TypeError:
            # An unhashable tid (e.g. a bencoded list) can never match.
            return None

    def _handleRequest(self, msg, ilen):
        """Run the factory's ``krpc_<method>`` handler and send the reply.

        *ilen* is the size of the incoming datagram, used only for logging.
        """
        tid = msg[TID]
        method = msg.get(REQ)
        try:
            f = getattr(self.factory, "krpc_" + method, None)
        except TypeError:
            # Missing or non-string method name: treat as unknown method.
            f = None

        if f and callable(f):
            try:
                # Argument preparation lives inside the try so that a
                # missing or non-dict ARG yields an error reply instead of
                # an exception in the datagram handler.
                kwargs = msg[ARG]
                kwargs['_krpc_sender'] = self.addr
                ret = f(**kwargs)
            except Exception as e:
                ## send error
                out = bencode({TID: tid, TYP: ERR, ERR: repr(e)})
            else:
                # A falsy handler result is sent as an empty dictionary.
                out = bencode({TID: tid, TYP: RSP, RSP: ret or {}})
        else:
            if self.noisy:
                print("don't know about method %s" % (method,))
            out = bencode({TID: tid, TYP: ERR,
                           ERR: KRPC_ERROR_METHOD_UNKNOWN})

        self.transport.write(out, self.addr)
        if self.noisy:
            print("%s %s >>> %s - %s %s %s" % (
                time.asctime(), self.addr, self.factory.node.port,
                ilen, method, len(out)))

    def _handleResponse(self, msg):
        """Deliver a response to the Deferred registered for its tid."""
        if RSP not in msg:
            # Typed as a response but carries no payload.
            self._handleUnknown(msg)
            return
        df = self._popDeferred(msg[TID])
        if df is not None:
            df.callback({RSP: msg[RSP], '_krpc_sender': self.addr})
            return
        # no tid, this transaction timed out already...
        rsp = msg[RSP]
        sender = None
        if isinstance(rsp, dict):
            sender = rsp.get('sender')
        print('timeout ' + repr(sender))

    def _handleError(self, msg):
        """Fail the Deferred registered for an error message's tid."""
        # Remove the entry before firing, as the response path does, so an
        # exception raised while firing cannot leave a stale tid behind.
        df = self._popDeferred(msg[TID])
        if df is not None:
            # Fall back to the generic error code when the peer sent none.
            df.errback(msg.get(ERR, KRPC_ERROR))
        # else: day late and dollar short

    def _handleUnknown(self, msg):
        """Log an unrecognised message and fail its pending Deferred, if any."""
        print("unknown message type " + repr(msg))
        df = self._popDeferred(msg[TID])
        if df is not None:
            df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)

    def sendRequest(self, method, args):
        """Send a request for *method* with keyword arguments *args*.

        Returns a Deferred.  It is called back with
        ``{RSP: <response>, '_krpc_sender': <addr>}`` when a response
        arrives, or failed with the peer's error value, with
        KRPC_ERROR_RECEIVED_UNKNOWN, or with KRPC_ERROR_TIMEOUT after
        KRPC_TIMEOUT.
        """
        msg = {TID: hash.newTID(), TYP: REQ, REQ: method, ARG: args}
        packet = bencode(msg)
        d = Deferred()
        tid = msg[TID]
        tids = self.tids
        tids[tid] = d

        def timeOut():
            df = tids.pop(tid, None)
            if df is not None:
                print(">>>>>> KRPC_ERROR_TIMEOUT")
                df.errback(KRPC_ERROR_TIMEOUT)

        later = reactor.callLater(KRPC_TIMEOUT, timeOut)

        def cancelTimeout(result):
            # Once the Deferred has fired the timer is useless; cancelling
            # it avoids keeping one pending call per answered request
            # alive for the full timeout.  The result passes through.
            if later.active():
                later.cancel()
            return result

        d.addBoth(cancelTimeout)
        self.transport.write(packet, self.addr)
        return d
161