from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
from khash import newID, newIDInRange
from actions import FindNode, FindValue, GetValue, StoreValue
+from stats import StatsLogger
import krpc
class KhashmirBase(protocol.Factory):
self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
self.token_secrets = [newID()]
+ self.stats = StatsLogger(self.table, self.store, self.config)
# Start listening
- self.udp = krpc.hostbroker(self, config)
+ self.udp = krpc.hostbroker(self, self.stats, config)
self.udp.protocol = krpc.KRPC
self.listenport = reactor.listenUDP(self.port, self.udp)
id = newIDInRange(bucket.min, bucket.max)
self.findNode(id, callback)
- def stats(self):
- """Collect some statistics about the DHT.
-
- @rtype: (C{int}, C{int})
- @return: the number contacts in our routing table, and the estimated
- number of nodes in the entire DHT
- """
- num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
- num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
- return (num_contacts, num_nodes)
-
def shutdown(self):
"""Closes the port and cancels pending later calls."""
self.listenport.stopListening()
@type server: L{khashmir.Khashmir}
@ivar server: the main Khashmir program
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics logger to save transport info
@type config: C{dictionary}
@ivar config: the configuration parameters for the DHT
@type connections: C{dictionary}
@ivar addr: the IP address and port of this node
"""
- def __init__(self, server, config):
+ def __init__(self, server, stats, config):
"""Initialize the factory.
@type server: L{khashmir.Khashmir}
@param server: the main DHT program
+ @type stats: L{stats.StatsLogger}
+ @param stats: the statistics logger to save transport info
@type config: C{dictionary}
@param config: the configuration parameters for the DHT
"""
self.server = server
+ self.stats = stats
self.config = config
# this should be changed to storage that drops old entries
self.connections = {}
# Create a new protocol object if necessary
if not self.connections.has_key(addr):
- conn = self.protocol(addr, self.server, self.transport, self.config['SPEW'])
+ conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW'])
self.connections[addr] = conn
else:
conn = self.connections[addr]
@ivar transport: the transport to use for the protocol
@type factory: L{khashmir.Khashmir}
@ivar factory: the main Khashmir program
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics logger to save transport info
@type addr: (C{string}, C{int})
@ivar addr: the IP address and port of the source node
@type noisy: C{boolean}
@ivar stopped: whether the protocol has been stopped
"""
- def __init__(self, addr, server, transport, spew = False):
+ def __init__(self, addr, server, stats, transport, spew = False):
"""Initialize the protocol.
@type addr: (C{string}, C{int})
@param addr: the IP address and port of the source node
@type server: L{khashmir.Khashmir}
@param server: the main Khashmir program
+ @type stats: L{stats.StatsLogger}
+ @param stats: the statistics logger to save transport info
@param transport: the transport to use for the protocol
@type spew: C{boolean}
@param spew: whether to log additional details of the protocol
"""
self.transport = transport
self.factory = server
+ self.stats = stats
self.addr = addr
self.noisy = spew
self.tids = {}
@type addr: (C{string}, C{int})
@param addr: source IP address and port of datagram.
"""
+ self.stats.receivedBytes(len(data))
if self.stopped:
if self.noisy:
log.msg("stopped, dropping message from %r: %s" % (addr, data))
f = getattr(self.factory ,"krpc_" + msg[REQ], None)
msg[ARG]['_krpc_sender'] = self.addr
if f and callable(f):
+ self.stats.receivedAction(msg[REQ])
try:
ret = f(*(), **msg[ARG])
except KrpcError, e:
log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ])
log.err(e)
- olen = self._sendResponse(addr, msg[TID], ERR, [e[0], e[1]])
+ self.stats.errorAction(msg[REQ])
+ olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
+ [e[0], e[1]])
except TypeError, e:
log.msg('Got a malformed request for: krpc_%s' % msg[REQ])
log.err(e)
- olen = self._sendResponse(addr, msg[TID], ERR,
+ self.stats.errorAction(msg[REQ])
+ olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
[KRPC_ERROR_MALFORMED_REQUEST, str(e)])
except Exception, e:
log.msg('Got an unknown error while running: krpc_%s' % msg[REQ])
log.err(e)
- olen = self._sendResponse(addr, msg[TID], ERR,
+ self.stats.errorAction(msg[REQ])
+ olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
[KRPC_ERROR_SERVER_ERROR, str(e)])
else:
- olen = self._sendResponse(addr, msg[TID], RSP, ret)
+ olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret)
else:
# Request for unknown method
log.msg("ERROR: don't know about method %s" % msg[REQ])
- olen = self._sendResponse(addr, msg[TID], ERR,
+ self.stats.receivedAction('unknown')
+ olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR,
[KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])])
if self.noisy:
log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port,
df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN,
"Received an unknown message type: %r" % msg[TYP]))
- def _sendResponse(self, addr, tid, msgType, response):
+ def _sendResponse(self, request, addr, tid, msgType, response):
"""Helper function for sending responses to nodes.
-
+
+ @param request: the name of the requested method
@type addr: (C{string}, C{int})
@param addr: source IP address and port of datagram.
@param tid: the transaction ID of the request
else:
# Too long a response, send an error
log.msg('Could not send response, too long: %d bytes' % len(out))
+ self.stats.errorAction(request)
msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]}
out = bencode(msg)
except Exception, e:
# Unknown error, send an error message
+ self.stats.errorAction(request)
msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]}
out = bencode(msg)
+ self.stats.sentBytes(len(out))
self.transport.write(out, addr)
return len(out)
# Create the deferred and save it with the TID
d = Deferred()
self.tids[msg[TID]] = d
+
+ # Save the conclusion of the action
+ d.addCallbacks(self.stats.responseAction, self.stats.failedAction,
+ callbackArgs = (method, ), errbackArgs = (method, ))
# Schedule a later timeout call
def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr):
later_call.cancel()
return dict
d.addBoth(dropTimeOut)
+
+ # Save some stats
+ self.stats.sentAction(method)
+ self.stats.sentBytes(len(data))
self.transport.write(data, self.addr)
return d
def stop(self):
"""Timeout all pending requests."""
for df in self.tids.values():
- df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response'))
+ df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED,
+ 'connection has been stopped while waiting for response'))
self.tids = {}
self.stopped = True