Rewrite of the actions to take advantage of the commonalities between them.
authorCameron Dale <camrdale@gmail.com>
Sun, 24 Feb 2008 08:31:45 +0000 (00:31 -0800)
committerCameron Dale <camrdale@gmail.com>
Sun, 24 Feb 2008 08:31:45 +0000 (00:31 -0800)
TODO
apt_dht_Khashmir/actions.py
apt_dht_Khashmir/khashmir.py
apt_dht_Khashmir/knode.py

diff --git a/TODO b/TODO
index 3abb53c..d871ee1 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,13 +1,8 @@
-Clean up the khashmir actions.
-
-The khashmir actions are a mess, and some cleanup is necessary. A lot
-of the actions have most of their processing in common, so this code
-should be put in functions that all can call. Perhaps creating a
-base "RecurringAction" and "StaticAction" would be a good idea,
-as then find_node and find_value could use the first, while get_value
-and store_value could use the second. Perhaps ping and join actions
-should also be created for consistency, and maybe inherit from a
-"SingleNodeAction" base class.
+Consider what happens when we are the closest node.
+
+In some of the actions it is unclear what happens when we are one of the
+closest nodes to the target key. Do we store values that we publish
+ourself?
 
 
 Packages.diff files need to be considered.
index c94c78d..c80591a 100644 (file)
@@ -9,20 +9,26 @@ from util import uncompact
 
 class ActionBase:
     """ base class for some long running asynchronous proccesses like finding nodes or values """
-    def __init__(self, caller, target, callback, config):
+    def __init__(self, caller, target, callback, config, action, num_results = None):
+        """Initialize the action."""
         self.caller = caller
         self.target = target
         self.config = config
+        self.action = action
         self.num = intify(target)
-        self.found = {}
         self.queried = {}
         self.answered = {}
+        self.found = {}
+        self.sorted_nodes = []
+        self.results = {}
+        self.desired_results = num_results
         self.callback = callback
         self.outstanding = 0
+        self.outstanding_results = 0
         self.finished = 0
     
         def sort(a, b, num=self.num):
-            """ this function is for sorting nodes relative to the ID we are looking for """
+            """Sort nodes relative to the ID we are looking for."""
             x, y = num ^ a.num, num ^ b.num
             if x > y:
                 return 1
@@ -31,150 +37,188 @@ class ActionBase:
             return 0
         self.sort = sort
         
-    def actionFailed(self, err, node):
-        log.msg("action %s failed (%s) %s/%s" % (self.__class__.__name__, self.config['PORT'], node.host, node.port))
-        log.err(err)
-        self.caller.table.nodeFailed(node)
-        self.outstanding = self.outstanding - 1
-        self.schedule()
-    
-    def goWithNodes(self, t):
-        pass
-    
-
-class FindNode(ActionBase):
-    """ find node action merits it's own class as it is a long running stateful process """
-    def handleGotNodes(self, dict):
-        _krpc_sender = dict['_krpc_sender']
-        dict = dict['rsp']
-        n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
-        self.caller.insertNode(n)
-        if dict["id"] in self.found:
-            self.found[dict["id"]].updateToken(dict.get('token', ''))
-        l = dict["nodes"]
-        if self.finished or self.answered.has_key(dict["id"]):
-            # a day late and a dollar short
-            return
-        self.outstanding = self.outstanding - 1
-        self.answered[dict["id"]] = 1
-        for compact_node in l:
-            node = uncompact(compact_node)
-            n = self.caller.Node(node)
-            if not self.found.has_key(n.id):
-                self.found[n.id] = n
-        self.schedule()
-        
-    def schedule(self):
-        """
-            send messages to new peers, if necessary
-        """
-        if self.finished:
-            return
-        l = self.found.values()
-        l.sort(self.sort)
-        for node in l[:self.config['K']]:
-            if node.id == self.target:
-                self.finished=1
-                return self.callback([node])
-            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
-                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-                df = node.findNode(self.target, self.caller.node.id)
-                df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
-                self.outstanding = self.outstanding + 1
-                self.queried[node.id] = 1
-            if self.outstanding >= self.config['CONCURRENT_REQS']:
-                break
-        assert self.outstanding >=0
-        if self.outstanding == 0:
-            ## all done!!
-            self.finished=1
-            reactor.callLater(0, self.callback, l[:self.config['K']])
-    
     def goWithNodes(self, nodes):
-        """
-            this starts the process, our argument is a transaction with t.extras being our list of nodes
-            it's a transaction since we got called from the dispatcher
-        """
+        """Start the action's process with a list of nodes to contact."""
         for node in nodes:
             if node.id == self.caller.node.id:
                 continue
             else:
                 self.found[node.id] = node
-        
+        self.sortNodes()
         self.schedule()
     
+    def schedule(self):
+        """Schedule requests to be sent to remote nodes."""
+        # Check if we are already done
+        if self.desired_results and len(self.results) >= self.desired_results:
+            self.finished=1
+            result = self.generateResult()
+            reactor.callLater(0, self.callback, *result)
 
-class FindValue(ActionBase):
-    def handleGotNodes(self, dict):
-        _krpc_sender = dict['_krpc_sender']
-        dict = dict['rsp']
-        n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
-        self.caller.insertNode(n)
-        if dict["id"] in self.found:
-            self.found[dict["id"]].updateNumValues(dict.get('num', 0))
-        l = dict["nodes"]
-        if self.finished or self.answered.has_key(dict["id"]):
-            # a day late and a dollar short
+        if self.finished or (self.desired_results and 
+                             len(self.results) + self.outstanding_results >= self.desired_results):
             return
-        self.outstanding = self.outstanding - 1
-        self.answered[dict["id"]] = 1
-        for compact_node in l:
-            node = uncompact(compact_node)
-            n = self.caller.Node(node)
-            if not self.found.has_key(n.id):
-                self.found[n.id] = n
-        self.schedule()
         
-    def schedule(self):
-        """
-            send messages to new peers, if necessary
-        """
-        if self.finished:
-            return
-        l = self.found.values()
-        l.sort(self.sort)
-        for node in l[:self.config['K']]:
-            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
-                df = node.findValue(self.target, self.caller.node.id)
-                df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
-                self.outstanding = self.outstanding + 1
+        for node in self.getNodesToProcess():
+            if node.id not in self.queried and node.id != self.caller.node.id:
                 self.queried[node.id] = 1
-            if self.outstanding >= self.config['CONCURRENT_REQS']:
+                
+                # Get the action to call on the node
+                try:
+                    f = getattr(node, self.action)
+                except AttributeError:
+                    log.msg("%s doesn't have a %s method!" % (node, self.action))
+                else:
+                    # Get the arguments to the action's method
+                    try:
+                        args, expected_results = self.generateArgs(node)
+                    except ValueError:
+                        pass
+                    else:
+                        # Call the action on the remote node
+                        self.outstanding += 1
+                        self.outstanding_results += expected_results
+                        df = f(self.caller.node.id, *args)
+                        df.addCallbacks(self.gotResponse, self.actionFailed,
+                                        callbackArgs = (node, expected_results),
+                                        errbackArgs = (node, expected_results))
+                        
+            # We might have to stop for now
+            if (self.outstanding >= self.config['CONCURRENT_REQS'] or
+                (self.desired_results and
+                 self.outstanding_results >= self.desired_results)):
                 break
+            
+        # If no requests are outstanding, then we are done
         assert self.outstanding >=0
         if self.outstanding == 0:
-            ## all done!!
-            self.finished=1
-            l = [node for node in self.found.values() if node.num_values > 0]
-            reactor.callLater(0, self.callback, l)
+            self.finished = 1
+            result = self.generateResult()
+            reactor.callLater(0, self.callback, *result)
+
+    def gotResponse(self, dict, node, expected_results):
+        """Receive a response from a remote node."""
+        self.caller.insertNode(node)
+        if self.finished or self.answered.has_key(node.id):
+            # a day late and a dollar short
+            return
+        self.outstanding -= 1
+        self.outstanding_results -= expected_results
+        self.answered[node.id] = 1
+        self.processResponse(dict['rsp'])
+        self.schedule()
+
+    def actionFailed(self, err, node, expected_results):
+        """Receive an error from a remote node."""
+        log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
+        log.err(err)
+        self.caller.table.nodeFailed(node)
+        self.answered[node.id] = 1
+        self.outstanding -= 1
+        self.outstanding_results -= expected_results
+        self.schedule()
     
-    def goWithNodes(self, nodes):
+    def handleGotNodes(self, nodes):
+        """Process any received node contact info in the response."""
+        for compact_node in nodes:
+            node_contact = uncompact(compact_node)
+            node = self.caller.Node(node_contact)
+            if not self.found.has_key(node.id):
+                self.found[node.id] = node
+
+    def sortNodes(self):
+        """Sort the nodes, if necessary.
+        
+        Assumes nodes are never removed from the L{found} dictionary.
         """
-            this starts the process, our argument is a transaction with t.extras being our list of nodes
-            it's a transaction since we got called from the dispatcher
+        if len(self.sorted_nodes) != len(self.found):
+            self.sorted_nodes = self.found.values()
+            self.sorted_nodes.sort(self.sort)
+                
+    # The methods below are meant to be subclassed by actions
+    def getNodesToProcess(self):
+        """Generate a list of nodes to process next.
+        
+        This implementation is suitable for a recurring search over all nodes.
         """
-        for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
+        self.sortNodes()
+        return self.sorted_nodes[:self.config['K']]
+    
+    def generateArgs(self, node):
+        """Generate the arguments to the node's action.
         
-        self.schedule()
+        These arguments will be appended to our node ID when calling the action.
+        Also return the number of results expected from this request.
+        
+        @raise ValueError: if the node should not be queried
+        """
+        return (self.target, ), 0
+    
+    def processResponse(self, dict):
+        """Process the response dictionary received from the remote node."""
+        pass
 
+    def generateResult(self, nodes):
+        """Create the result to return to the callback function."""
+        return []
+        
 
-class GetValue(ActionBase):
-    def __init__(self, caller, target, num, callback, config, action="getValue"):
-        ActionBase.__init__(self, caller, target, callback, config)
-        self.num_values = num
-        self.outstanding_gets = 0
-        self.action = action
+class FindNode(ActionBase):
+    """Find the closest nodes to the key."""
+
+    def __init__(self, caller, target, callback, config, action="findNode"):
+        ActionBase.__init__(self, caller, target, callback, config, action)
+
+    def processResponse(self, dict):
+        """Save the token received from each node."""
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateToken(dict.get('token', ''))
+        self.handleGotNodes(dict['nodes'])
+
+    def generateResult(self):
+        """Result is the K closest nodes to the target."""
+        self.sortNodes()
+        return (self.sorted_nodes[:self.config['K']], )
+    
+
+class FindValue(ActionBase):
+    """Find the closest nodes to the key and check their values."""
+
+    def __init__(self, caller, target, callback, config, action="findValue"):
+        ActionBase.__init__(self, caller, target, callback, config, action)
+
+    def processResponse(self, dict):
+        """Save the number of values each node has."""
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateNumValues(dict.get('num', 0))
+        self.handleGotNodes(dict['nodes'])
         
-    def gotValues(self, dict, node):
-        dict = dict['rsp']
-        self.outstanding -= 1
-        self.caller.insertNode(node)
-        if self.finished:
-            return
+    def generateResult(self):
+        """Result is the nodes that have values, sorted by proximity to the key."""
+        self.sortNodes()
+        return ([node for node in self.sorted_nodes if node.num_values > 0], )
+    
+
+class GetValue(ActionBase):
+    def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
+        ActionBase.__init__(self, caller, target, callback, config, action, num_results)
+        if local_results:
+            for result in local_results:
+                self.results[result] = 1
+
+    def getNodesToProcess(self):
+        """Nodes are never added, always return the same thing."""
+        return self.sorted_nodes
+    
+    def generateArgs(self, node):
+        """Args include the number of values to request."""
+        if node.num_values > 0:
+            return (self.target, 0), node.num_values
+        else:
+            raise ValueError, "Don't try and get values from this node because it doesn't have any"
+
+    def processResponse(self, dict):
+        """Save the returned values, calling the callback each time there are new ones."""
         if dict.has_key('values'):
             def x(y, z=self.results):
                 if not z.has_key(y):
@@ -186,93 +230,32 @@ class GetValue(ActionBase):
             v = filter(None, map(x, dict['values']))
             if len(v):
                 reactor.callLater(0, self.callback, self.target, v)
-        if len(self.results) >= self.num_values:
-            self.finished=1
-            reactor.callLater(0, self.callback, self.target, [])
-        else:
-            if not len(self.results) + self.outstanding_gets >= self.num_values:
-                self.schedule()
-    
-    def schedule(self):
-        if self.finished:
-            return
-        
-        for node in self.nodes:
-            if node.id not in self.queried and node.id != self.caller.node.id and node.num_values > 0:
-                try:
-                    f = getattr(node, self.action)
-                except AttributeError:
-                    log.msg("%s doesn't have a %s method!" % (node, self.action))
-                else:
-                    self.outstanding += 1
-                    self.outstanding_gets += node.num_values
-                    df = f(self.target, 0, self.caller.node.id)
-                    df.addCallbacks(self.gotValues, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
-                    self.queried[node.id] = 1
-            if len(self.results) + self.outstanding_gets >= self.num_values or \
-                self.outstanding >= self.config['CONCURRENT_REQS']:
-                break
-        assert self.outstanding >=0
-        if self.outstanding == 0:
-            self.finished = 1
-            reactor.callLater(0, self.callback, self.target, [])
-                    
-    def goWithNodes(self, nodes, found = None):
-        self.results = {}
-        if found:
-            for n in found:
-                self.results[n] = 1
-        self.nodes = nodes
-        self.nodes.sort(self.sort)
-        self.schedule()
 
+    def generateResult(self):
+        """Results have all been returned, now send the empty list to end it."""
+        return (self.target, [])
+        
 
 class StoreValue(ActionBase):
-    def __init__(self, caller, target, value, callback, config, action="storeValue"):
-        ActionBase.__init__(self, caller, target, callback, config)
+    def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
+        ActionBase.__init__(self, caller, target, callback, config, action, num_results)
         self.value = value
-        self.stored = []
-        self.action = action
         
-    def storedValue(self, t, node):
-        self.outstanding -= 1
-        self.caller.insertNode(node)
-        if self.finished:
-            return
-        self.stored.append(t)
-        if len(self.stored) >= self.config['STORE_REDUNDANCY']:
-            self.finished=1
-            self.callback(self.target, self.value, self.stored)
+    def getNodesToProcess(self):
+        """Nodes are never added, always return the same thing."""
+        return self.sorted_nodes
+
+    def generateArgs(self, node):
+        """Args include the value to request and the node's token."""
+        if node.token:
+            return (self.target, self.value, node.token), 1
         else:
-            if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
-                self.schedule()
-        return t
+            raise ValueError, "Don't store at this node since we don't know it's token"
+
+    def processResponse(self, dict):
+        """Save the response, though it should be nothin but the ID."""
+        self.results[dict["id"]] = dict
     
-    def schedule(self):
-        if self.finished:
-            return
-        num = self.config['CONCURRENT_REQS'] - self.outstanding
-        if num > self.config['STORE_REDUNDANCY']:
-            num = self.config['STORE_REDUNDANCY']
-        for i in range(num):
-            try:
-                node = self.nodes.pop()
-            except IndexError:
-                if self.outstanding == 0:
-                    self.finished = 1
-                    self.callback(self.target, self.value, self.stored)
-            else:
-                if not node.id == self.caller.node.id:
-                    self.outstanding += 1
-                    try:
-                        f = getattr(node, self.action)
-                    except AttributeError:
-                        log.msg("%s doesn't have a %s method!" % (node, self.action))
-                    else:
-                        df = f(self.target, self.value, node.token, self.caller.node.id)
-                        df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
-                    
-    def goWithNodes(self, nodes):
-        self.nodes = nodes
-        self.nodes.sort(self.sort)
-        self.schedule()
+    def generateResult(self):
+        """Return all the response IDs received."""
+        return (self.target, self.value, self.results.values())
index 6162a6e..5895669 100644 (file)
@@ -251,10 +251,10 @@ class KhashmirRead(KhashmirBase):
         else:
             l = []
 
-        def _getValueForKey(nodes, key=key, local_values=l, response=callback, table=self.table, config=self.config):
+        def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
             # create our search state
-            state = GetValue(table, key, 50 - len(local_values), response, config)
-            reactor.callLater(0, state.goWithNodes, nodes, local_values)
+            state = GetValue(self, key, local_values, 50, response, self.config)
+            reactor.callLater(0, state.goWithNodes, nodes)
             
         # this call is asynch
         self.findValue(key, _getValueForKey)
@@ -290,13 +290,13 @@ class KhashmirWrite(KhashmirRead):
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
-        def _storeValueForKey(nodes, key=key, value=value, response=callback, table=self.table, config=self.config):
+        def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self):
             if not response:
                 # default callback
                 def _storedValueHandler(key, value, sender):
                     pass
                 response=_storedValueHandler
-            action = StoreValue(table, key, value, response, config)
+            action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config)
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # this call is asynch
index 795b8f0..ea714e6 100644 (file)
@@ -35,27 +35,27 @@ class KNodeBase(Node):
         df.addCallback(self.checkSender)
         return df
     
-    def findNode(self, target, id):
+    def findNode(self, id, target):
         df = self.conn.sendRequest('find_node', {"target" : target, "id": id})
         df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
 
 class KNodeRead(KNodeBase):
-    def findValue(self, key, id):
+    def findValue(self, id, key):
         df =  self.conn.sendRequest('find_value', {"key" : key, "id" : id})
         df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
 
-    def getValue(self, key, num, id):
+    def getValue(self, id, key, num):
         df = self.conn.sendRequest('get_value', {"key" : key, "num": num, "id" : id})
         df.addErrback(self.errBack)
         df.addCallback(self.checkSender)
         return df
 
 class KNodeWrite(KNodeRead):
-    def storeValue(self, key, value, token, id):
+    def storeValue(self, id, key, value, token):
         df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "token" : token, "id": id})
         df.addErrback(self.errBack)
         df.addCallback(self.checkSender)