X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p_Khashmir%2Fkhashmir.py;h=13bfb5a23352f91d603168945d54d614c351fe3f;hb=8115337306f4d45abaf92719f05adab4d011851a;hp=9509ce6bbff17e3a15fa4b4a3200ce9eb8bc7b7c;hpb=aefd29ced51580f57e487d48e71d7ce8f17bf081;p=quix0rs-apt-p2p.git diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 9509ce6..13bfb5a 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -15,6 +15,7 @@ from copy import copy import os, re from twisted.internet.defer import Deferred +from twisted.internet.base import DelayedCall from twisted.internet import protocol, reactor from twisted.python import log from twisted.trial import unittest @@ -41,6 +42,9 @@ class KhashmirBase(protocol.Factory): @ivar _Node: the knode implementation to use for this class of DHT @type config: C{dictionary} @ivar config: the configuration parameters for the DHT + @type pinging: C{dictionary} + @ivar pinging: the node's that are currently being pinged, keys are the + node id's, values are the Deferred or DelayedCall objects @type port: C{int} @ivar port: the port to listen on @type store: L{db.DB} @@ -73,6 +77,7 @@ class KhashmirBase(protocol.Factory): (optional, defaults to the /tmp directory) """ self.config = None + self.pinging = {} self.setup(config, cache_dir) def setup(self, config, cache_dir): @@ -118,8 +123,8 @@ class KhashmirBase(protocol.Factory): def _loadSelfNode(self, host, port): """Create this node, loading any previously saved one.""" id = self.store.getSelfNode() - if not id: - id = newID() + if not id or not id.endswith(self.config['VERSION']): + id = newID(self.config['VERSION']) return self._Node(id, host, port) def checkpoint(self): @@ -180,6 +185,9 @@ class KhashmirBase(protocol.Factory): @param callback: the method to call with the results, it must take 1 parameter, the list of K closest nodes """ + # Mark the bucket as having been accessed + self.table.touch(id) + # Start with our node nodes = [copy(self.node)] @@ -214,47 +222,80 @@ class KhashmirBase(protocol.Factory): timedelta(seconds=self.config['MIN_PING_INTERVAL'])): # Bucket is full, check to see if old node is still available - self.stats.startedAction('ping') - df = old.ping(self.node.id) - df.addCallbacks(self._freshNodeHandler, self._staleNodeHandler, - callbackArgs = (old, datetime.now()), - errbackArgs = (old, datetime.now(), node, contacted)) + df = self.sendPing(old) + df.addErrback(self._staleNodeHandler, old, node, contacted) elif not old and not contacted: # There's room, we just need to contact the node first - self.stats.startedAction('ping') - df = node.ping(self.node.id) - # Convert the returned contact info into a node - df.addCallback(self._pongHandler, datetime.now()) - # Try adding the contacted node - df.addCallbacks(self.insertNode, self._pongError, - errbackArgs = (node, datetime.now())) - - def _freshNodeHandler(self, dict, old, start): - """Got a pong from the old node, so update it.""" - self.stats.completedAction('ping', start) - if dict['id'] == old.id: - self.table.justSeenNode(old.id) - - def _staleNodeHandler(self, err, old, start, node, contacted): + df = self.sendPing(node) + # Also schedule a future ping to make sure the node works + def rePing(newnode, self = self): + if newnode.id not in self.pinging: + self.pinging[newnode.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'], + self.sendPing, newnode) + return newnode + df.addCallback(rePing) + + def _staleNodeHandler(self, err, old, node, contacted): """The pinged node never responded, so replace it.""" - log.msg("action ping failed on %s/%s: %s" % (old.host, old.port, err.getErrorMessage())) - self.stats.completedAction('ping', start) self.table.invalidateNode(old) self.insertNode(node, contacted) + return err + + def nodeFailed(self, node): + """Mark a node as having failed a request and schedule a future check. + + @type node: L{node.Node} + @param node: the new node to try and insert + """ + exists = self.table.nodeFailed(node) + + # If in the table, schedule a ping, if one isn't already sent/scheduled + if exists and node.id not in self.pinging: + self.pinging[node.id] = reactor.callLater(self.config['MIN_PING_INTERVAL'], + self.sendPing, node) - def _pongHandler(self, dict, start): - """Node responded properly, change response into a node to insert.""" + def sendPing(self, node): + """Ping the node to see if it's still alive. + + @type node: L{node.Node} + @param node: the node to send the join to + """ + # Check for a ping already underway + if (isinstance(self.pinging.get(node.id, None), DelayedCall) and + self.pinging[node.id].active()): + self.pinging[node.id].cancel() + elif isinstance(self.pinging.get(node.id, None), Deferred): + return self.pinging[node.id] + + self.stats.startedAction('ping') + df = node.ping(self.node.id) + self.pinging[node.id] = df + df.addCallbacks(self._pingHandler, self._pingError, + callbackArgs = (node, datetime.now()), + errbackArgs = (node, datetime.now())) + return df + + def _pingHandler(self, dict, node, start): + """Node responded properly, update it and return the node object.""" self.stats.completedAction('ping', start) + del self.pinging[node.id] # Create the node using the returned contact info n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + reactor.callLater(0, self.insertNode, n) return n - def _pongError(self, err, node, start): - """Error occurred, fail node and errback or callback with error.""" + def _pingError(self, err, node, start): + """Error occurred, fail node.""" log.msg("action ping failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) self.stats.completedAction('ping', start) - self.table.nodeFailed(node) - + + # Consume unhandled errors + self.pinging[node.id].addErrback(lambda ping_err: None) + del self.pinging[node.id] + + self.nodeFailed(node) + return err + def sendJoin(self, node, callback=None, errback=None): """Join the DHT by pinging a bootstrap node. @@ -290,7 +331,7 @@ class KhashmirBase(protocol.Factory): """Error occurred, fail node.""" log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) self.stats.completedAction('join', start) - self.table.nodeFailed(node) + self.nodeFailed(node) return err def findCloseNodes(self, callback=lambda a: None): @@ -331,6 +372,10 @@ class KhashmirBase(protocol.Factory): self.next_checkpoint.cancel() except: pass + for nodeid in self.pinging.keys(): + if isinstance(self.pinging[nodeid], DelayedCall) and self.pinging[nodeid].active(): + self.pinging[nodeid].cancel() + del self.pinging[nodeid] self.store.close() def getStats(self): @@ -405,6 +450,9 @@ class KhashmirRead(KhashmirBase): @param callback: the method to call with the results, it must take 1 parameter, the list of nodes with values """ + # Mark the bucket as having been accessed + self.table.touch(key) + # Start with ourself nodes = [copy(self.node)] @@ -558,12 +606,12 @@ class Khashmir(KhashmirWrite): class SimpleTests(unittest.TestCase): timeout = 10 - DHT_DEFAULTS = {'PORT': 9977, - 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, - 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, + DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8, + 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, - 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, + 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': True, } def setUp(self): @@ -582,21 +630,25 @@ class SimpleTests(unittest.TestCase): def testAddContact(self): self.failUnlessEqual(len(self.a.table.buckets), 1) - self.failUnlessEqual(len(self.a.table.buckets[0].l), 0) + self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 0) self.failUnlessEqual(len(self.b.table.buckets), 1) - self.failUnlessEqual(len(self.b.table.buckets[0].l), 0) + self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 0) self.a.addContact('127.0.0.1', 4045) reactor.iterate() reactor.iterate() reactor.iterate() reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() self.failUnlessEqual(len(self.a.table.buckets), 1) - self.failUnlessEqual(len(self.a.table.buckets[0].l), 1) + self.failUnlessEqual(len(self.a.table.buckets[0].nodes), 1) self.failUnlessEqual(len(self.b.table.buckets), 1) - self.failUnlessEqual(len(self.b.table.buckets[0].l), 1) + self.failUnlessEqual(len(self.b.table.buckets[0].nodes), 1) def testStoreRetrieve(self): self.a.addContact('127.0.0.1', 4045) @@ -620,6 +672,20 @@ class SimpleTests(unittest.TestCase): reactor.iterate() reactor.iterate() reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() def _cb(self, key, val): if not val: @@ -632,12 +698,12 @@ class MultiTest(unittest.TestCase): timeout = 30 num = 20 - DHT_DEFAULTS = {'PORT': 9977, - 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, - 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, + DHT_DEFAULTS = {'VERSION': 'A000', 'PORT': 9977, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 8, + 'STORE_REDUNDANCY': 6, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'LOCAL_OK': True, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, - 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, + 'KRPC_TIMEOUT': 9, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': True, } def _done(self, val): @@ -660,6 +726,9 @@ class MultiTest(unittest.TestCase): reactor.iterate() reactor.iterate() reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() for i in self.l: self.done = 0