]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/actions.py
Disconnect the insertNode calls from the callers so errors don't affect them.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / actions.py
index 3863f9a069cc698208532081995472969398eb16..bd1cd2f6f1905672717b1b90e0358040afbdf8e9 100644 (file)
@@ -3,10 +3,13 @@
 
 """Details of how to perform actions on remote peers."""
 
-from twisted.internet import reactor
+from datetime import datetime
+
+from twisted.internet import reactor, defer
 from twisted.python import log
 
 from khash import intify
+from ktable import K
 from util import uncompact
 
 class ActionBase:
@@ -20,6 +23,8 @@ class ActionBase:
     @ivar config: the configuration variables for the DHT
     @type action: C{string}
     @ivar action: the name of the action to call on remote nodes
+    @type stats: L{stats.StatsLogger}
+    @ivar stats: the statistics modules to report to
     @type num: C{long}
     @ivar num: the target key in integer form
     @type queried: C{dictionary}
@@ -45,6 +50,8 @@ class ActionBase:
         the requests that are currently outstanding
     @type finished: C{boolean}
     @ivar finished: whether the action is done
+    @type started: C{datetime.datetime}
+    @ivar started: the time the action was started at
     @type sort: C{method}
     @ivar sort: used to sort nodes by their proximity to the target
     """
@@ -74,7 +81,8 @@ class ActionBase:
         self.target = target
         self.config = config
         self.action = action
-        stats.startedAction(action)
+        self.stats = stats
+        self.stats.startedAction(action)
         self.num = intify(target)
         self.queried = {}
         self.answered = {}
@@ -86,6 +94,7 @@ class ActionBase:
         self.outstanding = 0
         self.outstanding_results = 0
         self.finished = False
+        self.started = datetime.now()
     
         def sort(a, b, num=self.num):
             """Sort nodes relative to the ID we are looking for."""
@@ -100,16 +109,17 @@ class ActionBase:
     #{ Main operation
     def goWithNodes(self, nodes):
         """Start the action's process with a list of nodes to contact."""
+        self.started = datetime.now()
         for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
+            self.found[node.id] = node
         self.sortNodes()
         self.schedule()
     
     def schedule(self):
         """Schedule requests to be sent to remote nodes."""
+        if self.finished:
+            return
+        
         # Check if we are already done
         if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
                                      (self.desired_results < 0 and
@@ -117,36 +127,47 @@ class ActionBase:
             self.finished = True
             result = self.generateResult()
             reactor.callLater(0, self.callback, *result)
+            return
 
-        if self.finished or (self.desired_results and 
-                             len(self.results) + self.outstanding_results >= abs(self.desired_results)):
+        # Check if we have enough outstanding results coming
+        if (self.desired_results and 
+            len(self.results) + self.outstanding_results >= abs(self.desired_results)):
             return
         
         # Loop for each node that should be processed
         for node in self.getNodesToProcess():
             # Don't send requests twice or to ourself
-            if node.id not in self.queried and node.id != self.caller.node.id:
+            if node.id not in self.queried:
                 self.queried[node.id] = 1
                 
                 # Get the action to call on the node
-                try:
-                    f = getattr(node, self.action)
-                except AttributeError:
-                    log.msg("%s doesn't have a %s method!" % (node, self.action))
+                if node.id == self.caller.node.id:
+                    try:
+                        f = getattr(self.caller, 'krpc_' + self.action)
+                    except AttributeError:
+                        log.msg("%s doesn't have a %s method!" % (node, 'krpc_' + self.action))
+                        continue
                 else:
-                    # Get the arguments to the action's method
                     try:
-                        args, expected_results = self.generateArgs(node)
-                    except ValueError:
-                        pass
-                    else:
-                        # Call the action on the remote node
-                        self.outstanding += 1
-                        self.outstanding_results += expected_results
-                        df = f(self.caller.node.id, *args)
-                        df.addCallbacks(self.gotResponse, self.actionFailed,
-                                        callbackArgs = (node, expected_results),
-                                        errbackArgs = (node, expected_results))
+                        f = getattr(node, self.action)
+                    except AttributeError:
+                        log.msg("%s doesn't have a %s method!" % (node, self.action))
+                        continue
+
+                # Get the arguments to the action's method
+                try:
+                    args, expected_results = self.generateArgs(node)
+                except ValueError:
+                    continue
+
+                # Call the action on the remote node
+                self.outstanding += 1
+                self.outstanding_results += expected_results
+                df = defer.maybeDeferred(f, *args)
+                reactor.callLater(0, df.addCallbacks,
+                                  *(self.gotResponse, self.actionFailed),
+                                  **{'callbackArgs': (node, expected_results, df),
+                                     'errbackArgs': (node, expected_results, df)})
                         
             # We might have to stop for now
             if (self.outstanding >= self.config['CONCURRENT_REQS'] or
@@ -163,22 +184,21 @@ class ActionBase:
             result = self.generateResult()
             reactor.callLater(0, self.callback, *result)
 
-    def gotResponse(self, dict, node, expected_results):
+    def gotResponse(self, dict, node, expected_results, df):
         """Receive a response from a remote node."""
-        self.caller.insertNode(node)
+        reactor.callLater(0, self.caller.insertNode, node)
         if self.finished or self.answered.has_key(node.id):
             # a day late and a dollar short
             return
         self.outstanding -= 1
         self.outstanding_results -= expected_results
         self.answered[node.id] = 1
-        self.processResponse(dict['rsp'])
+        self.processResponse(dict)
         self.schedule()
 
-    def actionFailed(self, err, node, expected_results):
+    def actionFailed(self, err, node, expected_results, df):
         """Receive an error from a remote node."""
-        log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
-        log.err(err)
+        log.msg("action %s failed on %s/%s: %s" % (self.action, node.host, node.port, err.getErrorMessage()))
         self.caller.table.nodeFailed(node)
         self.outstanding -= 1
         self.outstanding_results -= expected_results
@@ -212,17 +232,16 @@ class ActionBase:
         This implementation is suitable for a recurring search over all nodes.
         """
         self.sortNodes()
-        return self.sorted_nodes[:self.config['K']]
+        return self.sorted_nodes[:K]
     
     def generateArgs(self, node):
         """Generate the arguments to the node's action.
         
-        These arguments will be appended to our node ID when calling the action.
         Also return the number of results expected from this request.
         
         @raise ValueError: if the node should not be queried
         """
-        return (self.target, ), 0
+        return (self.caller.node.id, self.target), 0
     
     def processResponse(self, dict):
         """Process the response dictionary received from the remote node."""
@@ -230,13 +249,14 @@ class ActionBase:
 
     def generateResult(self, nodes):
         """Create the final result to return to the L{callback} function."""
+        self.stats.completedAction(self.action, self.started)
         return []
         
 
 class FindNode(ActionBase):
     """Find the closest nodes to the key."""
 
-    def __init__(self, caller, target, callback, config, stats, action="findNode"):
+    def __init__(self, caller, target, callback, config, stats, action="find_node"):
         ActionBase.__init__(self, caller, target, callback, config, stats, action)
 
     def processResponse(self, dict):
@@ -248,13 +268,14 @@ class FindNode(ActionBase):
     def generateResult(self):
         """Result is the K closest nodes to the target."""
         self.sortNodes()
-        return (self.sorted_nodes[:self.config['K']], )
+        self.stats.completedAction(self.action, self.started)
+        return (self.sorted_nodes[:K], )
     
 
 class FindValue(ActionBase):
     """Find the closest nodes to the key and check for values."""
 
-    def __init__(self, caller, target, callback, config, stats, action="findValue"):
+    def __init__(self, caller, target, callback, config, stats, action="find_value"):
         ActionBase.__init__(self, caller, target, callback, config, stats, action)
 
     def processResponse(self, dict):
@@ -266,22 +287,15 @@ class FindValue(ActionBase):
     def generateResult(self):
         """Result is the nodes that have values, sorted by proximity to the key."""
         self.sortNodes()
+        self.stats.completedAction(self.action, self.started)
         return ([node for node in self.sorted_nodes if node.num_values > 0], )
     
 
 class GetValue(ActionBase):
     """Retrieve values from a list of nodes."""
     
-    def __init__(self, caller, target, local_results, num_results, callback, config, stats, action="getValue"):
-        """Initialize the action with the locally available results.
-        
-        @type local_results: C{list} of C{string}
-        @param local_results: the values that were available in this node
-        """
+    def __init__(self, caller, target, num_results, callback, config, stats, action="get_value"):
         ActionBase.__init__(self, caller, target, callback, config, stats, action, num_results)
-        if local_results:
-            for result in local_results:
-                self.results[result] = 1
 
     def getNodesToProcess(self):
         """Nodes are never added, always return the same sorted node list."""
@@ -295,7 +309,7 @@ class GetValue(ActionBase):
             assert num_values > 0
             if num_values > node.num_values:
                 num_values = 0
-            return (self.target, num_values), node.num_values
+            return (self.caller.node.id, self.target, num_values), node.num_values
         else:
             raise ValueError, "Don't try and get values from this node because it doesn't have any"
 
@@ -315,13 +329,14 @@ class GetValue(ActionBase):
 
     def generateResult(self):
         """Results have all been returned, now send the empty list to end the action."""
+        self.stats.completedAction(self.action, self.started)
         return (self.target, [])
         
 
 class StoreValue(ActionBase):
     """Store a value in a list of nodes."""
 
-    def __init__(self, caller, target, value, num_results, callback, config, stats, action="storeValue"):
+    def __init__(self, caller, target, value, num_results, callback, config, stats, action="store_value"):
         """Initialize the action with the value to store.
         
         @type value: C{string}
@@ -337,7 +352,7 @@ class StoreValue(ActionBase):
     def generateArgs(self, node):
         """Args include the value to store and the node's token."""
         if node.token:
-            return (self.target, self.value, node.token), 1
+            return (self.caller.node.id, self.target, self.value, node.token), 1
         else:
             raise ValueError, "Don't store at this node since we don't know it's token"
 
@@ -347,4 +362,5 @@ class StoreValue(ActionBase):
     
     def generateResult(self):
         """Return all the response IDs received."""
+        self.stats.completedAction(self.action, self.started)
         return (self.target, self.value, self.results.values())