X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_dht_Khashmir%2Factions.py;h=c80591a4dcfbb87e1889734955be6c7ba33f7351;hp=8da4431dcd1d110e05c9cf429ea58fc4ab3bb435;hb=d8b63cce3887dfd61f3a8321d0c45327c4a1808b;hpb=b4ad0e73eced53fb46c41fe1511d03c4f2466dba diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index 8da4431..c80591a 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -2,25 +2,33 @@ # see LICENSE.txt for license information from twisted.internet import reactor +from twisted.python import log from khash import intify +from util import uncompact class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, caller, target, callback, config): + def __init__(self, caller, target, callback, config, action, num_results = None): + """Initialize the action.""" self.caller = caller self.target = target self.config = config + self.action = action self.num = intify(target) - self.found = {} self.queried = {} self.answered = {} + self.found = {} + self.sorted_nodes = [] + self.results = {} + self.desired_results = num_results self.callback = callback self.outstanding = 0 + self.outstanding_results = 0 self.finished = 0 def sort(a, b, num=self.num): - """ this function is for sorting nodes relative to the ID we are looking for """ + """Sort nodes relative to the ID we are looking for.""" x, y = num ^ a.num, num ^ b.num if x > y: return 1 @@ -29,105 +37,189 @@ class ActionBase: return 0 self.sort = sort - def goWithNodes(self, t): - pass - + def goWithNodes(self, nodes): + """Start the action's process with a list of nodes to contact.""" + for node in nodes: + if node.id == self.caller.node.id: + continue + else: + self.found[node.id] = node + self.sortNodes() + self.schedule() + def schedule(self): + """Schedule requests to be sent to remote nodes.""" + # Check if we are already done + if self.desired_results and len(self.results) >= self.desired_results: + self.finished=1 + result = self.generateResult() + reactor.callLater(0, self.callback, *result) -FIND_NODE_TIMEOUT = 15 - -class FindNode(ActionBase): - """ find node action merits it's own class as it is a long running stateful process """ - def handleGotNodes(self, dict): - _krpc_sender = dict['_krpc_sender'] - dict = dict['rsp'] - n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) - self.caller.insertNode(n) - l = dict["nodes"] - if self.finished or self.answered.has_key(dict["id"]): - # a day late and a dollar short + if self.finished or (self.desired_results and + len(self.results) + self.outstanding_results >= self.desired_results): return - self.outstanding = self.outstanding - 1 - self.answered[dict["id"]] = 1 - for node in l: - n = self.caller.Node(node) - if not self.found.has_key(n.id): - self.found[n.id] = n - self.schedule() - def schedule(self): - """ - send messages to new peers, if necessary - """ - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - for node in l[:self.config['K']]: - if node.id == self.target: - self.finished=1 - return self.callback([node]) - if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: - #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.caller.node.id) - df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) - self.outstanding = self.outstanding + 1 + for node in self.getNodesToProcess(): + if node.id not in self.queried and node.id != self.caller.node.id: self.queried[node.id] = 1 - if self.outstanding >= self.config['CONCURRENT_REQS']: + + # Get the action to call on the node + try: + f = getattr(node, self.action) + except AttributeError: + log.msg("%s doesn't have a %s method!" % (node, self.action)) + else: + # Get the arguments to the action's method + try: + args, expected_results = self.generateArgs(node) + except ValueError: + pass + else: + # Call the action on the remote node + self.outstanding += 1 + self.outstanding_results += expected_results + df = f(self.caller.node.id, *args) + df.addCallbacks(self.gotResponse, self.actionFailed, + callbackArgs = (node, expected_results), + errbackArgs = (node, expected_results)) + + # We might have to stop for now + if (self.outstanding >= self.config['CONCURRENT_REQS'] or + (self.desired_results and + self.outstanding_results >= self.desired_results)): break + + # If no requests are outstanding, then we are done assert self.outstanding >=0 if self.outstanding == 0: - ## all done!! - self.finished=1 - reactor.callLater(0, self.callback, l[:self.config['K']]) - - def makeMsgFailed(self, node): - def defaultGotNodes(err, self=self, node=node): - print ">>> find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port), err - self.caller.table.nodeFailed(node) - self.outstanding = self.outstanding - 1 - self.schedule() - return defaultGotNodes + self.finished = 1 + result = self.generateResult() + reactor.callLater(0, self.callback, *result) + + def gotResponse(self, dict, node, expected_results): + """Receive a response from a remote node.""" + self.caller.insertNode(node) + if self.finished or self.answered.has_key(node.id): + # a day late and a dollar short + return + self.outstanding -= 1 + self.outstanding_results -= expected_results + self.answered[node.id] = 1 + self.processResponse(dict['rsp']) + self.schedule() + + def actionFailed(self, err, node, expected_results): + """Receive an error from a remote node.""" + log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port)) + log.err(err) + self.caller.table.nodeFailed(node) + self.answered[node.id] = 1 + self.outstanding -= 1 + self.outstanding_results -= expected_results + self.schedule() - def goWithNodes(self, nodes): + def handleGotNodes(self, nodes): + """Process any received node contact info in the response.""" + for compact_node in nodes: + node_contact = uncompact(compact_node) + node = self.caller.Node(node_contact) + if not self.found.has_key(node.id): + self.found[node.id] = node + + def sortNodes(self): + """Sort the nodes, if necessary. + + Assumes nodes are never removed from the L{found} dictionary. """ - this starts the process, our argument is a transaction with t.extras being our list of nodes - it's a transaction since we got called from the dispatcher + if len(self.sorted_nodes) != len(self.found): + self.sorted_nodes = self.found.values() + self.sorted_nodes.sort(self.sort) + + # The methods below are meant to be subclassed by actions + def getNodesToProcess(self): + """Generate a list of nodes to process next. + + This implementation is suitable for a recurring search over all nodes. """ - for node in nodes: - if node.id == self.caller.node.id: - continue - else: - self.found[node.id] = node + self.sortNodes() + return self.sorted_nodes[:self.config['K']] + + def generateArgs(self, node): + """Generate the arguments to the node's action. - self.schedule() + These arguments will be appended to our node ID when calling the action. + Also return the number of results expected from this request. + + @raise ValueError: if the node should not be queried + """ + return (self.target, ), 0 + def processResponse(self, dict): + """Process the response dictionary received from the remote node.""" + pass -get_value_timeout = 15 -class GetValue(FindNode): - def __init__(self, caller, target, callback, config, find="findValue"): - FindNode.__init__(self, caller, target, callback, config) - self.findValue = find - - """ get value task """ - def handleGotNodes(self, dict): - _krpc_sender = dict['_krpc_sender'] - dict = dict['rsp'] - n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) - self.caller.insertNode(n) - if self.finished or self.answered.has_key(dict["id"]): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[dict["id"]] = 1 - # go through nodes - # if we have any closer than what we already got, query them - if dict.has_key('nodes'): - for node in dict['nodes']: - n = self.caller.Node(node) - if not self.found.has_key(n.id): - self.found[n.id] = n - elif dict.has_key('values'): + def generateResult(self, nodes): + """Create the result to return to the callback function.""" + return [] + + +class FindNode(ActionBase): + """Find the closest nodes to the key.""" + + def __init__(self, caller, target, callback, config, action="findNode"): + ActionBase.__init__(self, caller, target, callback, config, action) + + def processResponse(self, dict): + """Save the token received from each node.""" + if dict["id"] in self.found: + self.found[dict["id"]].updateToken(dict.get('token', '')) + self.handleGotNodes(dict['nodes']) + + def generateResult(self): + """Result is the K closest nodes to the target.""" + self.sortNodes() + return (self.sorted_nodes[:self.config['K']], ) + + +class FindValue(ActionBase): + """Find the closest nodes to the key and check their values.""" + + def __init__(self, caller, target, callback, config, action="findValue"): + ActionBase.__init__(self, caller, target, callback, config, action) + + def processResponse(self, dict): + """Save the number of values each node has.""" + if dict["id"] in self.found: + self.found[dict["id"]].updateNumValues(dict.get('num', 0)) + self.handleGotNodes(dict['nodes']) + + def generateResult(self): + """Result is the nodes that have values, sorted by proximity to the key.""" + self.sortNodes() + return ([node for node in self.sorted_nodes if node.num_values > 0], ) + + +class GetValue(ActionBase): + def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"): + ActionBase.__init__(self, caller, target, callback, config, action, num_results) + if local_results: + for result in local_results: + self.results[result] = 1 + + def getNodesToProcess(self): + """Nodes are never added, always return the same thing.""" + return self.sorted_nodes + + def generateArgs(self, node): + """Args include the number of values to request.""" + if node.num_values > 0: + return (self.target, 0), node.num_values + else: + raise ValueError, "Don't try and get values from this node because it doesn't have any" + + def processResponse(self, dict): + """Save the returned values, calling the callback each time there are new ones.""" + if dict.has_key('values'): def x(y, z=self.results): if not z.has_key(y): z[y] = 1 @@ -136,127 +228,34 @@ class GetValue(FindNode): return None z = len(dict['values']) v = filter(None, map(x, dict['values'])) - if(len(v)): + if len(v): reactor.callLater(0, self.callback, self.target, v) - self.schedule() - - ## get value - def schedule(self): - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - - for node in l[:self.config['K']]: - if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: - #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT - try: - f = getattr(node, self.findValue) - except AttributeError: - print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue) - else: - df = f(self.target, self.caller.node.id) - df.addCallback(self.handleGotNodes) - df.addErrback(self.makeMsgFailed(node)) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding >= self.config['CONCURRENT_REQS']: - break - assert self.outstanding >=0 - if self.outstanding == 0: - ## all done, didn't find it!! - self.finished=1 - reactor.callLater(0, self.callback, self.target, []) - - ## get value - def goWithNodes(self, nodes, found=None): - self.results = {} - if found: - for n in found: - self.results[n] = 1 - for node in nodes: - if node.id == self.caller.node.id: - continue - else: - self.found[node.id] = node - - self.schedule() + def generateResult(self): + """Results have all been returned, now send the empty list to end it.""" + return (self.target, []) + class StoreValue(ActionBase): - def __init__(self, caller, target, value, originated, callback, config, store="storeValue"): - ActionBase.__init__(self, caller, target, callback, config) + def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"): + ActionBase.__init__(self, caller, target, callback, config, action, num_results) self.value = value - self.originated = originated - self.stored = [] - self.store = store - def storedValue(self, t, node): - self.outstanding -= 1 - self.caller.insertNode(node) - if self.finished: - return - self.stored.append(t) - if len(self.stored) >= self.config['STORE_REDUNDANCY']: - self.finished=1 - self.callback(self.target, self.value, self.stored) - else: - if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']: - self.schedule() - return t - - def storeFailed(self, t, node): - print ">>> store failed %s/%s" % (node.host, node.port) - self.caller.nodeFailed(node) - self.outstanding -= 1 - if self.finished: - return t - self.schedule() - return t - - def schedule(self): - if self.finished: - return - num = self.config['CONCURRENT_REQS'] - self.outstanding - if num > self.config['STORE_REDUNDANCY']: - num = self.config['STORE_REDUNDANCY'] - for i in range(num): - try: - node = self.nodes.pop() - except IndexError: - if self.outstanding == 0: - self.finished = 1 - self.callback(self.target, self.value, self.stored) - else: - if not node.id == self.caller.node.id: - self.outstanding += 1 - try: - f = getattr(node, self.store) - except AttributeError: - print ">>> %s doesn't have a %s method!" % (node, self.store) - else: - df = f(self.target, self.value, self.originated, self.caller.node.id) - df.addCallback(self.storedValue, node=node) - df.addErrback(self.storeFailed, node=node) - - def goWithNodes(self, nodes): - self.nodes = nodes - self.nodes.sort(self.sort) - self.schedule() + def getNodesToProcess(self): + """Nodes are never added, always return the same thing.""" + return self.sorted_nodes + def generateArgs(self, node): + """Args include the value to request and the node's token.""" + if node.token: + return (self.target, self.value, node.token), 1 + else: + raise ValueError, "Don't store at this node since we don't know it's token" -class KeyExpirer: - def __init__(self, store, config): - self.store = store - self.config = config - self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire) + def processResponse(self, dict): + """Save the response, though it should be nothin but the ID.""" + self.results[dict["id"]] = dict - def doExpire(self): - self.store.expireValues(self.config['KE_AGE']) - self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire) - - def shutdown(self): - try: - self.next_expire.cancel() - except: - pass + def generateResult(self): + """Return all the response IDs received.""" + return (self.target, self.value, self.results.values())