]> git.mxchange.org Git - quix0rs-apt-p2p.git/blobdiff - apt_dht_Khashmir/actions.py
Break up the find_value into 2 parts (with get_value).
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
index 05349d4147efe1df81e5c1ccfe35c95c599008a4..c94c78d7a76cc93647fa036fdef004911c33615d 100644 (file)
@@ -41,9 +41,6 @@ class ActionBase:
     def goWithNodes(self, t):
         pass
     
-    
-
-FIND_NODE_TIMEOUT = 15
 
 class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
@@ -107,32 +104,78 @@ class FindNode(ActionBase):
         self.schedule()
     
 
-get_value_timeout = 15
-class GetValue(FindNode):
-    def __init__(self, caller, target, callback, config, find="findValue"):
-        FindNode.__init__(self, caller, target, callback, config)
-        self.findValue = find
-            
-    """ get value task """
+class FindValue(ActionBase):
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
         self.caller.insertNode(n)
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateNumValues(dict.get('num', 0))
+        l = dict["nodes"]
         if self.finished or self.answered.has_key(dict["id"]):
             # a day late and a dollar short
             return
         self.outstanding = self.outstanding - 1
         self.answered[dict["id"]] = 1
-        # go through nodes
-        # if we have any closer than what we already got, query them
-        if dict.has_key('nodes'):
-            for compact_node in dict['nodes']:
-                node = uncompact(compact_node)
-                n = self.caller.Node(node)
-                if not self.found.has_key(n.id):
-                    self.found[n.id] = n
-        elif dict.has_key('values'):
+        for compact_node in l:
+            node = uncompact(compact_node)
+            n = self.caller.Node(node)
+            if not self.found.has_key(n.id):
+                self.found[n.id] = n
+        self.schedule()
+        
+    def schedule(self):
+        """
+            send messages to new peers, if necessary
+        """
+        if self.finished:
+            return
+        l = self.found.values()
+        l.sort(self.sort)
+        for node in l[:self.config['K']]:
+            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
+                df = node.findValue(self.target, self.caller.node.id)
+                df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
+                self.outstanding = self.outstanding + 1
+                self.queried[node.id] = 1
+            if self.outstanding >= self.config['CONCURRENT_REQS']:
+                break
+        assert self.outstanding >=0
+        if self.outstanding == 0:
+            ## all done!!
+            self.finished=1
+            l = [node for node in self.found.values() if node.num_values > 0]
+            reactor.callLater(0, self.callback, l)
+    
+    def goWithNodes(self, nodes):
+        """
+            this starts the process, our argument is a transaction with t.extras being our list of nodes
+            it's a transaction since we got called from the dispatcher
+        """
+        for node in nodes:
+            if node.id == self.caller.node.id:
+                continue
+            else:
+                self.found[node.id] = node
+        
+        self.schedule()
+
+
+class GetValue(ActionBase):
+    def __init__(self, caller, target, num, callback, config, action="getValue"):
+        ActionBase.__init__(self, caller, target, callback, config)
+        self.num_values = num
+        self.outstanding_gets = 0
+        self.action = action
+        
+    def gotValues(self, dict, node):
+        dict = dict['rsp']
+        self.outstanding -= 1
+        self.caller.insertNode(node)
+        if self.finished:
+            return
+        if dict.has_key('values'):
             def x(y, z=self.results):
                 if not z.has_key(y):
                     z[y] = 1
@@ -141,58 +184,55 @@ class GetValue(FindNode):
                     return None
             z = len(dict['values'])
             v = filter(None, map(x, dict['values']))
-            if(len(v)):
+            if len(v):
                 reactor.callLater(0, self.callback, self.target, v)
-        self.schedule()
-        
-    ## get value
+        if len(self.results) >= self.num_values:
+            self.finished=1
+            reactor.callLater(0, self.callback, self.target, [])
+        else:
+            if not len(self.results) + self.outstanding_gets >= self.num_values:
+                self.schedule()
+    
     def schedule(self):
         if self.finished:
             return
-        l = self.found.values()
-        l.sort(self.sort)
         
-        for node in l[:self.config['K']]:
-            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
-                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
+        for node in self.nodes:
+            if node.id not in self.queried and node.id != self.caller.node.id and node.num_values > 0:
                 try:
-                    f = getattr(node, self.findValue)
+                    f = getattr(node, self.action)
                 except AttributeError:
-                    log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
+                    log.msg("%s doesn't have a %s method!" % (node, self.action))
                 else:
-                    df = f(self.target, self.caller.node.id)
-                    df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
-                    self.outstanding = self.outstanding + 1
+                    self.outstanding += 1
+                    self.outstanding_gets += node.num_values
+                    df = f(self.target, 0, self.caller.node.id)
+                    df.addCallbacks(self.gotValues, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
                     self.queried[node.id] = 1
-            if self.outstanding >= self.config['CONCURRENT_REQS']:
+            if len(self.results) + self.outstanding_gets >= self.num_values or \
+                self.outstanding >= self.config['CONCURRENT_REQS']:
                 break
         assert self.outstanding >=0
         if self.outstanding == 0:
-            ## all done, didn't find it!!
-            self.finished=1
+            self.finished = 1
             reactor.callLater(0, self.callback, self.target, [])
-
-    ## get value
-    def goWithNodes(self, nodes, found=None):
+                    
+    def goWithNodes(self, nodes, found = None):
         self.results = {}
         if found:
             for n in found:
                 self.results[n] = 1
-        for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
-            
+        self.nodes = nodes
+        self.nodes.sort(self.sort)
         self.schedule()
 
 
 class StoreValue(ActionBase):
-    def __init__(self, caller, target, value, callback, config, store="storeValue"):
+    def __init__(self, caller, target, value, callback, config, action="storeValue"):
         ActionBase.__init__(self, caller, target, callback, config)
         self.value = value
         self.stored = []
-        self.store = store
+        self.action = action
         
     def storedValue(self, t, node):
         self.outstanding -= 1
@@ -225,9 +265,9 @@ class StoreValue(ActionBase):
                 if not node.id == self.caller.node.id:
                     self.outstanding += 1
                     try:
-                        f = getattr(node, self.store)
+                        f = getattr(node, self.action)
                     except AttributeError:
-                        log.msg("%s doesn't have a %s method!" % (node, self.store))
+                        log.msg("%s doesn't have a %s method!" % (node, self.action))
                     else:
                         df = f(self.target, self.value, node.token, self.caller.node.id)
                         df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))