from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
from khash import newID, newIDInRange
from actions import FindNode, FindValue, GetValue, StoreValue
+from stats import StatsLogger
import krpc
class KhashmirBase(protocol.Factory):
@ivar table: the routing table
@type token_secrets: C{list} of C{string}
@ivar token_secrets: the current secrets to use to create tokens
+ @type stats: L{stats.StatsLogger}
+ @ivar stats: the statistics gatherer
@type udp: L{krpc.hostbroker}
@ivar udp: the factory for the KRPC protocol
@type listenport: L{twisted.internet.interfaces.IListeningPort}
self.node = self._loadSelfNode('', self.port)
self.table = KTable(self.node, config)
self.token_secrets = [newID()]
+ self.stats = StatsLogger(self.table, self.store, self.config)
# Start listening
- self.udp = krpc.hostbroker(self, config)
+ self.udp = krpc.hostbroker(self, self.stats, config)
self.udp.protocol = krpc.KRPC
self.listenport = reactor.listenUDP(self.port, self.udp)
d.callback(nodes)
else:
# Start the finding nodes action
- state = FindNode(self, id, d.callback, self.config)
+ state = FindNode(self, id, d.callback, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
def insertNode(self, node, contacted = True):
self.table.justSeenNode(old.id)
# Bucket is full, check to see if old node is still available
+ self.stats.startedAction('ping')
df = old.ping(self.node.id)
df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler)
elif callback:
callback(None)
+ self.stats.startedAction('join')
df = node.join(self.node.id)
df.addCallbacks(_pongHandler, _defaultPong)
id = newIDInRange(bucket.min, bucket.max)
self.findNode(id, callback)
- def stats(self):
- """Collect some statistics about the DHT.
-
- @rtype: (C{int}, C{int})
- @return: the number contacts in our routing table, and the estimated
- number of nodes in the entire DHT
- """
- num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0)
- num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1))
- return (num_contacts, num_nodes)
-
def shutdown(self):
"""Closes the port and cancels pending later calls."""
self.listenport.stopListening()
except:
pass
self.store.close()
+
+ def getStats(self):
+ """Gather the statistics for the DHT."""
+ return self.stats.gather()
#{ Remote interface
def krpc_ping(self, id, _krpc_sender):
d.addCallback(callback)
# Search for others starting with the locally found ones
- state = FindValue(self, key, d.callback, self.config)
+ state = FindValue(self, key, d.callback, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
def valueForKey(self, key, callback, searchlocal = True):
def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
"""Use the found nodes to send requests for values to."""
- state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config)
+ state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
reactor.callLater(0, state.goWithNodes, nodes)
# First lookup nodes that have values for the key
"""Default callback that does nothing."""
pass
response = _storedValueHandler
- action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
+ action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config, self.stats)
reactor.callLater(0, action.goWithNodes, nodes)
# First find the K closest nodes to operate on.