1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
5 from twisted.internet.defer import Deferred
6 from twisted.protocols import basic
7 from bencode import bencode, bdecode
8 from twisted.internet import protocol
10 from twisted.internet import reactor
# Numeric KRPC error codes, as used elsewhere in this module.
(
    # Sent back in the ERR field when no "krpc_<method>" handler is found.
    KRPC_ERROR_METHOD_UNKNOWN,
    # Passed to the pending Deferred's errback for an unrecognised message type.
    KRPC_ERROR_RECEIVED_UNKNOWN,
    # Passed to errback by the timeOut callback scheduled in sendRequest.
    KRPC_ERROR_TIMEOUT,
) = (2, 3, 4)
30 class hostbroker(protocol.DatagramProtocol):
31 def __init__(self, server):
34 # this should be changed to storage that drops old entries
def datagramReceived(self, datagram, addr):
    """Forward an incoming datagram to the connection object for its sender.

    datagram -- the payload handed to this protocol
    addr     -- the sender's address; passed to connectionForAddr() to
                obtain the per-peer connection object

    Returns None; the connection's own datagramReceived() does the work.
    """
    # Disabled debugging / experiments kept for reference:
    #   print `addr`, `datagram`
    #   if addr != self.addr:
    #   del self.connections[addr]
    self.connectionForAddr(addr).datagramReceived(datagram)
45 def connectionForAddr(self, addr):
48 if not self.connections.has_key(addr):
49 conn = self.protocol(addr, self.server, self.transport)
50 self.connections[addr] = conn
52 conn = self.connections[addr]
def makeConnection(self, transport):
    """Attach the transport and record this endpoint's own address.

    transport -- the transport object; it is passed to the base
                 DatagramProtocol.makeConnection() and then queried with
                 getHost() for the local address

    Side effect: sets self.addr to a (host, port) tuple taken from the
    object returned by transport.getHost().
    """
    # Let the base class do its normal wiring first.
    protocol.DatagramProtocol.makeConnection(self, transport)
    local = transport.getHost()
    self.addr = local.host, local.port
63 def __init__(self, addr, server, transport):
64 self.transport = transport
69 def datagramReceived(self, str):
75 print "response decode error: " + `e`
84 # tell factory to handle
85 f = getattr(self.factory ,"krpc_" + msg[REQ], None)
87 msg[ARG]['_krpc_sender'] = self.addr
89 ret = apply(f, (), msg[ARG])
92 out = bencode({TID:msg[TID], TYP:ERR, ERR :`e`})
94 self.transport.write(out, self.addr)
98 out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
100 out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
103 self.transport.write(out, self.addr)
107 print "don't know about method %s" % msg[REQ]
109 out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
111 self.transport.write(out, self.addr)
113 print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.addr, self.factory.node.port,
114 ilen, msg[REQ], olen)
115 elif msg[TYP] == RSP:
118 if self.tids.has_key(msg[TID]):
119 df = self.tids[msg[TID]]
121 del(self.tids[msg[TID]])
122 df.callback({RSP : msg[RSP], '_krpc_sender': self.addr})
124 print 'timeout ' + `msg[RSP]['sender']`
125 # no tid, this transaction timed out already...
126 elif msg[TYP] == ERR:
129 if self.tids.has_key(msg[TID]):
130 df = self.tids[msg[TID]]
133 del(self.tids[msg[TID]])
135 # day late and dollar short
138 print "unknown message type " + `msg`
139 # unknown message type
140 df = self.tids[msg[TID]]
142 df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
143 del(self.tids[msg[TID]])
145 def sendRequest(self, method, args):
148 msg = {TID : hash.newTID(), TYP : REQ, REQ : method, ARG : args}
151 self.tids[msg[TID]] = d
152 def timeOut(tids = self.tids, id = msg[TID]):
156 print ">>>>>> KRPC_ERROR_TIMEOUT"
157 df.errback(KRPC_ERROR_TIMEOUT)
158 reactor.callLater(KRPC_TIMEOUT, timeOut)
159 self.transport.write(str, self.addr)