From: Cameron Dale Date: Wed, 12 Mar 2008 06:40:09 +0000 (-0700) Subject: Start using the new DHT stats module (untested). X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=0ebb5e0abe0740bbd24675e4ac83c48f48c00b0c Start using the new DHT stats module (untested). --- diff --git a/apt_p2p_Khashmir/db.py b/apt_p2p_Khashmir/db.py index 47e974c..4ee9cb3 100644 --- a/apt_p2p_Khashmir/db.py +++ b/apt_p2p_Khashmir/db.py @@ -148,6 +148,19 @@ class DB: c.execute("DELETE FROM kv WHERE last_refresh < ?", (t, )) self.conn.commit() + def keyStats(self): + """Count the total number of keys and values in the database. + @rtype: (C{int), C{int}) + @return: the number of distinct keys and total values in the database + """ + c = self.conn.cursor() + c.execute("SELECT COUNT(DISTINCT key) as num_keys, COUNT(value) as num_values FROM kv") + keys, values = 0, 0 + row = c.fetchone() + if row: + keys, values = row[0], row[1] + return keys, values + class TestDB(unittest.TestCase): """Tests for the khashmir database.""" diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 126a30e..9bd3140 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -20,6 +20,7 @@ from ktable import KTable from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID from khash import newID, newIDInRange from actions import FindNode, FindValue, GetValue, StoreValue +from stats import StatsLogger import krpc class KhashmirBase(protocol.Factory): @@ -75,9 +76,10 @@ class KhashmirBase(protocol.Factory): self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) self.token_secrets = [newID()] + self.stats = StatsLogger(self.table, self.store, self.config) # Start listening - self.udp = krpc.hostbroker(self, config) + self.udp = krpc.hostbroker(self, self.stats, config) self.udp.protocol = krpc.KRPC self.listenport = reactor.listenUDP(self.port, self.udp) @@ -284,17 +286,6 @@ class KhashmirBase(protocol.Factory): id = newIDInRange(bucket.min, bucket.max) self.findNode(id, callback) - def stats(self): - """Collect some statistics about the DHT. - - @rtype: (C{int}, C{int}) - @return: the number contacts in our routing table, and the estimated - number of nodes in the entire DHT - """ - num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0) - num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1)) - return (num_contacts, num_nodes) - def shutdown(self): """Closes the port and cancels pending later calls.""" self.listenport.stopListening() diff --git a/apt_p2p_Khashmir/krpc.py b/apt_p2p_Khashmir/krpc.py index a4fbacc..e577458 100644 --- a/apt_p2p_Khashmir/krpc.py +++ b/apt_p2p_Khashmir/krpc.py @@ -127,6 +127,8 @@ class hostbroker(protocol.DatagramProtocol): @type server: L{khashmir.Khashmir} @ivar server: the main Khashmir program + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics logger to save transport info @type config: C{dictionary} @ivar config: the configuration parameters for the DHT @type connections: C{dictionary} @@ -139,15 +141,18 @@ class hostbroker(protocol.DatagramProtocol): @ivar addr: the IP address and port of this node """ - def __init__(self, server, config): + def __init__(self, server, stats, config): """Initialize the factory. @type server: L{khashmir.Khashmir} @param server: the main DHT program + @type stats: L{stats.StatsLogger} + @param stats: the statistics logger to save transport info @type config: C{dictionary} @param config: the configuration parameters for the DHT """ self.server = server + self.stats = stats self.config = config # this should be changed to storage that drops old entries self.connections = {} @@ -177,7 +182,7 @@ class hostbroker(protocol.DatagramProtocol): # Create a new protocol object if necessary if not self.connections.has_key(addr): - conn = self.protocol(addr, self.server, self.transport, self.config['SPEW']) + conn = self.protocol(addr, self.server, self.stats, self.transport, self.config['SPEW']) self.connections[addr] = conn else: conn = self.connections[addr] @@ -201,6 +206,8 @@ class KRPC: @ivar transport: the transport to use for the protocol @type factory: L{khashmir.Khashmir} @ivar factory: the main Khashmir program + @type stats: L{stats.StatsLogger} + @ivar stats: the statistics logger to save transport info @type addr: (C{string}, C{int}) @ivar addr: the IP address and port of the source node @type noisy: C{boolean} @@ -213,13 +220,15 @@ class KRPC: @ivar stopped: whether the protocol has been stopped """ - def __init__(self, addr, server, transport, spew = False): + def __init__(self, addr, server, stats, transport, spew = False): """Initialize the protocol. @type addr: (C{string}, C{int}) @param addr: the IP address and port of the source node @type server: L{khashmir.Khashmir} @param server: the main Khashmir program + @type stats: L{stats.StatsLogger} + @param stats: the statistics logger to save transport info @param transport: the transport to use for the protocol @type spew: C{boolean} @param spew: whether to log additional details of the protocol @@ -227,6 +236,7 @@ class KRPC: """ self.transport = transport self.factory = server + self.stats = stats self.addr = addr self.noisy = spew self.tids = {} @@ -240,6 +250,7 @@ class KRPC: @type addr: (C{string}, C{int}) @param addr: source IP address and port of datagram. """ + self.stats.receivedBytes(len(data)) if self.stopped: if self.noisy: log.msg("stopped, dropping message from %r: %s" % (addr, data)) @@ -272,28 +283,34 @@ class KRPC: f = getattr(self.factory ,"krpc_" + msg[REQ], None) msg[ARG]['_krpc_sender'] = self.addr if f and callable(f): + self.stats.receivedAction(msg[REQ]) try: ret = f(*(), **msg[ARG]) except KrpcError, e: log.msg('Got a Krpc error while running: krpc_%s' % msg[REQ]) log.err(e) - olen = self._sendResponse(addr, msg[TID], ERR, [e[0], e[1]]) + self.stats.errorAction(msg[REQ]) + olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR, + [e[0], e[1]]) except TypeError, e: log.msg('Got a malformed request for: krpc_%s' % msg[REQ]) log.err(e) - olen = self._sendResponse(addr, msg[TID], ERR, + self.stats.errorAction(msg[REQ]) + olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR, [KRPC_ERROR_MALFORMED_REQUEST, str(e)]) except Exception, e: log.msg('Got an unknown error while running: krpc_%s' % msg[REQ]) log.err(e) - olen = self._sendResponse(addr, msg[TID], ERR, + self.stats.errorAction(msg[REQ]) + olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR, [KRPC_ERROR_SERVER_ERROR, str(e)]) else: - olen = self._sendResponse(addr, msg[TID], RSP, ret) + olen = self._sendResponse(msg[REQ], addr, msg[TID], RSP, ret) else: # Request for unknown method log.msg("ERROR: don't know about method %s" % msg[REQ]) - olen = self._sendResponse(addr, msg[TID], ERR, + self.stats.receivedAction('unknown') + olen = self._sendResponse(msg[REQ], addr, msg[TID], ERR, [KRPC_ERROR_METHOD_UNKNOWN, "unknown method "+str(msg[REQ])]) if self.noisy: log.msg("%s >>> %s - %s %s %s" % (addr, self.factory.node.port, @@ -331,9 +348,10 @@ class KRPC: df.errback(KrpcError(KRPC_ERROR_RECEIVED_UNKNOWN, "Received an unknown message type: %r" % msg[TYP])) - def _sendResponse(self, addr, tid, msgType, response): + def _sendResponse(self, request, addr, tid, msgType, response): """Helper function for sending responses to nodes. - + + @param request: the name of the requested method @type addr: (C{string}, C{int}) @param addr: source IP address and port of datagram. @param tid: the transaction ID of the request @@ -388,14 +406,17 @@ class KRPC: else: # Too long a response, send an error log.msg('Could not send response, too long: %d bytes' % len(out)) + self.stats.errorAction(request) msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_RESPONSE_TOO_LONG, "response was %d bytes" % len(out)]} out = bencode(msg) except Exception, e: # Unknown error, send an error message + self.stats.errorAction(request) msg = {TID : tid, TYP : ERR, ERR : [KRPC_ERROR_SERVER_ERROR, "unknown error sending response: %s" % str(e)]} out = bencode(msg) + self.stats.sentBytes(len(out)) self.transport.write(out, addr) return len(out) @@ -418,6 +439,10 @@ class KRPC: # Create the deferred and save it with the TID d = Deferred() self.tids[msg[TID]] = d + + # Save the conclusion of the action + d.addCallbacks(self.stats.responseAction, self.stats.failedAction, + callbackArgs = (method, ), errbackArgs = (method, )) # Schedule a later timeout call def timeOut(tids = self.tids, id = msg[TID], method = method, addr = self.addr): @@ -435,6 +460,10 @@ class KRPC: later_call.cancel() return dict d.addBoth(dropTimeOut) + + # Save some stats + self.stats.sentAction(method) + self.stats.sentBytes(len(data)) self.transport.write(data, self.addr) return d @@ -442,7 +471,8 @@ class KRPC: def stop(self): """Timeout all pending requests.""" for df in self.tids.values(): - df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, 'connection has been stopped while waiting for response')) + df.errback(KrpcError(KRPC_ERROR_PROTOCOL_STOPPED, + 'connection has been stopped while waiting for response')) self.tids = {} self.stopped = True