X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p_Khashmir%2Fkhashmir.py;h=5db5ed8ab48015c9c73073f9c8ceab8eac652701;hb=72a932f3ad66f0b391459848da54d5800d8b34bc;hp=8860c6719491e67c6267a1e483267587bb5dc3d2;hpb=7a84d9fb17076695aba3c0f5a32c6487bdd3f059;p=quix0rs-apt-p2p.git diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 8860c67..5db5ed8 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -80,7 +80,7 @@ class KhashmirBase(protocol.Factory): self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) self.token_secrets = [newID()] - self.stats = StatsLogger(self.table, self.store, self.config) + self.stats = StatsLogger(self.table, self.store) # Start listening self.udp = krpc.hostbroker(self, self.stats, config) @@ -198,14 +198,15 @@ class KhashmirBase(protocol.Factory): (datetime.now() - old.lastSeen) > timedelta(seconds=self.config['MIN_PING_INTERVAL'])): - def _staleNodeHandler(err, oldnode = old, newnode = node, self = self): + def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()): """The pinged node never responded, so replace it.""" - log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port)) - log.err(err) + log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage())) + self.stats.completedAction('ping', start) self.table.replaceStaleNode(oldnode, newnode) - def _notStaleNodeHandler(dict, old=old, self=self): + def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()): """Got a pong from the old node, so update it.""" + self.stats.completedAction('ping', start) if dict['id'] == old.id: self.table.justSeenNode(old.id) @@ -228,17 +229,18 @@ class KhashmirBase(protocol.Factory): (optional, defaults to calling the callback with None) """ - def _pongHandler(dict, node=node, self=self, callback=callback): + def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()): """Node responded properly, callback with response.""" n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) - self.insertNode(n) + self.stats.completedAction('join', start) + reactor.callLater(0, self.insertNode, n) if callback: callback((dict['ip_addr'], dict['port'])) - def _defaultPong(err, node=node, self=self, callback=callback, errback=errback): + def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()): """Error occurred, fail node and errback or callback with error.""" - log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port)) - log.err(err) + log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) + self.stats.completedAction('join', start) self.table.nodeFailed(node) if errback: errback() @@ -304,7 +306,7 @@ class KhashmirBase(protocol.Factory): """ if _krpc_sender is not None: n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + reactor.callLater(0, self.insertNode, n, False) return {"id" : self.node.id} @@ -318,7 +320,7 @@ class KhashmirBase(protocol.Factory): """ if _krpc_sender is not None: n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + reactor.callLater(0, self.insertNode, n, False) else: _krpc_sender = ('127.0.0.1', self.port) @@ -336,7 +338,7 @@ class KhashmirBase(protocol.Factory): """ if _krpc_sender is not None: n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + reactor.callLater(0, self.insertNode, n, False) else: _krpc_sender = ('127.0.0.1', self.port) @@ -412,7 +414,7 @@ class KhashmirRead(KhashmirBase): """ if _krpc_sender is not None: n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + reactor.callLater(0, self.insertNode, n, False) nodes = self.table.findNodes(key) nodes = map(lambda node: node.contactInfo(), nodes) @@ -434,7 +436,7 @@ class KhashmirRead(KhashmirBase): """ if _krpc_sender is not None: n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + reactor.callLater(0, self.insertNode, n, False) l = self.store.retrieveValues(key) if num == 0 or num >= len(l): @@ -494,7 +496,7 @@ class KhashmirWrite(KhashmirRead): """ if _krpc_sender is not None: n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + reactor.callLater(0, self.insertNode, n, False) else: _krpc_sender = ('127.0.0.1', self.port) @@ -514,11 +516,12 @@ class Khashmir(KhashmirWrite): class SimpleTests(unittest.TestCase): timeout = 10 - DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + DHT_DEFAULTS = {'PORT': 9977, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': False, } def setUp(self): @@ -587,11 +590,12 @@ class MultiTest(unittest.TestCase): timeout = 30 num = 20 - DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + DHT_DEFAULTS = {'PORT': 9977, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': False, } def _done(self, val):