]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_p2p_Khashmir/actions.py
Rename all apt-dht files to apt-p2p.
[quix0rs-apt-p2p.git] / apt_p2p_Khashmir / actions.py
diff --git a/apt_p2p_Khashmir/actions.py b/apt_p2p_Khashmir/actions.py
new file mode 100644 (file)
index 0000000..1179713
--- /dev/null
@@ -0,0 +1,347 @@
+## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
+# see LICENSE.txt for license information
+
+"""Details of how to perform actions on remote peers."""
+
+from twisted.internet import reactor
+from twisted.python import log
+
+from khash import intify
+from util import uncompact
+
+class ActionBase:
+    """Base class for some long running asynchronous proccesses like finding nodes or values.
+    
+    @type caller: L{khashmir.Khashmir}
+    @ivar caller: the DHT instance that is performing the action
+    @type target: C{string}
+    @ivar target: the target of the action, usually a DHT key
+    @type config: C{dictionary}
+    @ivar config: the configuration variables for the DHT
+    @type action: C{string}
+    @ivar action: the name of the action to call on remote nodes
+    @type num: C{long}
+    @ivar num: the target key in integer form
+    @type queried: C{dictionary}
+    @ivar queried: the nodes that have been queried for this action,
+        keys are node IDs, values are the node itself
+    @type answered: C{dictionary}
+    @ivar answered: the nodes that have answered the queries
+    @type found: C{dictionary}
+    @ivar found: nodes that have been found so far by the action
+    @type sorted_nodes: C{list} of L{node.Node}
+    @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
+    @type results: C{dictionary}
+    @ivar results: keys are the results found so far by the action
+    @type desired_results: C{int}
+    @ivar desired_results: the minimum number of results that are needed
+        before the action should stop
+    @type callback: C{method}
+    @ivar callback: the method to call with the results
+    @type outstanding: C{int}
+    @ivar outstanding: the number of requests currently outstanding
+    @type outstanding_results: C{int}
+    @ivar outstanding_results: the number of results that are expected from
+        the requests that are currently outstanding
+    @type finished: C{boolean}
+    @ivar finished: whether the action is done
+    @type sort: C{method}
+    @ivar sort: used to sort nodes by their proximity to the target
+    """
+    
+    def __init__(self, caller, target, callback, config, action, num_results = None):
+        """Initialize the action.
+        
+        @type caller: L{khashmir.Khashmir}
+        @param caller: the DHT instance that is performing the action
+        @type target: C{string}
+        @param target: the target of the action, usually a DHT key
+        @type callback: C{method}
+        @param callback: the method to call with the results
+        @type config: C{dictionary}
+        @param config: the configuration variables for the DHT
+        @type action: C{string}
+        @param action: the name of the action to call on remote nodes
+        @type num_results: C{int}
+        @param num_results: the minimum number of results that are needed before
+            the action should stop (optional, defaults to getting all the results)
+        
+        """
+        
+        self.caller = caller
+        self.target = target
+        self.config = config
+        self.action = action
+        self.num = intify(target)
+        self.queried = {}
+        self.answered = {}
+        self.found = {}
+        self.sorted_nodes = []
+        self.results = {}
+        self.desired_results = num_results
+        self.callback = callback
+        self.outstanding = 0
+        self.outstanding_results = 0
+        self.finished = False
+    
+        def sort(a, b, num=self.num):
+            """Sort nodes relative to the ID we are looking for."""
+            x, y = num ^ a.num, num ^ b.num
+            if x > y:
+                return 1
+            elif x < y:
+                return -1
+            return 0
+        self.sort = sort
+
+    #{ Main operation
+    def goWithNodes(self, nodes):
+        """Start the action's process with a list of nodes to contact."""
+        for node in nodes:
+            if node.id == self.caller.node.id:
+                continue
+            else:
+                self.found[node.id] = node
+        self.sortNodes()
+        self.schedule()
+    
+    def schedule(self):
+        """Schedule requests to be sent to remote nodes."""
+        # Check if we are already done
+        if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
+                                     (self.desired_results < 0 and
+                                      len(self.answered) >= self.config['STORE_REDUNDANCY'])):
+            self.finished = True
+            result = self.generateResult()
+            reactor.callLater(0, self.callback, *result)
+
+        if self.finished or (self.desired_results and 
+                             len(self.results) + self.outstanding_results >= abs(self.desired_results)):
+            return
+        
+        # Loop for each node that should be processed
+        for node in self.getNodesToProcess():
+            # Don't send requests twice or to ourself
+            if node.id not in self.queried and node.id != self.caller.node.id:
+                self.queried[node.id] = 1
+                
+                # Get the action to call on the node
+                try:
+                    f = getattr(node, self.action)
+                except AttributeError:
+                    log.msg("%s doesn't have a %s method!" % (node, self.action))
+                else:
+                    # Get the arguments to the action's method
+                    try:
+                        args, expected_results = self.generateArgs(node)
+                    except ValueError:
+                        pass
+                    else:
+                        # Call the action on the remote node
+                        self.outstanding += 1
+                        self.outstanding_results += expected_results
+                        df = f(self.caller.node.id, *args)
+                        df.addCallbacks(self.gotResponse, self.actionFailed,
+                                        callbackArgs = (node, expected_results),
+                                        errbackArgs = (node, expected_results))
+                        
+            # We might have to stop for now
+            if (self.outstanding >= self.config['CONCURRENT_REQS'] or
+                (self.desired_results and
+                 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
+                break
+            
+        assert self.outstanding >= 0
+        assert self.outstanding_results >= 0
+
+        # If no requests are outstanding, then we are done
+        if self.outstanding == 0:
+            self.finished = True
+            result = self.generateResult()
+            reactor.callLater(0, self.callback, *result)
+
+    def gotResponse(self, dict, node, expected_results):
+        """Receive a response from a remote node."""
+        self.caller.insertNode(node)
+        if self.finished or self.answered.has_key(node.id):
+            # a day late and a dollar short
+            return
+        self.outstanding -= 1
+        self.outstanding_results -= expected_results
+        self.answered[node.id] = 1
+        self.processResponse(dict['rsp'])
+        self.schedule()
+
+    def actionFailed(self, err, node, expected_results):
+        """Receive an error from a remote node."""
+        log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
+        log.err(err)
+        self.caller.table.nodeFailed(node)
+        self.outstanding -= 1
+        self.outstanding_results -= expected_results
+        self.schedule()
+    
+    def handleGotNodes(self, nodes):
+        """Process any received node contact info in the response.
+        
+        Not called by default, but suitable for being called by
+        L{processResponse} in a recursive node search.
+        """
+        for compact_node in nodes:
+            node_contact = uncompact(compact_node)
+            node = self.caller.Node(node_contact)
+            if not self.found.has_key(node.id):
+                self.found[node.id] = node
+
+    def sortNodes(self):
+        """Sort the nodes, if necessary.
+        
+        Assumes nodes are never removed from the L{found} dictionary.
+        """
+        if len(self.sorted_nodes) != len(self.found):
+            self.sorted_nodes = self.found.values()
+            self.sorted_nodes.sort(self.sort)
+                
+    #{ Subclass for specific actions
+    def getNodesToProcess(self):
+        """Generate a list of nodes to process next.
+        
+        This implementation is suitable for a recurring search over all nodes.
+        """
+        self.sortNodes()
+        return self.sorted_nodes[:self.config['K']]
+    
+    def generateArgs(self, node):
+        """Generate the arguments to the node's action.
+        
+        These arguments will be appended to our node ID when calling the action.
+        Also return the number of results expected from this request.
+        
+        @raise ValueError: if the node should not be queried
+        """
+        return (self.target, ), 0
+    
+    def processResponse(self, dict):
+        """Process the response dictionary received from the remote node."""
+        self.handleGotNodes(dict['nodes'])
+
+    def generateResult(self, nodes):
+        """Create the final result to return to the L{callback} function."""
+        return []
+        
+
+class FindNode(ActionBase):
+    """Find the closest nodes to the key."""
+
+    def __init__(self, caller, target, callback, config, action="findNode"):
+        ActionBase.__init__(self, caller, target, callback, config, action)
+
+    def processResponse(self, dict):
+        """Save the token received from each node."""
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateToken(dict.get('token', ''))
+        self.handleGotNodes(dict['nodes'])
+
+    def generateResult(self):
+        """Result is the K closest nodes to the target."""
+        self.sortNodes()
+        return (self.sorted_nodes[:self.config['K']], )
+    
+
+class FindValue(ActionBase):
+    """Find the closest nodes to the key and check for values."""
+
+    def __init__(self, caller, target, callback, config, action="findValue"):
+        ActionBase.__init__(self, caller, target, callback, config, action)
+
+    def processResponse(self, dict):
+        """Save the number of values each node has."""
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateNumValues(dict.get('num', 0))
+        self.handleGotNodes(dict['nodes'])
+        
+    def generateResult(self):
+        """Result is the nodes that have values, sorted by proximity to the key."""
+        self.sortNodes()
+        return ([node for node in self.sorted_nodes if node.num_values > 0], )
+    
+
+class GetValue(ActionBase):
+    """Retrieve values from a list of nodes."""
+    
+    def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
+        """Initialize the action with the locally available results.
+        
+        @type local_results: C{list} of C{string}
+        @param local_results: the values that were available in this node
+        """
+        ActionBase.__init__(self, caller, target, callback, config, action, num_results)
+        if local_results:
+            for result in local_results:
+                self.results[result] = 1
+
+    def getNodesToProcess(self):
+        """Nodes are never added, always return the same sorted node list."""
+        return self.sorted_nodes
+    
+    def generateArgs(self, node):
+        """Arguments include the number of values to request."""
+        if node.num_values > 0:
+            # Request all desired results from each node, just to be sure.
+            num_values = abs(self.desired_results) - len(self.results)
+            assert num_values > 0
+            if num_values > node.num_values:
+                num_values = 0
+            return (self.target, num_values), node.num_values
+        else:
+            raise ValueError, "Don't try and get values from this node because it doesn't have any"
+
+    def processResponse(self, dict):
+        """Save the returned values, calling the L{callback} each time there are new ones."""
+        if dict.has_key('values'):
+            def x(y, z=self.results):
+                if not z.has_key(y):
+                    z[y] = 1
+                    return y
+                else:
+                    return None
+            z = len(dict['values'])
+            v = filter(None, map(x, dict['values']))
+            if len(v):
+                reactor.callLater(0, self.callback, self.target, v)
+
+    def generateResult(self):
+        """Results have all been returned, now send the empty list to end the action."""
+        return (self.target, [])
+        
+
+class StoreValue(ActionBase):
+    """Store a value in a list of nodes."""
+
+    def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
+        """Initialize the action with the value to store.
+        
+        @type value: C{string}
+        @param value: the value to store in the nodes
+        """
+        ActionBase.__init__(self, caller, target, callback, config, action, num_results)
+        self.value = value
+        
+    def getNodesToProcess(self):
+        """Nodes are never added, always return the same sorted list."""
+        return self.sorted_nodes
+
+    def generateArgs(self, node):
+        """Args include the value to store and the node's token."""
+        if node.token:
+            return (self.target, self.value, node.token), 1
+        else:
+            raise ValueError, "Don't store at this node since we don't know it's token"
+
+    def processResponse(self, dict):
+        """Save the response, though it should be nothin but the ID."""
+        self.results[dict["id"]] = dict
+    
+    def generateResult(self):
+        """Return all the response IDs received."""
+        return (self.target, self.value, self.results.values())