+++ /dev/null
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
-
-from bencode import bencode, bdecode
-from time import asctime
-import sys
-from traceback import format_exception
-
-from twisted.internet.defer import Deferred
-from twisted.internet import protocol
-from twisted.internet import reactor
-
-KRPC_TIMEOUT = 20
-
-KRPC_ERROR = 1
-KRPC_ERROR_METHOD_UNKNOWN = 2
-KRPC_ERROR_RECEIVED_UNKNOWN = 3
-KRPC_ERROR_TIMEOUT = 4
-
-# commands
-TID = 't'
-REQ = 'q'
-RSP = 'r'
-TYP = 'y'
-ARG = 'a'
-ERR = 'e'
-
-class hostbroker(protocol.DatagramProtocol):
- def __init__(self, server):
- self.server = server
- # this should be changed to storage that drops old entries
- self.connections = {}
-
- def datagramReceived(self, datagram, addr):
- #print `addr`, `datagram`
- #if addr != self.addr:
- c = self.connectionForAddr(addr)
- c.datagramReceived(datagram, addr)
- #if c.idle():
- # del self.connections[addr]
-
- def connectionForAddr(self, addr):
- if addr == self.addr:
- raise Exception
- if not self.connections.has_key(addr):
- conn = self.protocol(addr, self.server, self.transport)
- self.connections[addr] = conn
- else:
- conn = self.connections[addr]
- return conn
-
- def makeConnection(self, transport):
- protocol.DatagramProtocol.makeConnection(self, transport)
- tup = transport.getHost()
- self.addr = (tup.host, tup.port)
-
-## connection
-class KRPC:
- noisy = 1
- def __init__(self, addr, server, transport):
- self.transport = transport
- self.factory = server
- self.addr = addr
- self.tids = {}
- self.mtid = 0
-
- def datagramReceived(self, str, addr):
- # bdecode
- try:
- msg = bdecode(str)
- except Exception, e:
- if self.noisy:
- print "response decode error: " + `e`
- else:
- #if self.noisy:
- # print msg
- # look at msg type
- if msg[TYP] == REQ:
- ilen = len(str)
- # if request
- # tell factory to handle
- f = getattr(self.factory ,"krpc_" + msg[REQ], None)
- msg[ARG]['_krpc_sender'] = self.addr
- if f and callable(f):
- try:
- ret = apply(f, (), msg[ARG])
- except Exception, e:
- ## send error
- out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
- olen = len(out)
- self.transport.write(out, addr)
- else:
- if ret:
- # make response
- out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
- else:
- out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
- # send response
- olen = len(out)
- self.transport.write(out, addr)
-
- else:
- if self.noisy:
- print "don't know about method %s" % msg[REQ]
- # unknown method
- out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
- olen = len(out)
- self.transport.write(out, addr)
- if self.noisy:
- print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port,
- ilen, msg[REQ], olen)
- elif msg[TYP] == RSP:
- # if response
- # lookup tid
- if self.tids.has_key(msg[TID]):
- df = self.tids[msg[TID]]
- # callback
- del(self.tids[msg[TID]])
- df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
- else:
- print 'timeout ' + `msg[RSP]['id']`
- # no tid, this transaction timed out already...
- elif msg[TYP] == ERR:
- # if error
- # lookup tid
- if self.tids.has_key(msg[TID]):
- df = self.tids[msg[TID]]
- # callback
- df.errback(msg[ERR])
- del(self.tids[msg[TID]])
- else:
- # day late and dollar short
- pass
- else:
- print "unknown message type " + `msg`
- # unknown message type
- df = self.tids[msg[TID]]
- # callback
- df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
- del(self.tids[msg[TID]])
-
- def sendRequest(self, method, args):
- # make message
- # send it
- msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args}
- self.mtid = (self.mtid + 1) % 256
- str = bencode(msg)
- d = Deferred()
- self.tids[msg[TID]] = d
- def timeOut(tids = self.tids, id = msg[TID]):
- if tids.has_key(id):
- df = tids[id]
- del(tids[id])
- print ">>>>>> KRPC_ERROR_TIMEOUT"
- df.errback(KRPC_ERROR_TIMEOUT)
- reactor.callLater(KRPC_TIMEOUT, timeOut)
- self.transport.write(str, self.addr)
- return d
-