"""The KRPC communication protocol implementation.
-@var KRPC_INITIAL_DELAY: the number of seconds after which to try resending
- the request, the resends will wait twice as long each time
-@var KRPC_TIMEOUT: the number of seconds after which requests timeout
@var UDP_PACKET_LIMIT: the maximum number of bytes that can be sent in a
UDP packet without fragmentation
from khash import newID
-KRPC_INITIAL_DELAY = 2
-KRPC_TIMEOUT = 14
UDP_PACKET_LIMIT = 1472
# Remote node errors
# Create a new protocol object if necessary
if not self.connections.has_key(addr):
- conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
+ conn = self.protocol(addr, self.server, self.stats, self.transport, self.config)
self.connections[addr] = conn
else:
conn = self.connections[addr]
@ivar method: the name of the method to call on the remote node
@type data: C{string}
@ivar data: the message to send to the remote node
+ @type config: C{dictionary}
+ @ivar config: the configuration parameters for the DHT
@type delay: C{int}
@ivar delay: the last timeout delay sent
@type start: C{datetime}
@ivar later: the pending call to timeout the last sent request
"""
- def __init__(self, protocol, newTID, method, data):
+ def __init__(self, protocol, newTID, method, data, config):
"""Initialize the request, and send it out.
@type protocol: L{KRPC}
@param method: the name of the method to call on the remote node
@type data: C{string}
@param data: the message to send to the remote node
+ @type config: C{dictionary}
+ @param config: the configuration parameters for the DHT
"""
defer.Deferred.__init__(self)
self.protocol = protocol
self.tid = newTID
self.method = method
self.data = data
- self.delay = KRPC_INITIAL_DELAY
+ self.config = config
+ self.delay = self.config.get('KRPC_INITIAL_DELAY', 2)
self.start = datetime.now()
self.later = None
self.send()
"""Check for a unrecoverable timeout, otherwise resend."""
self.later = None
delay = datetime.now() - self.start
- if delay > timedelta(seconds = KRPC_TIMEOUT):
+ if delay > timedelta(seconds = self.config.get('KRPC_TIMEOUT', 14)):
log.msg('%r timed out after %0.2f sec' %
(self.tid, delay.seconds + delay.microseconds/1000000.0))
self.protocol.timeOut(self.tid, self.method)
@ivar stats: the statistics logger to save transport info
@type addr: (C{string}, C{int})
@ivar addr: the IP address and port of the source node
- @type noisy: C{boolean}
- @ivar noisy: whether to log additional details of the protocol
+ @type config: C{dictionary}
+ @ivar config: the configuration parameters for the DHT
@type tids: C{dictionary}
@ivar tids: the transaction IDs outstanding for requests, keys are the
transaction ID of the request, values are the deferreds to call with
@ivar stopped: whether the protocol has been stopped
"""
- def __init__(self, addr, server, stats, transport, spew = False):
+ def __init__(self, addr, server, stats, transport, config = {}):
"""Initialize the protocol.
@type addr: (C{string}, C{int})
@type stats: L{stats.StatsLogger}
@param stats: the statistics logger to save transport info
@param transport: the transport to use for the protocol
- @type spew: C{boolean}
- @param spew: whether to log additional details of the protocol
- (optional, defaults to False)
+ @type config: C{dictionary}
+ @param config: the configuration parameters for the DHT
+ (optional, defaults to using defaults)
"""
self.transport = transport
self.factory = server
self.stats = stats
self.addr = addr
- self.noisy = spew
+ self.config = config
self.tids = {}
self.stopped = False
"""
self.stats.receivedBytes(len(data))
if self.stopped:
- if self.noisy:
+ if self.config.get('SPEW', False):
log.msg("stopped, dropping message from %r: %s" % (addr, data))
# Bdecode the message
try:
msg = bdecode(data)
except Exception, e:
- if self.noisy:
+ if self.config.get('SPEW', False):
log.msg("krpc bdecode error: ")
log.err(e)
return
log.err(e)
return
- if self.noisy:
+ if self.config.get('SPEW', False):
log.msg("%d received from %r: %s" % (self.factory.port, addr, msg))
# Process it based on its type
self.stats.receivedAction('unknown')
olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
[KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
- if self.noisy:
- log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
- ilen, msg[REQ], olen))
+
+ log.msg("%s >>> %s %s %s" % (addr, ilen, msg[REQ], olen))
elif msg[TYP] == RSP:
# Responses get processed by their TID's deferred
if self.tids.has_key(msg[TID]):
req.callback(msg[RSP])
else:
# no tid, this transaction was finished already...
- if self.noisy:
+ if self.config.get('SPEW', False):
log.msg('received response from %r for completed request: %r' %
(msg[RSP]['id'], msg[TID]))
elif msg[TYP] == ERR:
(msg[ERR], msg[RSP]['id'], msg[TID]))
else:
# Received an unknown message type
- if self.noisy:
+ if self.config.get('SPEW', False):
log.msg("unknown message type: %r" % msg)
if msg[TID] in self.tids:
req = self.tids[msg[TID]]
# Create the response message
msg = {TID : tid, TYP : msgType, msgType : response}
- if self.noisy:
+ if self.config.get('SPEW', False):
log.msg("%d responding to %r: %s" % (self.factory.port, addr, msg))
out = bencode(msg)
# Create the request message
newTID = newID()
msg = {TID : newTID, TYP : REQ, REQ : method, ARG : args}
- if self.noisy:
+ if self.config.get('SPEW', False):
log.msg("%d sending to %r: %s" % (self.factory.port, self.addr, msg))
data = bencode(msg)
# Create the request object and save it with the TID
- req = KrpcRequest(self, newTID, method, data)
+ req = KrpcRequest(self, newTID, method, data, self.config)
self.tids[newTID] = req
# Save the conclusion of the action
def make(port):
from stats import StatsLogger
af = Receiver()
- a = hostbroker(af, StatsLogger(None, None), {'SPEW': False})
+ a = hostbroker(af, StatsLogger(None, None),
+ {'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'SPEW': False})
a.protocol = KRPC
p = reactor.listenUDP(port, a)
return af, a, p