Various documentation fixes and additions.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
index 9bfa2e8a708be3a5726b35fff76f7dad0772d9bf..1179713c22847fc3236cd95e61684b38c6cf0d80 100644 (file)
@@ -1,26 +1,91 @@
 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
 # see LICENSE.txt for license information
 
+"""Details of how to perform actions on remote peers."""
+
 from twisted.internet import reactor
+from twisted.python import log
 
 from khash import intify
+from util import uncompact
 
 class ActionBase:
-    """ base class for some long running asynchronous proccesses like finding nodes or values """
-    def __init__(self, caller, target, callback, config):
+    """Base class for some long running asynchronous proccesses like finding nodes or values.
+    
+    @type caller: L{khashmir.Khashmir}
+    @ivar caller: the DHT instance that is performing the action
+    @type target: C{string}
+    @ivar target: the target of the action, usually a DHT key
+    @type config: C{dictionary}
+    @ivar config: the configuration variables for the DHT
+    @type action: C{string}
+    @ivar action: the name of the action to call on remote nodes
+    @type num: C{long}
+    @ivar num: the target key in integer form
+    @type queried: C{dictionary}
+    @ivar queried: the nodes that have been queried for this action,
+        keys are node IDs, values are the node itself
+    @type answered: C{dictionary}
+    @ivar answered: the nodes that have answered the queries
+    @type found: C{dictionary}
+    @ivar found: nodes that have been found so far by the action
+    @type sorted_nodes: C{list} of L{node.Node}
+    @ivar sorted_nodes: a sorted list of nodes by there proximity to the key
+    @type results: C{dictionary}
+    @ivar results: keys are the results found so far by the action
+    @type desired_results: C{int}
+    @ivar desired_results: the minimum number of results that are needed
+        before the action should stop
+    @type callback: C{method}
+    @ivar callback: the method to call with the results
+    @type outstanding: C{int}
+    @ivar outstanding: the number of requests currently outstanding
+    @type outstanding_results: C{int}
+    @ivar outstanding_results: the number of results that are expected from
+        the requests that are currently outstanding
+    @type finished: C{boolean}
+    @ivar finished: whether the action is done
+    @type sort: C{method}
+    @ivar sort: used to sort nodes by their proximity to the target
+    """
+    
+    def __init__(self, caller, target, callback, config, action, num_results = None):
+        """Initialize the action.
+        
+        @type caller: L{khashmir.Khashmir}
+        @param caller: the DHT instance that is performing the action
+        @type target: C{string}
+        @param target: the target of the action, usually a DHT key
+        @type callback: C{method}
+        @param callback: the method to call with the results
+        @type config: C{dictionary}
+        @param config: the configuration variables for the DHT
+        @type action: C{string}
+        @param action: the name of the action to call on remote nodes
+        @type num_results: C{int}
+        @param num_results: the minimum number of results that are needed before
+            the action should stop (optional, defaults to getting all the results)
+        
+        """
+        
         self.caller = caller
         self.target = target
         self.config = config
+        self.action = action
         self.num = intify(target)
-        self.found = {}
         self.queried = {}
         self.answered = {}
+        self.found = {}
+        self.sorted_nodes = []
+        self.results = {}
+        self.desired_results = num_results
         self.callback = callback
         self.outstanding = 0
-        self.finished = 0
+        self.outstanding_results = 0
+        self.finished = False
     
         def sort(a, b, num=self.num):
-            """ this function is for sorting nodes relative to the ID we are looking for """
+            """Sort nodes relative to the ID we are looking for."""
             x, y = num ^ a.num, num ^ b.num
             if x > y:
                 return 1
@@ -28,106 +93,212 @@ class ActionBase:
                 return -1
             return 0
         self.sort = sort
-        
-    def goWithNodes(self, t):
-        pass
-    
+
+    #{ Main operation
+    def goWithNodes(self, nodes):
+        """Start the action's process with a list of nodes to contact."""
+        for node in nodes:
+            if node.id == self.caller.node.id:
+                continue
+            else:
+                self.found[node.id] = node
+        self.sortNodes()
+        self.schedule()
     
+    def schedule(self):
+        """Schedule requests to be sent to remote nodes."""
+        # Check if we are already done
+        if self.desired_results and ((len(self.results) >= abs(self.desired_results)) or
+                                     (self.desired_results < 0 and
+                                      len(self.answered) >= self.config['STORE_REDUNDANCY'])):
+            self.finished = True
+            result = self.generateResult()
+            reactor.callLater(0, self.callback, *result)
+
+        if self.finished or (self.desired_results and 
+                             len(self.results) + self.outstanding_results >= abs(self.desired_results)):
+            return
+        
+        # Loop for each node that should be processed
+        for node in self.getNodesToProcess():
+            # Don't send requests twice or to ourself
+            if node.id not in self.queried and node.id != self.caller.node.id:
+                self.queried[node.id] = 1
+                
+                # Get the action to call on the node
+                try:
+                    f = getattr(node, self.action)
+                except AttributeError:
+                    log.msg("%s doesn't have a %s method!" % (node, self.action))
+                else:
+                    # Get the arguments to the action's method
+                    try:
+                        args, expected_results = self.generateArgs(node)
+                    except ValueError:
+                        pass
+                    else:
+                        # Call the action on the remote node
+                        self.outstanding += 1
+                        self.outstanding_results += expected_results
+                        df = f(self.caller.node.id, *args)
+                        df.addCallbacks(self.gotResponse, self.actionFailed,
+                                        callbackArgs = (node, expected_results),
+                                        errbackArgs = (node, expected_results))
+                        
+            # We might have to stop for now
+            if (self.outstanding >= self.config['CONCURRENT_REQS'] or
+                (self.desired_results and
+                 len(self.results) + self.outstanding_results >= abs(self.desired_results))):
+                break
+            
+        assert self.outstanding >= 0
+        assert self.outstanding_results >= 0
 
-FIND_NODE_TIMEOUT = 15
+        # If no requests are outstanding, then we are done
+        if self.outstanding == 0:
+            self.finished = True
+            result = self.generateResult()
+            reactor.callLater(0, self.callback, *result)
 
-class FindNode(ActionBase):
-    """ find node action merits it's own class as it is a long running stateful process """
-    def handleGotNodes(self, dict):
-        _krpc_sender = dict['_krpc_sender']
-        dict = dict['rsp']
-        n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
-        self.caller.insertNode(n)
-        l = dict["nodes"]
-        if self.finished or self.answered.has_key(dict["id"]):
+    def gotResponse(self, dict, node, expected_results):
+        """Receive a response from a remote node."""
+        self.caller.insertNode(node)
+        if self.finished or self.answered.has_key(node.id):
             # a day late and a dollar short
             return
-        self.outstanding = self.outstanding - 1
-        self.answered[dict["id"]] = 1
-        for node in l:
-            n = self.caller.Node(node)
-            if not self.found.has_key(n.id):
-                self.found[n.id] = n
+        self.outstanding -= 1
+        self.outstanding_results -= expected_results
+        self.answered[node.id] = 1
+        self.processResponse(dict['rsp'])
+        self.schedule()
+
+    def actionFailed(self, err, node, expected_results):
+        """Receive an error from a remote node."""
+        log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
+        log.err(err)
+        self.caller.table.nodeFailed(node)
+        self.outstanding -= 1
+        self.outstanding_results -= expected_results
         self.schedule()
+    
+    def handleGotNodes(self, nodes):
+        """Process any received node contact info in the response.
         
-    def schedule(self):
+        Not called by default, but suitable for being called by
+        L{processResponse} in a recursive node search.
         """
-            send messages to new peers, if necessary
+        for compact_node in nodes:
+            node_contact = uncompact(compact_node)
+            node = self.caller.Node(node_contact)
+            if not self.found.has_key(node.id):
+                self.found[node.id] = node
+
+    def sortNodes(self):
+        """Sort the nodes, if necessary.
+        
+        Assumes nodes are never removed from the L{found} dictionary.
         """
-        if self.finished:
-            return
-        l = self.found.values()
-        l.sort(self.sort)
-        for node in l[:self.config['K']]:
-            if node.id == self.target:
-                self.finished=1
-                return self.callback([node])
-            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
-                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-                df = node.findNode(self.target, self.caller.node.id)
-                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
-                self.outstanding = self.outstanding + 1
-                self.queried[node.id] = 1
-            if self.outstanding >= self.config['CONCURRENT_REQS']:
-                break
-        assert self.outstanding >=0
-        if self.outstanding == 0:
-            ## all done!!
-            self.finished=1
-            reactor.callLater(0, self.callback, l[:self.config['K']])
-    
-    def makeMsgFailed(self, node):
-        def defaultGotNodes(err, self=self, node=node):
-            print ">>> find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port), err
-            self.caller.table.nodeFailed(node)
-            self.outstanding = self.outstanding - 1
-            self.schedule()
-        return defaultGotNodes
-    
-    def goWithNodes(self, nodes):
+        if len(self.sorted_nodes) != len(self.found):
+            self.sorted_nodes = self.found.values()
+            self.sorted_nodes.sort(self.sort)
+                
+    #{ Subclass for specific actions
+    def getNodesToProcess(self):
+        """Generate a list of nodes to process next.
+        
+        This implementation is suitable for a recurring search over all nodes.
         """
-            this starts the process, our argument is a transaction with t.extras being our list of nodes
-            it's a transaction since we got called from the dispatcher
+        self.sortNodes()
+        return self.sorted_nodes[:self.config['K']]
+    
+    def generateArgs(self, node):
+        """Generate the arguments to the node's action.
+        
+        These arguments will be appended to our node ID when calling the action.
+        Also return the number of results expected from this request.
+        
+        @raise ValueError: if the node should not be queried
         """
-        for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
+        return (self.target, ), 0
+    
+    def processResponse(self, dict):
+        """Process the response dictionary received from the remote node."""
+        self.handleGotNodes(dict['nodes'])
+
+    def generateResult(self, nodes):
+        """Create the final result to return to the L{callback} function."""
+        return []
         
-        self.schedule()
+
+class FindNode(ActionBase):
+    """Find the closest nodes to the key."""
+
+    def __init__(self, caller, target, callback, config, action="findNode"):
+        ActionBase.__init__(self, caller, target, callback, config, action)
+
+    def processResponse(self, dict):
+        """Save the token received from each node."""
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateToken(dict.get('token', ''))
+        self.handleGotNodes(dict['nodes'])
+
+    def generateResult(self):
+        """Result is the K closest nodes to the target."""
+        self.sortNodes()
+        return (self.sorted_nodes[:self.config['K']], )
     
 
-get_value_timeout = 15
-class GetValue(FindNode):
-    def __init__(self, caller, target, callback, config, find="findValue"):
-        FindNode.__init__(self, caller, target, callback, config)
-        self.findValue = find
-            
-    """ get value task """
-    def handleGotNodes(self, dict):
-        _krpc_sender = dict['_krpc_sender']
-        dict = dict['rsp']
-        n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
-        self.caller.insertNode(n)
-        if self.finished or self.answered.has_key(dict["id"]):
-            # a day late and a dollar short
-            return
-        self.outstanding = self.outstanding - 1
-        self.answered[dict["id"]] = 1
-        # go through nodes
-        # if we have any closer than what we already got, query them
-        if dict.has_key('nodes'):
-            for node in dict['nodes']:
-                n = self.caller.Node(node)
-                if not self.found.has_key(n.id):
-                    self.found[n.id] = n
-        elif dict.has_key('values'):
+class FindValue(ActionBase):
+    """Find the closest nodes to the key and check for values."""
+
+    def __init__(self, caller, target, callback, config, action="findValue"):
+        ActionBase.__init__(self, caller, target, callback, config, action)
+
+    def processResponse(self, dict):
+        """Save the number of values each node has."""
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateNumValues(dict.get('num', 0))
+        self.handleGotNodes(dict['nodes'])
+        
+    def generateResult(self):
+        """Result is the nodes that have values, sorted by proximity to the key."""
+        self.sortNodes()
+        return ([node for node in self.sorted_nodes if node.num_values > 0], )
+    
+
+class GetValue(ActionBase):
+    """Retrieve values from a list of nodes."""
+    
+    def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
+        """Initialize the action with the locally available results.
+        
+        @type local_results: C{list} of C{string}
+        @param local_results: the values that were available in this node
+        """
+        ActionBase.__init__(self, caller, target, callback, config, action, num_results)
+        if local_results:
+            for result in local_results:
+                self.results[result] = 1
+
+    def getNodesToProcess(self):
+        """Nodes are never added, always return the same sorted node list."""
+        return self.sorted_nodes
+    
+    def generateArgs(self, node):
+        """Arguments include the number of values to request."""
+        if node.num_values > 0:
+            # Request all desired results from each node, just to be sure.
+            num_values = abs(self.desired_results) - len(self.results)
+            assert num_values > 0
+            if num_values > node.num_values:
+                num_values = 0
+            return (self.target, num_values), node.num_values
+        else:
+            raise ValueError, "Don't try and get values from this node because it doesn't have any"
+
+    def processResponse(self, dict):
+        """Save the returned values, calling the L{callback} each time there are new ones."""
+        if dict.has_key('values'):
             def x(y, z=self.results):
                 if not z.has_key(y):
                     z[y] = 1
@@ -136,126 +307,41 @@ class GetValue(FindNode):
                     return None
             z = len(dict['values'])
             v = filter(None, map(x, dict['values']))
-            if(len(v)):
+            if len(v):
                 reactor.callLater(0, self.callback, self.target, v)
-        self.schedule()
-        
-    ## get value
-    def schedule(self):
-        if self.finished:
-            return
-        l = self.found.values()
-        l.sort(self.sort)
-        
-        for node in l[:self.config['K']]:
-            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
-                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-                try:
-                    f = getattr(node, self.findValue)
-                except AttributeError:
-                    print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
-                else:
-                    df = f(self.target, self.caller.node.id)
-                    df.addCallback(self.handleGotNodes)
-                    df.addErrback(self.makeMsgFailed(node))
-                    self.outstanding = self.outstanding + 1
-                    self.queried[node.id] = 1
-            if self.outstanding >= self.config['CONCURRENT_REQS']:
-                break
-        assert self.outstanding >=0
-        if self.outstanding == 0:
-            ## all done, didn't find it!!
-            self.finished=1
-            reactor.callLater(0, self.callback, self.target, [])
-
-    ## get value
-    def goWithNodes(self, nodes, found=None):
-        self.results = {}
-        if found:
-            for n in found:
-                self.results[n] = 1
-        for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
-            
-        self.schedule()
 
+    def generateResult(self):
+        """Results have all been returned, now send the empty list to end the action."""
+        return (self.target, [])
+        
 
 class StoreValue(ActionBase):
-    def __init__(self, caller, target, value, callback, config, store="storeValue"):
-        ActionBase.__init__(self, caller, target, callback, config)
+    """Store a value in a list of nodes."""
+
+    def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
+        """Initialize the action with the value to store.
+        
+        @type value: C{string}
+        @param value: the value to store in the nodes
+        """
+        ActionBase.__init__(self, caller, target, callback, config, action, num_results)
         self.value = value
-        self.stored = []
-        self.store = store
         
-    def storedValue(self, t, node):
-        self.outstanding -= 1
-        self.caller.insertNode(node)
-        if self.finished:
-            return
-        self.stored.append(t)
-        if len(self.stored) >= self.config['STORE_REDUNDANCY']:
-            self.finished=1
-            self.callback(self.target, self.value, self.stored)
-        else:
-            if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
-                self.schedule()
-        return t
-    
-    def storeFailed(self, t, node):
-        print ">>> store failed %s/%s" % (node.host, node.port)
-        self.caller.nodeFailed(node)
-        self.outstanding -= 1
-        if self.finished:
-            return t
-        self.schedule()
-        return t
-    
-    def schedule(self):
-        if self.finished:
-            return
-        num = self.config['CONCURRENT_REQS'] - self.outstanding
-        if num > self.config['STORE_REDUNDANCY']:
-            num = self.config['STORE_REDUNDANCY']
-        for i in range(num):
-            try:
-                node = self.nodes.pop()
-            except IndexError:
-                if self.outstanding == 0:
-                    self.finished = 1
-                    self.callback(self.target, self.value, self.stored)
-            else:
-                if not node.id == self.caller.node.id:
-                    self.outstanding += 1
-                    try:
-                        f = getattr(node, self.store)
-                    except AttributeError:
-                        print ">>> %s doesn't have a %s method!" % (node, self.store)
-                    else:
-                        df = f(self.target, self.value, self.caller.node.id)
-                        df.addCallback(self.storedValue, node=node)
-                        df.addErrback(self.storeFailed, node=node)
-                    
-    def goWithNodes(self, nodes):
-        self.nodes = nodes
-        self.nodes.sort(self.sort)
-        self.schedule()
+    def getNodesToProcess(self):
+        """Nodes are never added, always return the same sorted list."""
+        return self.sorted_nodes
 
+    def generateArgs(self, node):
+        """Args include the value to store and the node's token."""
+        if node.token:
+            return (self.target, self.value, node.token), 1
+        else:
+            raise ValueError, "Don't store at this node since we don't know it's token"
 
-class KeyExpirer:
-    def __init__(self, store, config):
-        self.store = store
-        self.config = config
-        self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
+    def processResponse(self, dict):
+        """Save the response, though it should be nothin but the ID."""
+        self.results[dict["id"]] = dict
     
-    def doExpire(self):
-        self.store.expireValues(self.config['KE_AGE'])
-        self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
-        
-    def shutdown(self):
-        try:
-            self.next_expire.cancel()
-        except:
-            pass
+    def generateResult(self):
+        """Return all the response IDs received."""
+        return (self.target, self.value, self.results.values())