## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
-import airhook
-from twisted.internet.defer import Deferred
-from twisted.protocols import basic
from bencode import bencode, bdecode
-from twisted.internet import protocol
+from time import asctime
+import sys
+from traceback import format_exception
+from twisted.internet.defer import Deferred
+from twisted.internet import protocol
from twisted.internet import reactor
-import time
-
-import khash as hash
-KRPC_TIMEOUT = 60
+KRPC_TIMEOUT = 20
KRPC_ERROR = 1
KRPC_ERROR_METHOD_UNKNOWN = 2
KRPC_ERROR_TIMEOUT = 4
# commands
-TID = 'tid'
-REQ = 'req'
-RSP = 'rsp'
-TYP = 'typ'
-ARG = 'arg'
-ERR = 'err'
+TID = 't'
+REQ = 'q'
+RSP = 'r'
+TYP = 'y'
+ARG = 'a'
+ERR = 'e'
class hostbroker(protocol.DatagramProtocol):
def __init__(self, server):
- self.noisy = 0
self.server = server
# this should be changed to storage that drops old entries
self.connections = {}
#print `addr`, `datagram`
#if addr != self.addr:
c = self.connectionForAddr(addr)
- c.datagramReceived(datagram)
+ c.datagramReceived(datagram, addr)
#if c.idle():
# del self.connections[addr]
self.factory = server
self.addr = addr
self.tids = {}
+ self.mtid = 0
- def datagramReceived(self, str):
+ def datagramReceived(self, str, addr):
# bdecode
try:
msg = bdecode(str)
except Exception, e:
if self.noisy:
print "response decode error: " + `e`
- self.d.errback()
else:
#if self.noisy:
# print msg
# if request
# tell factory to handle
f = getattr(self.factory ,"krpc_" + msg[REQ], None)
+ msg[ARG]['_krpc_sender'] = self.addr
if f and callable(f):
- msg[ARG]['_krpc_sender'] = self.addr
try:
ret = apply(f, (), msg[ARG])
except Exception, e:
## send error
- out = bencode({TID:msg[TID], TYP:ERR, ERR :`e`})
+ out = bencode({TID:msg[TID], TYP:ERR, ERR :`format_exception(type(e), e, sys.exc_info()[2])`})
olen = len(out)
- self.transport.write(out, self.addr)
+ self.transport.write(out, addr)
else:
if ret:
# make response
out = bencode({TID : msg[TID], TYP : RSP, RSP : {}})
# send response
olen = len(out)
- self.transport.write(out, self.addr)
+ self.transport.write(out, addr)
else:
if self.noisy:
# unknown method
out = bencode({TID:msg[TID], TYP:ERR, ERR : KRPC_ERROR_METHOD_UNKNOWN})
olen = len(out)
- self.transport.write(out, self.addr)
+ self.transport.write(out, addr)
if self.noisy:
- print "%s %s >>> %s - %s %s %s" % (time.asctime(), self.addr, self.factory.node.port,
+ print "%s %s >>> %s - %s %s %s" % (asctime(), addr, self.factory.node.port,
ilen, msg[REQ], olen)
elif msg[TYP] == RSP:
# if response
df = self.tids[msg[TID]]
# callback
del(self.tids[msg[TID]])
- df.callback({RSP : msg[RSP], '_krpc_sender': self.addr})
+ df.callback({'rsp' : msg[RSP], '_krpc_sender': addr})
else:
- print 'timeout ' + `msg[RSP]['sender']`
+ print 'timeout ' + `msg[RSP]['id']`
# no tid, this transaction timed out already...
elif msg[TYP] == ERR:
# if error
def sendRequest(self, method, args):
# make message
# send it
- msg = {TID : hash.newTID(), TYP : REQ, REQ : method, ARG : args}
+ msg = {TID : chr(self.mtid), TYP : REQ, REQ : method, ARG : args}
+ self.mtid = (self.mtid + 1) % 256
str = bencode(msg)
d = Deferred()
self.tids[msg[TID]] = d