From d8b63cce3887dfd61f3a8321d0c45327c4a1808b Mon Sep 17 00:00:00 2001 From: Cameron Dale Date: Sun, 24 Feb 2008 00:31:45 -0800 Subject: [PATCH] Rewrite of the actions to take advantage of the commonalities between them. --- TODO | 15 +- apt_dht_Khashmir/actions.py | 399 +++++++++++++++++------------------ apt_dht_Khashmir/khashmir.py | 10 +- apt_dht_Khashmir/knode.py | 8 +- 4 files changed, 205 insertions(+), 227 deletions(-) diff --git a/TODO b/TODO index 3abb53c..d871ee1 100644 --- a/TODO +++ b/TODO @@ -1,13 +1,8 @@ -Clean up the khashmir actions. - -The khashmir actions are a mess, and some cleanup is necessary. A lot -of the actions have most of their processing in common, so this code -should be put in functions that all can call. Perhaps creating a -base "RecurringAction" and "StaticAction" would be a good idea, -as then find_node and find_value could use the first, while get_value -and store_value could use the second. Perhaps ping and join actions -should also be created for consistency, and maybe inherit from a -"SingleNodeAction" base class. +Consider what happens when we are the closest node. + +In some of the actions it is unclear what happens when we are one of the +closest nodes to the target key. Do we store values that we publish +ourself? Packages.diff files need to be considered. diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index c94c78d..c80591a 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -9,20 +9,26 @@ from util import uncompact class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, caller, target, callback, config): + def __init__(self, caller, target, callback, config, action, num_results = None): + """Initialize the action.""" self.caller = caller self.target = target self.config = config + self.action = action self.num = intify(target) - self.found = {} self.queried = {} self.answered = {} + self.found = {} + self.sorted_nodes = [] + self.results = {} + self.desired_results = num_results self.callback = callback self.outstanding = 0 + self.outstanding_results = 0 self.finished = 0 def sort(a, b, num=self.num): - """ this function is for sorting nodes relative to the ID we are looking for """ + """Sort nodes relative to the ID we are looking for.""" x, y = num ^ a.num, num ^ b.num if x > y: return 1 @@ -31,150 +37,188 @@ class ActionBase: return 0 self.sort = sort - def actionFailed(self, err, node): - log.msg("action %s failed (%s) %s/%s" % (self.__class__.__name__, self.config['PORT'], node.host, node.port)) - log.err(err) - self.caller.table.nodeFailed(node) - self.outstanding = self.outstanding - 1 - self.schedule() - - def goWithNodes(self, t): - pass - - -class FindNode(ActionBase): - """ find node action merits it's own class as it is a long running stateful process """ - def handleGotNodes(self, dict): - _krpc_sender = dict['_krpc_sender'] - dict = dict['rsp'] - n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) - self.caller.insertNode(n) - if dict["id"] in self.found: - self.found[dict["id"]].updateToken(dict.get('token', '')) - l = dict["nodes"] - if self.finished or self.answered.has_key(dict["id"]): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[dict["id"]] = 1 - for compact_node in l: - node = uncompact(compact_node) - n = self.caller.Node(node) - if not self.found.has_key(n.id): - self.found[n.id] = n - self.schedule() - - def schedule(self): - """ - send messages to new peers, if necessary - """ - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - for node in l[:self.config['K']]: - if node.id == self.target: - self.finished=1 - return self.callback([node]) - if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: - #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.caller.node.id) - df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, )) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding >= self.config['CONCURRENT_REQS']: - break - assert self.outstanding >=0 - if self.outstanding == 0: - ## all done!! - self.finished=1 - reactor.callLater(0, self.callback, l[:self.config['K']]) - def goWithNodes(self, nodes): - """ - this starts the process, our argument is a transaction with t.extras being our list of nodes - it's a transaction since we got called from the dispatcher - """ + """Start the action's process with a list of nodes to contact.""" for node in nodes: if node.id == self.caller.node.id: continue else: self.found[node.id] = node - + self.sortNodes() self.schedule() + def schedule(self): + """Schedule requests to be sent to remote nodes.""" + # Check if we are already done + if self.desired_results and len(self.results) >= self.desired_results: + self.finished=1 + result = self.generateResult() + reactor.callLater(0, self.callback, *result) -class FindValue(ActionBase): - def handleGotNodes(self, dict): - _krpc_sender = dict['_krpc_sender'] - dict = dict['rsp'] - n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) - self.caller.insertNode(n) - if dict["id"] in self.found: - self.found[dict["id"]].updateNumValues(dict.get('num', 0)) - l = dict["nodes"] - if self.finished or self.answered.has_key(dict["id"]): - # a day late and a dollar short + if self.finished or (self.desired_results and + len(self.results) + self.outstanding_results >= self.desired_results): return - self.outstanding = self.outstanding - 1 - self.answered[dict["id"]] = 1 - for compact_node in l: - node = uncompact(compact_node) - n = self.caller.Node(node) - if not self.found.has_key(n.id): - self.found[n.id] = n - self.schedule() - def schedule(self): - """ - send messages to new peers, if necessary - """ - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - for node in l[:self.config['K']]: - if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: - df = node.findValue(self.target, self.caller.node.id) - df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, )) - self.outstanding = self.outstanding + 1 + for node in self.getNodesToProcess(): + if node.id not in self.queried and node.id != self.caller.node.id: self.queried[node.id] = 1 - if self.outstanding >= self.config['CONCURRENT_REQS']: + + # Get the action to call on the node + try: + f = getattr(node, self.action) + except AttributeError: + log.msg("%s doesn't have a %s method!" % (node, self.action)) + else: + # Get the arguments to the action's method + try: + args, expected_results = self.generateArgs(node) + except ValueError: + pass + else: + # Call the action on the remote node + self.outstanding += 1 + self.outstanding_results += expected_results + df = f(self.caller.node.id, *args) + df.addCallbacks(self.gotResponse, self.actionFailed, + callbackArgs = (node, expected_results), + errbackArgs = (node, expected_results)) + + # We might have to stop for now + if (self.outstanding >= self.config['CONCURRENT_REQS'] or + (self.desired_results and + self.outstanding_results >= self.desired_results)): break + + # If no requests are outstanding, then we are done assert self.outstanding >=0 if self.outstanding == 0: - ## all done!! - self.finished=1 - l = [node for node in self.found.values() if node.num_values > 0] - reactor.callLater(0, self.callback, l) + self.finished = 1 + result = self.generateResult() + reactor.callLater(0, self.callback, *result) + + def gotResponse(self, dict, node, expected_results): + """Receive a response from a remote node.""" + self.caller.insertNode(node) + if self.finished or self.answered.has_key(node.id): + # a day late and a dollar short + return + self.outstanding -= 1 + self.outstanding_results -= expected_results + self.answered[node.id] = 1 + self.processResponse(dict['rsp']) + self.schedule() + + def actionFailed(self, err, node, expected_results): + """Receive an error from a remote node.""" + log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port)) + log.err(err) + self.caller.table.nodeFailed(node) + self.answered[node.id] = 1 + self.outstanding -= 1 + self.outstanding_results -= expected_results + self.schedule() - def goWithNodes(self, nodes): + def handleGotNodes(self, nodes): + """Process any received node contact info in the response.""" + for compact_node in nodes: + node_contact = uncompact(compact_node) + node = self.caller.Node(node_contact) + if not self.found.has_key(node.id): + self.found[node.id] = node + + def sortNodes(self): + """Sort the nodes, if necessary. + + Assumes nodes are never removed from the L{found} dictionary. """ - this starts the process, our argument is a transaction with t.extras being our list of nodes - it's a transaction since we got called from the dispatcher + if len(self.sorted_nodes) != len(self.found): + self.sorted_nodes = self.found.values() + self.sorted_nodes.sort(self.sort) + + # The methods below are meant to be subclassed by actions + def getNodesToProcess(self): + """Generate a list of nodes to process next. + + This implementation is suitable for a recurring search over all nodes. """ - for node in nodes: - if node.id == self.caller.node.id: - continue - else: - self.found[node.id] = node + self.sortNodes() + return self.sorted_nodes[:self.config['K']] + + def generateArgs(self, node): + """Generate the arguments to the node's action. - self.schedule() + These arguments will be appended to our node ID when calling the action. + Also return the number of results expected from this request. + + @raise ValueError: if the node should not be queried + """ + return (self.target, ), 0 + + def processResponse(self, dict): + """Process the response dictionary received from the remote node.""" + pass + def generateResult(self, nodes): + """Create the result to return to the callback function.""" + return [] + -class GetValue(ActionBase): - def __init__(self, caller, target, num, callback, config, action="getValue"): - ActionBase.__init__(self, caller, target, callback, config) - self.num_values = num - self.outstanding_gets = 0 - self.action = action +class FindNode(ActionBase): + """Find the closest nodes to the key.""" + + def __init__(self, caller, target, callback, config, action="findNode"): + ActionBase.__init__(self, caller, target, callback, config, action) + + def processResponse(self, dict): + """Save the token received from each node.""" + if dict["id"] in self.found: + self.found[dict["id"]].updateToken(dict.get('token', '')) + self.handleGotNodes(dict['nodes']) + + def generateResult(self): + """Result is the K closest nodes to the target.""" + self.sortNodes() + return (self.sorted_nodes[:self.config['K']], ) + + +class FindValue(ActionBase): + """Find the closest nodes to the key and check their values.""" + + def __init__(self, caller, target, callback, config, action="findValue"): + ActionBase.__init__(self, caller, target, callback, config, action) + + def processResponse(self, dict): + """Save the number of values each node has.""" + if dict["id"] in self.found: + self.found[dict["id"]].updateNumValues(dict.get('num', 0)) + self.handleGotNodes(dict['nodes']) - def gotValues(self, dict, node): - dict = dict['rsp'] - self.outstanding -= 1 - self.caller.insertNode(node) - if self.finished: - return + def generateResult(self): + """Result is the nodes that have values, sorted by proximity to the key.""" + self.sortNodes() + return ([node for node in self.sorted_nodes if node.num_values > 0], ) + + +class GetValue(ActionBase): + def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"): + ActionBase.__init__(self, caller, target, callback, config, action, num_results) + if local_results: + for result in local_results: + self.results[result] = 1 + + def getNodesToProcess(self): + """Nodes are never added, always return the same thing.""" + return self.sorted_nodes + + def generateArgs(self, node): + """Args include the number of values to request.""" + if node.num_values > 0: + return (self.target, 0), node.num_values + else: + raise ValueError, "Don't try and get values from this node because it doesn't have any" + + def processResponse(self, dict): + """Save the returned values, calling the callback each time there are new ones.""" if dict.has_key('values'): def x(y, z=self.results): if not z.has_key(y): @@ -186,93 +230,32 @@ class GetValue(ActionBase): v = filter(None, map(x, dict['values'])) if len(v): reactor.callLater(0, self.callback, self.target, v) - if len(self.results) >= self.num_values: - self.finished=1 - reactor.callLater(0, self.callback, self.target, []) - else: - if not len(self.results) + self.outstanding_gets >= self.num_values: - self.schedule() - - def schedule(self): - if self.finished: - return - - for node in self.nodes: - if node.id not in self.queried and node.id != self.caller.node.id and node.num_values > 0: - try: - f = getattr(node, self.action) - except AttributeError: - log.msg("%s doesn't have a %s method!" % (node, self.action)) - else: - self.outstanding += 1 - self.outstanding_gets += node.num_values - df = f(self.target, 0, self.caller.node.id) - df.addCallbacks(self.gotValues, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, )) - self.queried[node.id] = 1 - if len(self.results) + self.outstanding_gets >= self.num_values or \ - self.outstanding >= self.config['CONCURRENT_REQS']: - break - assert self.outstanding >=0 - if self.outstanding == 0: - self.finished = 1 - reactor.callLater(0, self.callback, self.target, []) - - def goWithNodes(self, nodes, found = None): - self.results = {} - if found: - for n in found: - self.results[n] = 1 - self.nodes = nodes - self.nodes.sort(self.sort) - self.schedule() + def generateResult(self): + """Results have all been returned, now send the empty list to end it.""" + return (self.target, []) + class StoreValue(ActionBase): - def __init__(self, caller, target, value, callback, config, action="storeValue"): - ActionBase.__init__(self, caller, target, callback, config) + def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"): + ActionBase.__init__(self, caller, target, callback, config, action, num_results) self.value = value - self.stored = [] - self.action = action - def storedValue(self, t, node): - self.outstanding -= 1 - self.caller.insertNode(node) - if self.finished: - return - self.stored.append(t) - if len(self.stored) >= self.config['STORE_REDUNDANCY']: - self.finished=1 - self.callback(self.target, self.value, self.stored) + def getNodesToProcess(self): + """Nodes are never added, always return the same thing.""" + return self.sorted_nodes + + def generateArgs(self, node): + """Args include the value to request and the node's token.""" + if node.token: + return (self.target, self.value, node.token), 1 else: - if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']: - self.schedule() - return t + raise ValueError, "Don't store at this node since we don't know it's token" + + def processResponse(self, dict): + """Save the response, though it should be nothin but the ID.""" + self.results[dict["id"]] = dict - def schedule(self): - if self.finished: - return - num = self.config['CONCURRENT_REQS'] - self.outstanding - if num > self.config['STORE_REDUNDANCY']: - num = self.config['STORE_REDUNDANCY'] - for i in range(num): - try: - node = self.nodes.pop() - except IndexError: - if self.outstanding == 0: - self.finished = 1 - self.callback(self.target, self.value, self.stored) - else: - if not node.id == self.caller.node.id: - self.outstanding += 1 - try: - f = getattr(node, self.action) - except AttributeError: - log.msg("%s doesn't have a %s method!" % (node, self.action)) - else: - df = f(self.target, self.value, node.token, self.caller.node.id) - df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, )) - - def goWithNodes(self, nodes): - self.nodes = nodes - self.nodes.sort(self.sort) - self.schedule() + def generateResult(self): + """Return all the response IDs received.""" + return (self.target, self.value, self.results.values()) diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index 6162a6e..5895669 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -251,10 +251,10 @@ class KhashmirRead(KhashmirBase): else: l = [] - def _getValueForKey(nodes, key=key, local_values=l, response=callback, table=self.table, config=self.config): + def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self): # create our search state - state = GetValue(table, key, 50 - len(local_values), response, config) - reactor.callLater(0, state.goWithNodes, nodes, local_values) + state = GetValue(self, key, local_values, 50, response, self.config) + reactor.callLater(0, state.goWithNodes, nodes) # this call is asynch self.findValue(key, _getValueForKey) @@ -290,13 +290,13 @@ class KhashmirWrite(KhashmirRead): in this implementation, peers respond but don't indicate status to storing values a key can have many values """ - def _storeValueForKey(nodes, key=key, value=value, response=callback, table=self.table, config=self.config): + def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self): if not response: # default callback def _storedValueHandler(key, value, sender): pass response=_storedValueHandler - action = StoreValue(table, key, value, response, config) + action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config) reactor.callLater(0, action.goWithNodes, nodes) # this call is asynch diff --git a/apt_dht_Khashmir/knode.py b/apt_dht_Khashmir/knode.py index 795b8f0..ea714e6 100644 --- a/apt_dht_Khashmir/knode.py +++ b/apt_dht_Khashmir/knode.py @@ -35,27 +35,27 @@ class KNodeBase(Node): df.addCallback(self.checkSender) return df - def findNode(self, target, id): + def findNode(self, id, target): df = self.conn.sendRequest('find_node', {"target" : target, "id": id}) df.addErrback(self.errBack) df.addCallback(self.checkSender) return df class KNodeRead(KNodeBase): - def findValue(self, key, id): + def findValue(self, id, key): df = self.conn.sendRequest('find_value', {"key" : key, "id" : id}) df.addErrback(self.errBack) df.addCallback(self.checkSender) return df - def getValue(self, key, num, id): + def getValue(self, id, key, num): df = self.conn.sendRequest('get_value', {"key" : key, "num": num, "id" : id}) df.addErrback(self.errBack) df.addCallback(self.checkSender) return df class KNodeWrite(KNodeRead): - def storeValue(self, key, value, token, id): + def storeValue(self, id, key, value, token): df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "token" : token, "id": id}) df.addErrback(self.errBack) df.addCallback(self.checkSender) -- 2.39.5