-import airhook
-from twisted.internet.defer import Deferred
-from twisted.protocols import basic
+## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
+
from bencode import bencode, bdecode
-from twisted.internet import reactor
-import time
+from time import asctime
+import sys
+from traceback import format_exception
-import hash
+from twisted.internet.defer import Deferred
+from twisted.internet import protocol
+from twisted.internet import reactor
-KRPC_TIMEOUT = 30
+KRPC_TIMEOUT = 20
KRPC_ERROR = 1
KRPC_ERROR_METHOD_UNKNOWN = 2
KRPC_ERROR_RECEIVED_UNKNOWN = 3
KRPC_ERROR_TIMEOUT = 4
-class KRPC(basic.NetstringReceiver):
+# commands
+TID = 't'
+REQ = 'q'
+RSP = 'r'
+TYP = 'y'
+ARG = 'a'
+ERR = 'e'
+
+class hostbroker(protocol.DatagramProtocol):
+ def __init__(self, server):
+ self.server = server
+ # this should be changed to storage that drops old entries
+ self.connections = {}
+
+ def datagramReceived(self, datagram, addr):
+ #print `addr`, `datagram`
+ #if addr != self.addr:
+ c = self.connectionForAddr(addr)
+ c.datagramReceived(datagram, addr)
+ #if c.idle():
+ # del self.connections[addr]
+
+ def connectionForAddr(self, addr):
+ if addr == self.addr:
+ raise Exception
+ if not self.connections.has_key(addr):
+ conn = self.protocol(addr, self.server, self.transport)
+ self.connections[addr] = conn
+ else:
+ conn = self.connections[addr]
+ return conn
+
+ def makeConnection(self, transport):
+ protocol.DatagramProtocol.makeConnection(self, transport)
+ tup = transport.getHost()
+ self.addr = (tup.host, tup.port)
+
+## connection
+class KRPC:
noisy = 1
- def __init__(self):
+ def __init__(self, addr, server, transport):
+ self.transport = transport
+ self.factory = server
+ self.addr = addr
self.tids = {}
+ self.mtid = 0
- def resetConnection(self):
- self.brokenPeer = 0
- self._readerState = basic.LENGTH
- self._readerLength = 0
-
- def stringReceived(self, str):
+ def datagramReceived(self, str, addr):
# bdecode
try:
msg = bdecode(str)
except Exception, e:
- if self.naisy:
+ if self.noisy:
print "response decode error: " + `e`
- self.d.errback()
else:
+ #if self.noisy:
+ # print msg
# look at msg type
- if msg['typ'] == 'req':
+ if msg[TYP] == REQ:
ilen = len(str)
# if request
# tell factory to handle
- f = getattr(self.factory ,"krpc_" + msg['req'], None)
+ f = getattr(self.factory ,"krpc_" + msg[REQ], None)
+ msg[ARG]['_krpc_sender'] = self.addr
if f and callable(f):
- msg['arg']['_krpc_sender'] = self.transport.addr
try:
- ret = apply(f, (), msg['arg'])
+ ret = apply(f, (), msg[ARG])
except Exception, e:
## send error
- str = bencode({'tid':msg['tid'], 'typ':'err', 'err' :`e`})
- olen = len(str)
- self.sendString(str)
+ out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
+ olen = len(out)
+ self.transport.write(out, addr)
else:
if ret:
# make response
- str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : ret})
+ out = bencode({TID : msg[TID], TYP : RSP, RSP : ret})
else:
- str = bencode({'tid' : msg['tid'], 'typ' : 'rsp', 'rsp' : {}})
+ out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
# send response
- olen = len(str)
- self.sendString(str)
+ olen = len(out)
+ self.transport.write(out, addr)
else:
if self.noisy:
- print "don't know about method %s" % msg['req']
+ print "don't know about method %s" % msg[REQ]
# unknown method
- str = bencode({'tid':msg['tid'], 'typ':'err', 'err' : KRPC_ERROR_METHOD_UNKNOWN})
- olen = len(str)
- self.sendString(str)
+ out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
+ olen = len(out)
+ self.transport.write(out, addr)
if self.noisy:
- print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.transport.addr, self.factory.node.port,
- ilen, msg['req'], olen)
- elif msg['typ'] == 'rsp':
+ print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port,
+ ilen, msg[REQ], olen)
+ elif msg[TYP] == RSP:
# if response
# lookup tid
- if self.tids.has_key(msg['tid']):
- df = self.tids[msg['tid']]
+ if self.tids.has_key(msg[TID]):
+ df = self.tids[msg[TID]]
# callback
- del(self.tids[msg['tid']])
- df.callback({'rsp' : msg['rsp'], '_krpc_sender': self.transport.addr})
- # no tid, this transaction timed out already...
- elif msg['typ'] == 'err':
+ del(self.tids[msg[TID]])
+ df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
+ else:
+ print 'timeout ' + `msg[RSP]['id']`
+ # no tid, this transaction timed out already...
+ elif msg[TYP] == ERR:
# if error
# lookup tid
- df = self.tids[msg['tid']]
- # callback
- df.errback(msg['err'])
- del(self.tids[msg['tid']])
+ if self.tids.has_key(msg[TID]):
+ df = self.tids[msg[TID]]
+ # callback
+ df.errback(msg[ERR])
+ del(self.tids[msg[TID]])
+ else:
+ # day late and dollar short
+ pass
else:
+ print "unknown message type " + `msg`
# unknown message type
- df = self.tids[msg['tid']]
+ df = self.tids[msg[TID]]
# callback
df.errback(KRPC_ERROR_RECEIVED_UNKNOWN)
- del(self.tids[msg['tid']])
+ del(self.tids[msg[TID]])
def sendRequest(self, method, args):
# make message
# send it
- msg = {'tid' : hash.newID(), 'typ' : 'req', 'req' : method, 'arg' : args}
+ msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args}
+ self.mtid = (self.mtid + 1) % 256
str = bencode(msg)
d = Deferred()
- self.tids[msg['tid']] = d
- def timeOut(tids = self.tids, id = msg['tid']):
+ self.tids[msg[TID]] = d
+ def timeOut(tids = self.tids, id = msg[TID]):
if tids.has_key(id):
df = tids[id]
del(tids[id])
print ">>>>>> KRPC_ERROR_TIMEOUT"
df.errback(KRPC_ERROR_TIMEOUT)
reactor.callLater(KRPC_TIMEOUT, timeOut)
- self.sendString(str)
+ self.transport.write(str, self.addr)
return d