]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - krpc.py
*** empty log message ***
[quix0rs-apt-p2p.git] / krpc.py
1 import airhook
2 from twisted.internet.defer import Deferred
3 from twisted.protocols import basic
4 from bencode import bencode, bdecode
5 from twisted.internet import reactor
6 import time
7
8 import hash
9
# Seconds an outstanding request may wait for a reply; used as the
# reactor.callLater() delay in KRPC.sendRequest().
KRPC_TIMEOUT = 60

# Error codes carried in 'err' messages or handed to a request's errback.
# Generic error code (presumably for failures with no more specific code --
# TODO confirm against callers).
KRPC_ERROR = 1
# Sent back in the 'err' field when a request names a method the factory
# does not provide.
KRPC_ERROR_METHOD_UNKNOWN = 2
# Passed to the errback when a received message has an unrecognised 'typ'.
KRPC_ERROR_RECEIVED_UNKNOWN = 3
# Passed to the errback when no reply arrived within KRPC_TIMEOUT.
KRPC_ERROR_TIMEOUT = 4
16
class KRPC(basic.NetstringReceiver):
    """Netstring-framed, bencoded request/response RPC protocol.

    Every message is a bencoded dict carrying a transaction id ('tid') and a
    type ('typ') that is one of 'req', 'rsp' or 'err'.  Incoming requests are
    dispatched to ``krpc_<name>`` callables looked up on ``self.factory``;
    incoming responses and errors fire the Deferred that sendRequest()
    registered under the same transaction id.
    """
    # When true, diagnostic messages are printed to stdout.
    noisy = 1

    def __init__(self):
        # Outstanding requests: transaction id -> Deferred awaiting the reply.
        self.tids = {}

    def dataReceived(self, data):
        """Feed raw data to the netstring parser, then recover if it broke.

        After the base class has processed ``data``, a set ``brokenPeer``
        flag causes the parser state to be reset via resetConnection().
        """
        basic.NetstringReceiver.dataReceived(self, data)
        if self.brokenPeer:
            self.resetConnection()

    # Historical misspelling of dataReceived, kept so that any existing
    # reference to the old name still resolves.
    dataRecieved = dataReceived

    def resetConnection(self):
        """Clear the broken-peer flag and reset the netstring reader state."""
        self.brokenPeer = 0
        self._readerState = basic.LENGTH
        self._readerLength = 0

    def stringReceived(self, str):
        """Decode one bencoded message and dispatch it on its 'typ' field.

        Messages that cannot be decoded, or that are not dicts carrying both
        'typ' and 'tid', are dropped: without a transaction id there is no
        Deferred to notify and no reply that could be correlated.
        """
        try:
            msg = bdecode(str)
        except Exception as e:
            if self.noisy:
                print("response decode error: " + repr(e))
            return

        if not isinstance(msg, dict) or 'typ' not in msg or 'tid' not in msg:
            if self.noisy:
                print("unknown message type " + repr(msg))
            return

        typ = msg['typ']
        if typ == 'req':
            self._handleRequest(msg, len(str))
        elif typ == 'rsp':
            self._handleResponse(msg)
        elif typ == 'err':
            self._failTransaction(msg['tid'], msg.get('err', KRPC_ERROR))
        else:
            print("unknown message type " + repr(msg))
            self._failTransaction(msg['tid'], KRPC_ERROR_RECEIVED_UNKNOWN)

    def _handleRequest(self, msg, ilen):
        """Run the factory's krpc_<name> handler for a request and reply.

        ``ilen`` is the length of the raw incoming message, used only for the
        noisy log line.  The reply is an 'rsp' message holding the handler's
        return value (or an empty dict if it is falsy), or an 'err' message
        when the handler raises or the method is unknown.
        """
        tid = msg['tid']
        handler = getattr(self.factory, "krpc_" + msg['req'], None)
        if handler and callable(handler):
            # Let the handler know who sent the request.
            msg['arg']['_krpc_sender'] = self.transport.addr
            try:
                ret = handler(**msg['arg'])
            except Exception as e:
                reply = {'tid': tid, 'typ': 'err', 'err': repr(e)}
            else:
                reply = {'tid': tid, 'typ': 'rsp', 'rsp': ret or {}}
        else:
            if self.noisy:
                print("don't know about method %s" % msg['req'])
            reply = {'tid': tid, 'typ': 'err',
                     'err': KRPC_ERROR_METHOD_UNKNOWN}

        out = bencode(reply)
        self.sendString(out)
        if self.noisy:
            print("%s %s >>> %s - %s %s %s" % (
                time.asctime(), self.transport.addr,
                self.factory.node.port, ilen, msg['req'], len(out)))

    def _handleResponse(self, msg):
        """Fire the Deferred waiting on a response's transaction id."""
        df = self.tids.pop(msg['tid'], None)
        if df is None:
            # no tid, this transaction timed out already...
            rsp = msg.get('rsp')
            sender = None
            if isinstance(rsp, dict):
                sender = rsp.get('sender')
            print('timeout ' + repr(sender))
            return
        df.callback({'rsp': msg.get('rsp', {}),
                     '_krpc_sender': self.transport.addr})

    def _failTransaction(self, tid, err):
        """Errback the Deferred registered under ``tid``, if it still exists.

        The transaction may already have been removed by the timeout, in
        which case there is nothing left to notify.
        """
        df = self.tids.pop(tid, None)
        if df is not None:
            df.errback(err)

    def sendRequest(self, method, args):
        """Send a request and return a Deferred for its outcome.

        ``method`` is the remote method name and ``args`` the dict of its
        arguments.  The returned Deferred is called back with a dict holding
        'rsp' and '_krpc_sender' when a response arrives, and errbacked with
        the remote error, or with KRPC_ERROR_TIMEOUT if nothing arrives
        within KRPC_TIMEOUT.
        """
        tid = hash.newID()
        # Encode first so that unencodable arguments fail before anything is
        # registered or scheduled.
        packet = bencode({'tid': tid, 'typ': 'req',
                          'req': method, 'arg': args})
        d = Deferred()
        tids = self.tids
        tids[tid] = d

        def timeOut():
            # Only fires the errback if no reply has claimed the tid yet.
            df = tids.pop(tid, None)
            if df is not None:
                print(">>>>>> KRPC_ERROR_TIMEOUT")
                df.errback(KRPC_ERROR_TIMEOUT)

        reactor.callLater(KRPC_TIMEOUT, timeOut)
        self.sendString(packet)
        return d
119