]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/actions.py
Rename all apt-dht files to apt-p2p.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py
deleted file mode 100644 (file)
index 1179713..0000000
+++ /dev/null
@@ -1,347 +0,0 @@
-## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
-# see LICENSE.txt for license information
-
-"""Details of how to perform actions on remote peers."""
-
-from twisted.internet import reactor
-from twisted.python import log
-
-from khash import intify
-from util import uncompact
-
-class ActionBase:
-    """Base class for some long running asynchronous proccesses like finding nodes or values.
-    
-    @type caller: L{khashmir.Khashmir}
-    @ivar caller: the DHT instance that is performing the action
-    @type target: C{string}
-    @ivar target: the target of the action, usually a DHT key
-    @type config: C{dictionary}
-    @ivar config: the configuration variables for the DHT
-    @type action: C{string}
-    @ivar action: the name of the action to call on remote nodes
-    @type num: C{long}
-    @ivar num: the target key in integer form
-    @type queried: C{dictionary}
-    @ivar queried: the nodes that have been queried for this action,
-        keys are node IDs, values are the node itself
-    @type answered: C{dictionary}
-    @ivar answered: the nodes that have answered the queries
-    @type found: C{dictionary}
-    @ivar found: nodes that have been found so far by the action
-    @type sorted_nodes: C{list} of L{node.Node}
-    @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
-    @type results: C{dictionary}
-    @ivar results: keys are the results found so far by the action
-    @type desired_results: C{int}
-    @ivar desired_results: the minimum number of results that are needed
-        before the action should stop
-    @type callback: C{method}
-    @ivar callback: the method to call with the results
-    @type outstanding: C{int}
-    @ivar outstanding: the number of requests currently outstanding
-    @type outstanding_results: C{int}
-    @ivar outstanding_results: the number of results that are expected from
-        the requests that are currently outstanding
-    @type finished: C{boolean}
-    @ivar finished: whether the action is done
-    @type sort: C{method}
-    @ivar sort: used to sort nodes by their proximity to the target
-    """
-    
-    def __init__(self, caller, target, callback, config, action, num_results = None):
-        """Initialize the action.
-        
-        @type caller: L{khashmir.Khashmir}
-        @param caller: the DHT instance that is performing the action
-        @type target: C{string}
-        @param target: the target of the action, usually a DHT key
-        @type callback: C{method}
-        @param callback: the method to call with the results
-        @type config: C{dictionary}
-        @param config: the configuration variables for the DHT
-        @type action: C{string}
-        @param action: the name of the action to call on remote nodes
-        @type num_results: C{int}
-        @param num_results: the minimum number of results that are needed before
-            the action should stop (optional, defaults to getting all the results)
-        
-        """
-        
-        self.caller = caller
-        self.target = target
-        self.config = config
-        self.action = action
-        self.num = intify(target)
-        self.queried = {}
-        self.answered = {}
-        self.found = {}
-        self.sorted_nodes = []
-        self.results = {}
-        self.desired_results = num_results
-        self.callback = callback
-        self.outstanding = 0
-        self.outstanding_results = 0
-        self.finished = False
-    
-        def sort(a, b, num=self.num):
-            """Sort nodes relative to the ID we are looking for."""
-            x, y = num ^ a.num, num ^ b.num
-            if x > y:
-                return 1
-            elif x < y:
-                return -1
-            return 0
-        self.sort = sort
-
-    #{ Main operation
-    def goWithNodes(self, nodes):
-        """Start the action's process with a list of nodes to contact."""
-        for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
-        self.sortNodes()
-        self.schedule()
-    
-    def schedule(self):
-        """Schedule requests to be sent to remote nodes."""
-        # Check if we are already done
-        if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
-                                     (self.desired_results < 0 and
-                                      len(self.answered) >= self.config['STORE_REDUNDANCY'])):
-            self.finished = True
-            result = self.generateResult()
-            reactor.callLater(0, self.callback, *result)
-
-        if self.finished or (self.desired_results and 
-                             len(self.results) + self.outstanding_results >= abs(self.desired_results)):
-            return
-        
-        # Loop for each node that should be processed
-        for node in self.getNodesToProcess():
-            # Don't send requests twice or to ourself
-            if node.id not in self.queried and node.id != self.caller.node.id:
-                self.queried[node.id] = 1
-                
-                # Get the action to call on the node
-                try:
-                    f = getattr(node, self.action)
-                except AttributeError:
-                    log.msg("%s doesn't have a %s method!" % (node, self.action))
-                else:
-                    # Get the arguments to the action's method
-                    try:
-                        args, expected_results = self.generateArgs(node)
-                    except ValueError:
-                        pass
-                    else:
-                        # Call the action on the remote node
-                        self.outstanding += 1
-                        self.outstanding_results += expected_results
-                        df = f(self.caller.node.id, *args)
-                        df.addCallbacks(self.gotResponse, self.actionFailed,
-                                        callbackArgs = (node, expected_results),
-                                        errbackArgs = (node, expected_results))
-                        
-            # We might have to stop for now
-            if (self.outstanding >= self.config['CONCURRENT_REQS'] or
-                (self.desired_results and
-                 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
-                break
-            
-        assert self.outstanding >= 0
-        assert self.outstanding_results >= 0
-
-        # If no requests are outstanding, then we are done
-        if self.outstanding == 0:
-            self.finished = True
-            result = self.generateResult()
-            reactor.callLater(0, self.callback, *result)
-
-    def gotResponse(self, dict, node, expected_results):
-        """Receive a response from a remote node."""
-        self.caller.insertNode(node)
-        if self.finished or self.answered.has_key(node.id):
-            # a day late and a dollar short
-            return
-        self.outstanding -= 1
-        self.outstanding_results -= expected_results
-        self.answered[node.id] = 1
-        self.processResponse(dict['rsp'])
-        self.schedule()
-
-    def actionFailed(self, err, node, expected_results):
-        """Receive an error from a remote node."""
-        log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
-        log.err(err)
-        self.caller.table.nodeFailed(node)
-        self.outstanding -= 1
-        self.outstanding_results -= expected_results
-        self.schedule()
-    
-    def handleGotNodes(self, nodes):
-        """Process any received node contact info in the response.
-        
-        Not called by default, but suitable for being called by
-        L{processResponse} in a recursive node search.
-        """
-        for compact_node in nodes:
-            node_contact = uncompact(compact_node)
-            node = self.caller.Node(node_contact)
-            if not self.found.has_key(node.id):
-                self.found[node.id] = node
-
-    def sortNodes(self):
-        """Sort the nodes, if necessary.
-        
-        Assumes nodes are never removed from the L{found} dictionary.
-        """
-        if len(self.sorted_nodes) != len(self.found):
-            self.sorted_nodes = self.found.values()
-            self.sorted_nodes.sort(self.sort)
-                
-    #{ Subclass for specific actions
-    def getNodesToProcess(self):
-        """Generate a list of nodes to process next.
-        
-        This implementation is suitable for a recurring search over all nodes.
-        """
-        self.sortNodes()
-        return self.sorted_nodes[:self.config['K']]
-    
-    def generateArgs(self, node):
-        """Generate the arguments to the node's action.
-        
-        These arguments will be appended to our node ID when calling the action.
-        Also return the number of results expected from this request.
-        
-        @raise ValueError: if the node should not be queried
-        """
-        return (self.target, ), 0
-    
-    def processResponse(self, dict):
-        """Process the response dictionary received from the remote node."""
-        self.handleGotNodes(dict['nodes'])
-
-    def generateResult(self, nodes):
-        """Create the final result to return to the L{callback} function."""
-        return []
-        
-
-class FindNode(ActionBase):
-    """Find the closest nodes to the key."""
-
-    def __init__(self, caller, target, callback, config, action="findNode"):
-        ActionBase.__init__(self, caller, target, callback, config, action)
-
-    def processResponse(self, dict):
-        """Save the token received from each node."""
-        if dict["id"] in self.found:
-            self.found[dict["id"]].updateToken(dict.get('token', ''))
-        self.handleGotNodes(dict['nodes'])
-
-    def generateResult(self):
-        """Result is the K closest nodes to the target."""
-        self.sortNodes()
-        return (self.sorted_nodes[:self.config['K']], )
-    
-
-class FindValue(ActionBase):
-    """Find the closest nodes to the key and check for values."""
-
-    def __init__(self, caller, target, callback, config, action="findValue"):
-        ActionBase.__init__(self, caller, target, callback, config, action)
-
-    def processResponse(self, dict):
-        """Save the number of values each node has."""
-        if dict["id"] in self.found:
-            self.found[dict["id"]].updateNumValues(dict.get('num', 0))
-        self.handleGotNodes(dict['nodes'])
-        
-    def generateResult(self):
-        """Result is the nodes that have values, sorted by proximity to the key."""
-        self.sortNodes()
-        return ([node for node in self.sorted_nodes if node.num_values > 0], )
-    
-
-class GetValue(ActionBase):
-    """Retrieve values from a list of nodes."""
-    
-    def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
-        """Initialize the action with the locally available results.
-        
-        @type local_results: C{list} of C{string}
-        @param local_results: the values that were available in this node
-        """
-        ActionBase.__init__(self, caller, target, callback, config, action, num_results)
-        if local_results:
-            for result in local_results:
-                self.results[result] = 1
-
-    def getNodesToProcess(self):
-        """Nodes are never added, always return the same sorted node list."""
-        return self.sorted_nodes
-    
-    def generateArgs(self, node):
-        """Arguments include the number of values to request."""
-        if node.num_values > 0:
-            # Request all desired results from each node, just to be sure.
-            num_values = abs(self.desired_results) - len(self.results)
-            assert num_values > 0
-            if num_values > node.num_values:
-                num_values = 0
-            return (self.target, num_values), node.num_values
-        else:
-            raise ValueError, "Don't try and get values from this node because it doesn't have any"
-
-    def processResponse(self, dict):
-        """Save the returned values, calling the L{callback} each time there are new ones."""
-        if dict.has_key('values'):
-            def x(y, z=self.results):
-                if not z.has_key(y):
-                    z[y] = 1
-                    return y
-                else:
-                    return None
-            z = len(dict['values'])
-            v = filter(None, map(x, dict['values']))
-            if len(v):
-                reactor.callLater(0, self.callback, self.target, v)
-
-    def generateResult(self):
-        """Results have all been returned, now send the empty list to end the action."""
-        return (self.target, [])
-        
-
-class StoreValue(ActionBase):
-    """Store a value in a list of nodes."""
-
-    def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
-        """Initialize the action with the value to store.
-        
-        @type value: C{string}
-        @param value: the value to store in the nodes
-        """
-        ActionBase.__init__(self, caller, target, callback, config, action, num_results)
-        self.value = value
-        
-    def getNodesToProcess(self):
-        """Nodes are never added, always return the same sorted list."""
-        return self.sorted_nodes
-
-    def generateArgs(self, node):
-        """Args include the value to store and the node's token."""
-        if node.token:
-            return (self.target, self.value, node.token), 1
-        else:
-            raise ValueError, "Don't store at this node since we don't know it's token"
-
-    def processResponse(self, dict):
-        """Save the response, though it should be nothin but the ID."""
-        self.results[dict["id"]] = dict
-    
-    def generateResult(self):
-        """Return all the response IDs received."""
-        return (self.target, self.value, self.results.values())