]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/actions.py
Rewrite of the actions to take advantage of the commonalities between them.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
index 8da4431dcd1d110e05c9cf429ea58fc4ab3bb435..c80591a4dcfbb87e1889734955be6c7ba33f7351 100644 (file)
@@ -2,25 +2,33 @@
 # see LICENSE.txt for license information
 
 from twisted.internet import reactor
+from twisted.python import log
 
 from khash import intify
+from util import uncompact
 
 class ActionBase:
     """ base class for some long running asynchronous proccesses like finding nodes or values """
-    def __init__(self, caller, target, callback, config):
+    def __init__(self, caller, target, callback, config, action, num_results = None):
+        """Initialize the action."""
         self.caller = caller
         self.target = target
         self.config = config
+        self.action = action
         self.num = intify(target)
-        self.found = {}
         self.queried = {}
         self.answered = {}
+        self.found = {}
+        self.sorted_nodes = []
+        self.results = {}
+        self.desired_results = num_results
         self.callback = callback
         self.outstanding = 0
+        self.outstanding_results = 0
         self.finished = 0
     
         def sort(a, b, num=self.num):
-            """ this function is for sorting nodes relative to the ID we are looking for """
+            """Sort nodes relative to the ID we are looking for."""
             x, y = num ^ a.num, num ^ b.num
             if x > y:
                 return 1
@@ -29,105 +37,189 @@ class ActionBase:
             return 0
         self.sort = sort
         
-    def goWithNodes(self, t):
-        pass
-    
+    def goWithNodes(self, nodes):
+        """Start the action's process with a list of nodes to contact."""
+        for node in nodes:
+            if node.id == self.caller.node.id:
+                continue
+            else:
+                self.found[node.id] = node
+        self.sortNodes()
+        self.schedule()
     
+    def schedule(self):
+        """Schedule requests to be sent to remote nodes."""
+        # Check if we are already done
+        if self.desired_results and len(self.results) >= self.desired_results:
+            self.finished=1
+            result = self.generateResult()
+            reactor.callLater(0, self.callback, *result)
 
-FIND_NODE_TIMEOUT = 15
-
-class FindNode(ActionBase):
-    """ find node action merits it's own class as it is a long running stateful process """
-    def handleGotNodes(self, dict):
-        _krpc_sender = dict['_krpc_sender']
-        dict = dict['rsp']
-        n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
-        self.caller.insertNode(n)
-        l = dict["nodes"]
-        if self.finished or self.answered.has_key(dict["id"]):
-            # a day late and a dollar short
+        if self.finished or (self.desired_results and 
+                             len(self.results) + self.outstanding_results >= self.desired_results):
             return
-        self.outstanding = self.outstanding - 1
-        self.answered[dict["id"]] = 1
-        for node in l:
-            n = self.caller.Node(node)
-            if not self.found.has_key(n.id):
-                self.found[n.id] = n
-        self.schedule()
         
-    def schedule(self):
-        """
-            send messages to new peers, if necessary
-        """
-        if self.finished:
-            return
-        l = self.found.values()
-        l.sort(self.sort)
-        for node in l[:self.config['K']]:
-            if node.id == self.target:
-                self.finished=1
-                return self.callback([node])
-            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
-                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
-                df = node.findNode(self.target, self.caller.node.id)
-                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
-                self.outstanding = self.outstanding + 1
+        for node in self.getNodesToProcess():
+            if node.id not in self.queried and node.id != self.caller.node.id:
                 self.queried[node.id] = 1
-            if self.outstanding >= self.config['CONCURRENT_REQS']:
+                
+                # Get the action to call on the node
+                try:
+                    f = getattr(node, self.action)
+                except AttributeError:
+                    log.msg("%s doesn't have a %s method!" % (node, self.action))
+                else:
+                    # Get the arguments to the action's method
+                    try:
+                        args, expected_results = self.generateArgs(node)
+                    except ValueError:
+                        pass
+                    else:
+                        # Call the action on the remote node
+                        self.outstanding += 1
+                        self.outstanding_results += expected_results
+                        df = f(self.caller.node.id, *args)
+                        df.addCallbacks(self.gotResponse, self.actionFailed,
+                                        callbackArgs = (node, expected_results),
+                                        errbackArgs = (node, expected_results))
+                        
+            # We might have to stop for now
+            if (self.outstanding >= self.config['CONCURRENT_REQS'] or
+                (self.desired_results and
+                 self.outstanding_results >= self.desired_results)):
                 break
+            
+        # If no requests are outstanding, then we are done
         assert self.outstanding >=0
         if self.outstanding == 0:
-            ## all done!!
-            self.finished=1
-            reactor.callLater(0, self.callback, l[:self.config['K']])
-    
-    def makeMsgFailed(self, node):
-        def defaultGotNodes(err, self=self, node=node):
-            print ">>> find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port), err
-            self.caller.table.nodeFailed(node)
-            self.outstanding = self.outstanding - 1
-            self.schedule()
-        return defaultGotNodes
+            self.finished = 1
+            result = self.generateResult()
+            reactor.callLater(0, self.callback, *result)
+
+    def gotResponse(self, dict, node, expected_results):
+        """Receive a response from a remote node."""
+        self.caller.insertNode(node)
+        if self.finished or self.answered.has_key(node.id):
+            # a day late and a dollar short
+            return
+        self.outstanding -= 1
+        self.outstanding_results -= expected_results
+        self.answered[node.id] = 1
+        self.processResponse(dict['rsp'])
+        self.schedule()
+
+    def actionFailed(self, err, node, expected_results):
+        """Receive an error from a remote node."""
+        log.msg("action %s failed (%s) %s/%s" % (self.action, self.config['PORT'], node.host, node.port))
+        log.err(err)
+        self.caller.table.nodeFailed(node)
+        self.answered[node.id] = 1
+        self.outstanding -= 1
+        self.outstanding_results -= expected_results
+        self.schedule()
     
-    def goWithNodes(self, nodes):
+    def handleGotNodes(self, nodes):
+        """Process any received node contact info in the response."""
+        for compact_node in nodes:
+            node_contact = uncompact(compact_node)
+            node = self.caller.Node(node_contact)
+            if not self.found.has_key(node.id):
+                self.found[node.id] = node
+
+    def sortNodes(self):
+        """Sort the nodes, if necessary.
+        
+        Assumes nodes are never removed from the L{found} dictionary.
         """
-            this starts the process, our argument is a transaction with t.extras being our list of nodes
-            it's a transaction since we got called from the dispatcher
+        if len(self.sorted_nodes) != len(self.found):
+            self.sorted_nodes = self.found.values()
+            self.sorted_nodes.sort(self.sort)
+                
+    # The methods below are meant to be subclassed by actions
+    def getNodesToProcess(self):
+        """Generate a list of nodes to process next.
+        
+        This implementation is suitable for a recurring search over all nodes.
         """
-        for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
+        self.sortNodes()
+        return self.sorted_nodes[:self.config['K']]
+    
+    def generateArgs(self, node):
+        """Generate the arguments to the node's action.
         
-        self.schedule()
+        These arguments will be appended to our node ID when calling the action.
+        Also return the number of results expected from this request.
+        
+        @raise ValueError: if the node should not be queried
+        """
+        return (self.target, ), 0
     
+    def processResponse(self, dict):
+        """Process the response dictionary received from the remote node."""
+        pass
 
-get_value_timeout = 15
-class GetValue(FindNode):
-    def __init__(self, caller, target, callback, config, find="findValue"):
-        FindNode.__init__(self, caller, target, callback, config)
-        self.findValue = find
-            
-    """ get value task """
-    def handleGotNodes(self, dict):
-        _krpc_sender = dict['_krpc_sender']
-        dict = dict['rsp']
-        n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
-        self.caller.insertNode(n)
-        if self.finished or self.answered.has_key(dict["id"]):
-            # a day late and a dollar short
-            return
-        self.outstanding = self.outstanding - 1
-        self.answered[dict["id"]] = 1
-        # go through nodes
-        # if we have any closer than what we already got, query them
-        if dict.has_key('nodes'):
-            for node in dict['nodes']:
-                n = self.caller.Node(node)
-                if not self.found.has_key(n.id):
-                    self.found[n.id] = n
-        elif dict.has_key('values'):
+    def generateResult(self, nodes):
+        """Create the result to return to the callback function."""
+        return []
+        
+
+class FindNode(ActionBase):
+    """Find the closest nodes to the key."""
+
+    def __init__(self, caller, target, callback, config, action="findNode"):
+        ActionBase.__init__(self, caller, target, callback, config, action)
+
+    def processResponse(self, dict):
+        """Save the token received from each node."""
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateToken(dict.get('token', ''))
+        self.handleGotNodes(dict['nodes'])
+
+    def generateResult(self):
+        """Result is the K closest nodes to the target."""
+        self.sortNodes()
+        return (self.sorted_nodes[:self.config['K']], )
+    
+
+class FindValue(ActionBase):
+    """Find the closest nodes to the key and check their values."""
+
+    def __init__(self, caller, target, callback, config, action="findValue"):
+        ActionBase.__init__(self, caller, target, callback, config, action)
+
+    def processResponse(self, dict):
+        """Save the number of values each node has."""
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateNumValues(dict.get('num', 0))
+        self.handleGotNodes(dict['nodes'])
+        
+    def generateResult(self):
+        """Result is the nodes that have values, sorted by proximity to the key."""
+        self.sortNodes()
+        return ([node for node in self.sorted_nodes if node.num_values > 0], )
+    
+
+class GetValue(ActionBase):
+    def __init__(self, caller, target, local_results, num_results, callback, config, action="getValue"):
+        ActionBase.__init__(self, caller, target, callback, config, action, num_results)
+        if local_results:
+            for result in local_results:
+                self.results[result] = 1
+
+    def getNodesToProcess(self):
+        """Nodes are never added, always return the same thing."""
+        return self.sorted_nodes
+    
+    def generateArgs(self, node):
+        """Args include the number of values to request."""
+        if node.num_values > 0:
+            return (self.target, 0), node.num_values
+        else:
+            raise ValueError, "Don't try and get values from this node because it doesn't have any"
+
+    def processResponse(self, dict):
+        """Save the returned values, calling the callback each time there are new ones."""
+        if dict.has_key('values'):
             def x(y, z=self.results):
                 if not z.has_key(y):
                     z[y] = 1
@@ -136,127 +228,34 @@ class GetValue(FindNode):
                     return None
             z = len(dict['values'])
             v = filter(None, map(x, dict['values']))
-            if(len(v)):
+            if len(v):
                 reactor.callLater(0, self.callback, self.target, v)
-        self.schedule()
-        
-    ## get value
-    def schedule(self):
-        if self.finished:
-            return
-        l = self.found.values()
-        l.sort(self.sort)
-        
-        for node in l[:self.config['K']]:
-            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
-                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
-                try:
-                    f = getattr(node, self.findValue)
-                except AttributeError:
-                    print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
-                else:
-                    df = f(self.target, self.caller.node.id)
-                    df.addCallback(self.handleGotNodes)
-                    df.addErrback(self.makeMsgFailed(node))
-                    self.outstanding = self.outstanding + 1
-                    self.queried[node.id] = 1
-            if self.outstanding >= self.config['CONCURRENT_REQS']:
-                break
-        assert self.outstanding >=0
-        if self.outstanding == 0:
-            ## all done, didn't find it!!
-            self.finished=1
-            reactor.callLater(0, self.callback, self.target, [])
-
-    ## get value
-    def goWithNodes(self, nodes, found=None):
-        self.results = {}
-        if found:
-            for n in found:
-                self.results[n] = 1
-        for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
-            
-        self.schedule()
 
+    def generateResult(self):
+        """Results have all been returned, now send the empty list to end it."""
+        return (self.target, [])
+        
 
 class StoreValue(ActionBase):
-    def __init__(self, caller, target, value, originated, callback, config, store="storeValue"):
-        ActionBase.__init__(self, caller, target, callback, config)
+    def __init__(self, caller, target, value, num_results, callback, config, action="storeValue"):
+        ActionBase.__init__(self, caller, target, callback, config, action, num_results)
         self.value = value
-        self.originated = originated
-        self.stored = []
-        self.store = store
         
-    def storedValue(self, t, node):
-        self.outstanding -= 1
-        self.caller.insertNode(node)
-        if self.finished:
-            return
-        self.stored.append(t)
-        if len(self.stored) >= self.config['STORE_REDUNDANCY']:
-            self.finished=1
-            self.callback(self.target, self.value, self.stored)
-        else:
-            if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
-                self.schedule()
-        return t
-    
-    def storeFailed(self, t, node):
-        print ">>> store failed %s/%s" % (node.host, node.port)
-        self.caller.nodeFailed(node)
-        self.outstanding -= 1
-        if self.finished:
-            return t
-        self.schedule()
-        return t
-    
-    def schedule(self):
-        if self.finished:
-            return
-        num = self.config['CONCURRENT_REQS'] - self.outstanding
-        if num > self.config['STORE_REDUNDANCY']:
-            num = self.config['STORE_REDUNDANCY']
-        for i in range(num):
-            try:
-                node = self.nodes.pop()
-            except IndexError:
-                if self.outstanding == 0:
-                    self.finished = 1
-                    self.callback(self.target, self.value, self.stored)
-            else:
-                if not node.id == self.caller.node.id:
-                    self.outstanding += 1
-                    try:
-                        f = getattr(node, self.store)
-                    except AttributeError:
-                        print ">>> %s doesn't have a %s method!" % (node, self.store)
-                    else:
-                        df = f(self.target, self.value, self.originated, self.caller.node.id)
-                        df.addCallback(self.storedValue, node=node)
-                        df.addErrback(self.storeFailed, node=node)
-                    
-    def goWithNodes(self, nodes):
-        self.nodes = nodes
-        self.nodes.sort(self.sort)
-        self.schedule()
+    def getNodesToProcess(self):
+        """Nodes are never added, always return the same thing."""
+        return self.sorted_nodes
 
+    def generateArgs(self, node):
+        """Args include the value to request and the node's token."""
+        if node.token:
+            return (self.target, self.value, node.token), 1
+        else:
+            raise ValueError, "Don't store at this node since we don't know it's token"
 
-class KeyExpirer:
-    def __init__(self, store, config):
-        self.store = store
-        self.config = config
-        self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
+    def processResponse(self, dict):
+        """Save the response, though it should be nothin but the ID."""
+        self.results[dict["id"]] = dict
     
-    def doExpire(self):
-        self.store.expireValues(self.config['KE_AGE'])
-        self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
-        
-    def shutdown(self):
-        try:
-            self.next_expire.cancel()
-        except:
-            pass
+    def generateResult(self):
+        """Return all the response IDs received."""
+        return (self.target, self.value, self.results.values())