Allow the actions to call the local node's remote interface.
authorCameron Dale <camrdale@gmail.com>
Thu, 27 Mar 2008 23:55:08 +0000 (16:55 -0700)
committerCameron Dale <camrdale@gmail.com>
Thu, 27 Mar 2008 23:55:08 +0000 (16:55 -0700)
The find_* actions now start with only the local node instead of
the nodes from the routing table.

apt_p2p_Khashmir/actions.py
apt_p2p_Khashmir/khashmir.py

index dfb085241df0dac73e8e83eee74610a3572b8dbf..b54632065002dbd2a0dc6c4439451ffa3c2d5ab0 100644 (file)
@@ -3,7 +3,7 @@
 
 """Details of how to perform actions on remote peers."""
 
 
 """Details of how to perform actions on remote peers."""
 
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from twisted.python import log
 
 from khash import intify
 from twisted.python import log
 
 from khash import intify
@@ -101,10 +101,7 @@ class ActionBase:
     def goWithNodes(self, nodes):
         """Start the action's process with a list of nodes to contact."""
         for node in nodes:
     def goWithNodes(self, nodes):
         """Start the action's process with a list of nodes to contact."""
         for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
+            self.found[node.id] = node
         self.sortNodes()
         self.schedule()
     
         self.sortNodes()
         self.schedule()
     
@@ -125,28 +122,37 @@ class ActionBase:
         # Loop for each node that should be processed
         for node in self.getNodesToProcess():
             # Don't send requests twice or to ourself
         # Loop for each node that should be processed
         for node in self.getNodesToProcess():
             # Don't send requests twice or to ourself
-            if node.id not in self.queried and node.id != self.caller.node.id:
+            if node.id not in self.queried:
                 self.queried[node.id] = 1
                 
                 # Get the action to call on the node
                 self.queried[node.id] = 1
                 
                 # Get the action to call on the node
-                try:
-                    f = getattr(node, self.action)
-                except AttributeError:
-                    log.msg("%s doesn't have a %s method!" % (node, self.action))
+                if node.id == self.caller.node.id:
+                    try:
+                        f = getattr(self.caller, 'krpc_' + self.action)
+                    except AttributeError:
+                        log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
+                        continue
                 else:
                 else:
-                    # Get the arguments to the action's method
                     try:
                     try:
-                        args, expected_results = self.generateArgs(node)
-                    except ValueError:
-                        pass
-                    else:
-                        # Call the action on the remote node
-                        self.outstanding += 1
-                        self.outstanding_results += expected_results
-                        df = f(self.caller.node.id, *args)
-                        df.addCallbacks(self.gotResponse, self.actionFailed,
-                                        callbackArgs = (node, expected_results),
-                                        errbackArgs = (node, expected_results))
+                        f = getattr(node, self.action)
+                    except AttributeError:
+                        log.msg("%s doesn't have a %s method!" % (node, self.action))
+                        continue
+
+                # Get the arguments to the action's method
+                try:
+                    args, expected_results = self.generateArgs(node)
+                except ValueError:
+                    continue
+
+                # Call the action on the remote node
+                self.outstanding += 1
+                self.outstanding_results += expected_results
+                df = defer.maybeDeferred(f, *args)
+                reactor.callLater(0, df.addCallbacks,
+                                  *(self.gotResponse, self.actionFailed),
+                                  **{'callbackArgs': (node, expected_results, df),
+                                     'errbackArgs': (node, expected_results, df)})
                         
             # We might have to stop for now
             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
                         
             # We might have to stop for now
             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
@@ -163,7 +169,7 @@ class ActionBase:
             result = self.generateResult()
             reactor.callLater(0, self.callback, *result)
 
             result = self.generateResult()
             reactor.callLater(0, self.callback, *result)
 
-    def gotResponse(self, dict, node, expected_results):
+    def gotResponse(self, dict, node, expected_results, df):
         """Receive a response from a remote node."""
         self.caller.insertNode(node)
         if self.finished or self.answered.has_key(node.id):
         """Receive a response from a remote node."""
         self.caller.insertNode(node)
         if self.finished or self.answered.has_key(node.id):
@@ -175,7 +181,7 @@ class ActionBase:
         self.processResponse(dict)
         self.schedule()
 
         self.processResponse(dict)
         self.schedule()
 
-    def actionFailed(self, err, node, expected_results):
+    def actionFailed(self, err, node, expected_results, df):
         """Receive an error from a remote node."""
         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
         log.err(err)
         """Receive an error from a remote node."""
         log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
         log.err(err)
@@ -217,12 +223,11 @@ class ActionBase:
     def generateArgs(self, node):
         """Generate the arguments to the node's action.
         
     def generateArgs(self, node):
         """Generate the arguments to the node's action.
         
-        These arguments will be appended to our node ID when calling the action.
         Also return the number of results expected from this request.
         
         @raise ValueError: if the node should not be queried
         """
         Also return the number of results expected from this request.
         
         @raise ValueError: if the node should not be queried
         """
-        return (self.target, ), 0
+        return (self.caller.node.id, self.target), 0
     
     def processResponse(self, dict):
         """Process the response dictionary received from the remote node."""
     
     def processResponse(self, dict):
         """Process the response dictionary received from the remote node."""
@@ -272,16 +277,8 @@ class FindValue(ActionBase):
 class GetValue(ActionBase):
     """Retrieve values from a list of nodes."""
     
 class GetValue(ActionBase):
     """Retrieve values from a list of nodes."""
     
-    def __init__(self, caller, target, local_results, num_results, callback, config, stats, action="get_value"):
-        """Initialize the action with the locally available results.
-        
-        @type local_results: C{list} of C{string}
-        @param local_results: the values that were available in this node
-        """
+    def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
-        if local_results:
-            for result in local_results:
-                self.results[result] = 1
 
     def getNodesToProcess(self):
         """Nodes are never added, always return the same sorted node list."""
 
     def getNodesToProcess(self):
         """Nodes are never added, always return the same sorted node list."""
@@ -295,7 +292,7 @@ class GetValue(ActionBase):
             assert num_values > 0
             if num_values > node.num_values:
                 num_values = 0
             assert num_values > 0
             if num_values > node.num_values:
                 num_values = 0
-            return (self.target, num_values), node.num_values
+            return (self.caller.node.id, self.target, num_values), node.num_values
         else:
             raise ValueError, "Don't try and get values from this node because it doesn't have any"
 
         else:
             raise ValueError, "Don't try and get values from this node because it doesn't have any"
 
@@ -337,7 +334,7 @@ class StoreValue(ActionBase):
     def generateArgs(self, node):
         """Args include the value to store and the node's token."""
         if node.token:
     def generateArgs(self, node):
         """Args include the value to store and the node's token."""
         if node.token:
-            return (self.target, self.value, node.token), 1
+            return (self.caller.node.id, self.target, self.value, node.token), 1
         else:
             raise ValueError, "Don't store at this node since we don't know it's token"
 
         else:
             raise ValueError, "Don't store at this node since we don't know it's token"
 
index 554608feda1494ba7e1bfedf98379d9f5096caaa..7946523da0f266d3efe18aa5b55edc52474e5cc2 100644 (file)
@@ -174,22 +174,18 @@ class KhashmirBase(protocol.Factory):
         @param errback: the method to call if an error occurs
             (optional, defaults to doing nothing when an error occurs)
         """
         @param errback: the method to call if an error occurs
             (optional, defaults to doing nothing when an error occurs)
         """
-        # Get K nodes out of local table/cache
-        nodes = self.table.findNodes(id)
-        nodes = [copy(node) for node in nodes]
+        # Start with our node
+        nodes = [copy(self.node)]
+
         d = Deferred()
         if errback:
             d.addCallbacks(callback, errback)
         else:
             d.addCallback(callback)
 
         d = Deferred()
         if errback:
             d.addCallbacks(callback, errback)
         else:
             d.addCallback(callback)
 
-        # If the target ID was found
-        if len(nodes) == 1 and nodes[0].id == id:
-            d.callback(nodes)
-        else:
-            # Start the finding nodes action
-            state = FindNode(self, id, d.callback, self.config, self.stats)
-            reactor.callLater(0, state.goWithNodes, nodes)
+        # Start the finding nodes action
+        state = FindNode(self, id, d.callback, self.config, self.stats)
+        reactor.callLater(0, state.goWithNodes, nodes)
     
     def insertNode(self, node, contacted = True):
         """Try to insert a node in our local table, pinging oldest contact if necessary.
     
     def insertNode(self, node, contacted = True):
         """Try to insert a node in our local table, pinging oldest contact if necessary.
@@ -380,9 +376,9 @@ class KhashmirRead(KhashmirBase):
         @param errback: the method to call if an error occurs
             (optional, defaults to doing nothing when an error occurs)
         """
         @param errback: the method to call if an error occurs
             (optional, defaults to doing nothing when an error occurs)
         """
-        # Get K nodes out of local table/cache
-        nodes = self.table.findNodes(key)
-        nodes = [copy(node) for node in nodes]
+        # Start with ourself
+        nodes = [copy(self.node)]
+        
         d = Deferred()
         if errback:
             d.addCallbacks(callback, errback)
         d = Deferred()
         if errback:
             d.addCallbacks(callback, errback)
@@ -407,17 +403,18 @@ class KhashmirRead(KhashmirBase):
         @type searchlocal: C{boolean}
         @param searchlocal: whether to also look for any local values
         """
         @type searchlocal: C{boolean}
         @param searchlocal: whether to also look for any local values
         """
-        # Get any local values
-        if searchlocal:
-            l = self.store.retrieveValues(key)
-            if len(l) > 0:
-                reactor.callLater(0, callback, key, l)
-        else:
-            l = []
 
 
-        def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self):
+        def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal):
             """Use the found nodes to send requests for values to."""
             """Use the found nodes to send requests for values to."""
-            state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
+            # Get any local values
+            if searchlocal:
+                l = self.store.retrieveValues(key)
+                if len(l) > 0:
+                    node = copy(self.node)
+                    node.updateNumValues(len(l))
+                    nodes = nodes + [node]
+
+            state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats)
             reactor.callLater(0, state.goWithNodes, nodes)
             
         # First lookup nodes that have values for the key
             reactor.callLater(0, state.goWithNodes, nodes)
             
         # First lookup nodes that have values for the key