]> git.mxchange.org Git - quix0rs-apt-p2p.git/commitdiff
Break up the find_value into 2 parts (with get_value).
authorCameron Dale <camrdale@gmail.com>
Sat, 23 Feb 2008 02:44:13 +0000 (18:44 -0800)
committerCameron Dale <camrdale@gmail.com>
Sat, 23 Feb 2008 02:44:13 +0000 (18:44 -0800)
To better support multiple values per key, the old find_value request is
broken up into find_value, which doesn't stop when it finds a node with
values but just records the number of values the node has, and
get_value, which queries the found nodes that have values to retrieve
them.

TODO
apt_dht_Khashmir/actions.py
apt_dht_Khashmir/db.py
apt_dht_Khashmir/khashmir.py
apt_dht_Khashmir/knode.py
apt_dht_Khashmir/node.py

diff --git a/TODO b/TODO
index ce33a7763381df0317bbc3f5e5ed78db22f6a579..70b47d26264011a27ee592d8e1ff8a4dcf2bd93a 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,3 +1,22 @@
+Check packet lengths before sending get_value responses.
+
+The length of the created UDP packet needs to be checked before sending
+to make sure it is not so long that it will get fragmented. This is only
+possible for the get_value RPC request.
+
+
+Clean up the khashmir actions.
+
+The khashmir actions are a mess, and some cleanup is necessary. A lot
+of the actions have most of their processing in common, so this code
+should be put in functions that all can call. Perhaps creating a
+base "RecurringAction" and "StaticAction" would be a good idea,
+as then find_node and find_value could use the first, while get_value
+and store_value could use the second. Perhaps ping and join actions
+should also be created for consistency, and maybe inherit from a
+"SingleNodeAction" base class.
+
+
 Packages.diff files need to be considered.
 
 The Packages.diff/Index files contain hashes of Packages.diff/rred.gz 
 Packages.diff files need to be considered.
 
 The Packages.diff/Index files contain hashes of Packages.diff/rred.gz 
@@ -76,20 +95,3 @@ These should be combined (multiplied) to provide a sort order for peers
 available to download from, which can then be used to assign new 
 downloads to peers. Pieces should be downloaded from the best peers 
 first (i.e. piece 0 from the absolute best peer).
 available to download from, which can then be used to assign new 
 downloads to peers. Pieces should be downloaded from the best peers 
 first (i.e. piece 0 from the absolute best peer).
-
-
-When looking up values, DHT should return nodes and values.
-
-When a key has multiple values in the DHT, returning a stored value may not
-be sufficient, as then no more nodes can be contacted to get more stored
-values. Instead, return both the stored values and the list of closest
-nodes so that the peer doing the lookup can decide when to stop looking
-(when it has received enough values).
-
-Instead of returning both, a new method could be added, "lookup_value".
-This method will be like "get_value", except that every node will always
-return a list of nodes, as well as the number of values it has for that
-key. Once a querying node has found enough values (or all of them), then
-it would send the "get_value" method to the nodes that have the most
-values. The "get_value" query could also have a new parameter "number",
-which is the maximum number of values to return.
index 05349d4147efe1df81e5c1ccfe35c95c599008a4..c94c78d7a76cc93647fa036fdef004911c33615d 100644 (file)
@@ -41,9 +41,6 @@ class ActionBase:
     def goWithNodes(self, t):
         pass
     
     def goWithNodes(self, t):
         pass
     
-    
-
-FIND_NODE_TIMEOUT = 15
 
 class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
 
 class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
@@ -107,32 +104,78 @@ class FindNode(ActionBase):
         self.schedule()
     
 
         self.schedule()
     
 
-get_value_timeout = 15
-class GetValue(FindNode):
-    def __init__(self, caller, target, callback, config, find="findValue"):
-        FindNode.__init__(self, caller, target, callback, config)
-        self.findValue = find
-            
-    """ get value task """
+class FindValue(ActionBase):
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
         self.caller.insertNode(n)
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
         self.caller.insertNode(n)
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateNumValues(dict.get('num', 0))
+        l = dict["nodes"]
         if self.finished or self.answered.has_key(dict["id"]):
             # a day late and a dollar short
             return
         self.outstanding = self.outstanding - 1
         self.answered[dict["id"]] = 1
         if self.finished or self.answered.has_key(dict["id"]):
             # a day late and a dollar short
             return
         self.outstanding = self.outstanding - 1
         self.answered[dict["id"]] = 1
-        # go through nodes
-        # if we have any closer than what we already got, query them
-        if dict.has_key('nodes'):
-            for compact_node in dict['nodes']:
-                node = uncompact(compact_node)
-                n = self.caller.Node(node)
-                if not self.found.has_key(n.id):
-                    self.found[n.id] = n
-        elif dict.has_key('values'):
+        for compact_node in l:
+            node = uncompact(compact_node)
+            n = self.caller.Node(node)
+            if not self.found.has_key(n.id):
+                self.found[n.id] = n
+        self.schedule()
+        
+    def schedule(self):
+        """
+            send messages to new peers, if necessary
+        """
+        if self.finished:
+            return
+        l = self.found.values()
+        l.sort(self.sort)
+        for node in l[:self.config['K']]:
+            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
+                df = node.findValue(self.target, self.caller.node.id)
+                df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
+                self.outstanding = self.outstanding + 1
+                self.queried[node.id] = 1
+            if self.outstanding >= self.config['CONCURRENT_REQS']:
+                break
+        assert self.outstanding >=0
+        if self.outstanding == 0:
+            ## all done!!
+            self.finished=1
+            l = [node for node in self.found.values() if node.num_values > 0]
+            reactor.callLater(0, self.callback, l)
+    
+    def goWithNodes(self, nodes):
+        """
+            this starts the process, our argument is a transaction with t.extras being our list of nodes
+            it's a transaction since we got called from the dispatcher
+        """
+        for node in nodes:
+            if node.id == self.caller.node.id:
+                continue
+            else:
+                self.found[node.id] = node
+        
+        self.schedule()
+
+
+class GetValue(ActionBase):
+    def __init__(self, caller, target, num, callback, config, action="getValue"):
+        ActionBase.__init__(self, caller, target, callback, config)
+        self.num_values = num
+        self.outstanding_gets = 0
+        self.action = action
+        
+    def gotValues(self, dict, node):
+        dict = dict['rsp']
+        self.outstanding -= 1
+        self.caller.insertNode(node)
+        if self.finished:
+            return
+        if dict.has_key('values'):
             def x(y, z=self.results):
                 if not z.has_key(y):
                     z[y] = 1
             def x(y, z=self.results):
                 if not z.has_key(y):
                     z[y] = 1
@@ -141,58 +184,55 @@ class GetValue(FindNode):
                     return None
             z = len(dict['values'])
             v = filter(None, map(x, dict['values']))
                     return None
             z = len(dict['values'])
             v = filter(None, map(x, dict['values']))
-            if(len(v)):
+            if len(v):
                 reactor.callLater(0, self.callback, self.target, v)
                 reactor.callLater(0, self.callback, self.target, v)
-        self.schedule()
-        
-    ## get value
+        if len(self.results) >= self.num_values:
+            self.finished=1
+            reactor.callLater(0, self.callback, self.target, [])
+        else:
+            if not len(self.results) + self.outstanding_gets >= self.num_values:
+                self.schedule()
+    
     def schedule(self):
         if self.finished:
             return
     def schedule(self):
         if self.finished:
             return
-        l = self.found.values()
-        l.sort(self.sort)
         
         
-        for node in l[:self.config['K']]:
-            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
-                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
+        for node in self.nodes:
+            if node.id not in self.queried and node.id != self.caller.node.id and node.num_values > 0:
                 try:
                 try:
-                    f = getattr(node, self.findValue)
+                    f = getattr(node, self.action)
                 except AttributeError:
                 except AttributeError:
-                    log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
+                    log.msg("%s doesn't have a %s method!" % (node, self.action))
                 else:
                 else:
-                    df = f(self.target, self.caller.node.id)
-                    df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
-                    self.outstanding = self.outstanding + 1
+                    self.outstanding += 1
+                    self.outstanding_gets += node.num_values
+                    df = f(self.target, 0, self.caller.node.id)
+                    df.addCallbacks(self.gotValues, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
                     self.queried[node.id] = 1
                     self.queried[node.id] = 1
-            if self.outstanding >= self.config['CONCURRENT_REQS']:
+            if len(self.results) + self.outstanding_gets >= self.num_values or \
+                self.outstanding >= self.config['CONCURRENT_REQS']:
                 break
         assert self.outstanding >=0
         if self.outstanding == 0:
                 break
         assert self.outstanding >=0
         if self.outstanding == 0:
-            ## all done, didn't find it!!
-            self.finished=1
+            self.finished = 1
             reactor.callLater(0, self.callback, self.target, [])
             reactor.callLater(0, self.callback, self.target, [])
-
-    ## get value
-    def goWithNodes(self, nodes, found=None):
+                    
+    def goWithNodes(self, nodes, found = None):
         self.results = {}
         if found:
             for n in found:
                 self.results[n] = 1
         self.results = {}
         if found:
             for n in found:
                 self.results[n] = 1
-        for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
-            
+        self.nodes = nodes
+        self.nodes.sort(self.sort)
         self.schedule()
 
 
 class StoreValue(ActionBase):
         self.schedule()
 
 
 class StoreValue(ActionBase):
-    def __init__(self, caller, target, value, callback, config, store="storeValue"):
+    def __init__(self, caller, target, value, callback, config, action="storeValue"):
         ActionBase.__init__(self, caller, target, callback, config)
         self.value = value
         self.stored = []
         ActionBase.__init__(self, caller, target, callback, config)
         self.value = value
         self.stored = []
-        self.store = store
+        self.action = action
         
     def storedValue(self, t, node):
         self.outstanding -= 1
         
     def storedValue(self, t, node):
         self.outstanding -= 1
@@ -225,9 +265,9 @@ class StoreValue(ActionBase):
                 if not node.id == self.caller.node.id:
                     self.outstanding += 1
                     try:
                 if not node.id == self.caller.node.id:
                     self.outstanding += 1
                     try:
-                        f = getattr(node, self.store)
+                        f = getattr(node, self.action)
                     except AttributeError:
                     except AttributeError:
-                        log.msg("%s doesn't have a %s method!" % (node, self.store))
+                        log.msg("%s doesn't have a %s method!" % (node, self.action))
                     else:
                         df = f(self.target, self.value, node.token, self.caller.node.id)
                         df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
                     else:
                         df = f(self.target, self.value, node.token, self.caller.node.id)
                         df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
index f1a63a8100edfc3887745f6774e2ff76b5c60af6..7d40176a4ffb012fe0b64ec5ae663bda6350c3e7 100644 (file)
@@ -101,6 +101,16 @@ class DB:
             l.append(row[0])
         return l
 
             l.append(row[0])
         return l
 
+    def countValues(self, key):
+        """Count the number of values in the database."""
+        c = self.conn.cursor()
+        c.execute("SELECT COUNT(value) as num_values FROM kv WHERE key = ?", (khash(key),))
+        res = 0
+        row = c.fetchone()
+        if row:
+            res = row[0]
+        return res
+
     def storeValue(self, key, value):
         """Store or update a key and value."""
         c = self.conn.cursor()
     def storeValue(self, key, value):
         """Store or update a key and value."""
         c = self.conn.cursor()
index a9a78674a048c8e3d6e0f0e3392bcb285bd7bfce..6162a6e5e8ac88fe97bfa3ce9fdebe1f6d4b866a 100644 (file)
@@ -5,7 +5,7 @@ import warnings
 warnings.simplefilter("ignore", DeprecationWarning)
 
 from datetime import datetime, timedelta
 warnings.simplefilter("ignore", DeprecationWarning)
 
 from datetime import datetime, timedelta
-from random import randrange
+from random import randrange, shuffle
 from sha import sha
 import os
 
 from sha import sha
 import os
 
@@ -17,7 +17,7 @@ from db import DB
 from ktable import KTable
 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
 from khash import newID, newIDInRange
 from ktable import KTable
 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
 from khash import newID, newIDInRange
-from actions import FindNode, GetValue, StoreValue
+from actions import FindNode, FindValue, GetValue, StoreValue
 import krpc
 
 # this is the base class, has base functionality and find node, no key-value mappings
 import krpc
 
 # this is the base class, has base functionality and find node, no key-value mappings
@@ -224,13 +224,25 @@ class KhashmirRead(KhashmirBase):
     _Node = KNodeRead
 
     ## also async
     _Node = KNodeRead
 
     ## also async
+    def findValue(self, key, callback, errback=None):
+        """ returns the contact info for nodes that have values for the key, from the global table """
+        # get K nodes out of local table/cache
+        nodes = self.table.findNodes(key)
+        d = Deferred()
+        if errback:
+            d.addCallbacks(callback, errback)
+        else:
+            d.addCallback(callback)
+
+        # create our search state
+        state = FindValue(self, key, d.callback, self.config)
+        reactor.callLater(0, state.goWithNodes, nodes)
+
     def valueForKey(self, key, callback, searchlocal = 1):
         """ returns the values found for key in global table
             callback will be called with a list of values for each peer that returns unique values
             final callback will be an empty list - probably should change to 'more coming' arg
         """
     def valueForKey(self, key, callback, searchlocal = 1):
         """ returns the values found for key in global table
             callback will be called with a list of values for each peer that returns unique values
             final callback will be an empty list - probably should change to 'more coming' arg
         """
-        nodes = self.table.findNodes(key)
-        
         # get locals
         if searchlocal:
             l = self.store.retrieveValues(key)
         # get locals
         if searchlocal:
             l = self.store.retrieveValues(key)
@@ -238,23 +250,35 @@ class KhashmirRead(KhashmirBase):
                 reactor.callLater(0, callback, key, l)
         else:
             l = []
                 reactor.callLater(0, callback, key, l)
         else:
             l = []
-        
-        # create our search state
-        state = GetValue(self, key, callback, self.config)
-        reactor.callLater(0, state.goWithNodes, nodes, l)
+
+        def _getValueForKey(nodes, key=key, local_values=l, response=callback, table=self.table, config=self.config):
+            # create our search state
+            state = GetValue(table, key, 50 - len(local_values), response, config)
+            reactor.callLater(0, state.goWithNodes, nodes, local_values)
+            
+        # this call is asynch
+        self.findValue(key, _getValueForKey)
 
     #### Remote Interface - called by remote nodes
     def krpc_find_value(self, key, id, _krpc_sender):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
     
 
     #### Remote Interface - called by remote nodes
     def krpc_find_value(self, key, id, _krpc_sender):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
     
+        nodes = self.table.findNodes(key)
+        nodes = map(lambda node: node.contactInfo(), nodes)
+        num_values = self.store.countValues(key)
+        return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
+
+    def krpc_get_value(self, key, num, id, _krpc_sender):
+        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+        self.insertNode(n, contacted=0)
+    
         l = self.store.retrieveValues(key)
         l = self.store.retrieveValues(key)
-        if len(l) > 0:
+        if num == 0 or num >= len(l):
             return {'values' : l, "id": self.node.id}
         else:
             return {'values' : l, "id": self.node.id}
         else:
-            nodes = self.table.findNodes(key)
-            nodes = map(lambda node: node.contactInfo(), nodes)
-            return {'nodes' : nodes, "id": self.node.id}
+            shuffle(l)
+            return {'values' : l[:num], "id": self.node.id}
 
 ###  provides a generic write method, you probably don't want to deploy something that allows
 ###  arbitrary value storage
 
 ###  provides a generic write method, you probably don't want to deploy something that allows
 ###  arbitrary value storage
@@ -266,13 +290,13 @@ class KhashmirWrite(KhashmirRead):
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
-        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+        def _storeValueForKey(nodes, key=key, value=value, response=callback, table=self.table, config=self.config):
             if not response:
                 # default callback
                 def _storedValueHandler(key, value, sender):
                     pass
                 response=_storedValueHandler
             if not response:
                 # default callback
                 def _storedValueHandler(key, value, sender):
                     pass
                 response=_storedValueHandler
-            action = StoreValue(self.table, key, value, response, self.config)
+            action = StoreValue(table, key, value, response, config)
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # this call is asynch
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # this call is asynch
index 1ced1a0bb8250e3b154089f7208b845f8a35514c..795b8f0390c1989b8a2407e268ed606b984a587b 100644 (file)
@@ -48,6 +48,12 @@ class KNodeRead(KNodeBase):
         df.addCallback(self.checkSender)
         return df
 
         df.addCallback(self.checkSender)
         return df
 
+    def getValue(self, key, num, id):
+        df = self.conn.sendRequest('get_value', {"key" : key, "num": num, "id" : id})
+        df.addErrback(self.errBack)
+        df.addCallback(self.checkSender)
+        return df
+
 class KNodeWrite(KNodeRead):
     def storeValue(self, key, value, token, id):
         df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "token" : token, "id": id})
 class KNodeWrite(KNodeRead):
     def storeValue(self, key, value, token, id):
         df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "token" : token, "id": id})
index 3f04ef2724b8466e14fed017e8618e02878aa29a..6b3c433c607995008744494f9e69462a2c2bd414 100644 (file)
@@ -31,6 +31,7 @@ class Node:
         self.host = host
         self.port = int(port)
         self.token = ''
         self.host = host
         self.port = int(port)
         self.token = ''
+        self.num_values = 0
         self._contactInfo = None
     
     def updateLastSeen(self):
         self._contactInfo = None
     
     def updateLastSeen(self):
@@ -40,6 +41,9 @@ class Node:
     def updateToken(self, token):
         self.token = token
     
     def updateToken(self, token):
         self.token = token
     
+    def updateNumValues(self, num_values):
+        self.num_values = num_values
+    
     def msgFailed(self):
         self.fails = self.fails + 1
         return self.fails
     def msgFailed(self):
         self.fails = self.fails + 1
         return self.fails