Break up the find_value into 2 parts (with get_value).
authorCameron Dale <camrdale@gmail.com>
Sat, 23 Feb 2008 02:44:13 +0000 (18:44 -0800)
committerCameron Dale <camrdale@gmail.com>
Sat, 23 Feb 2008 02:44:13 +0000 (18:44 -0800)
To better support multiple values per key, the old find_value request is
broken up into find_value, which doesn't stop when it finds a node with
values but just records the number of values the node has, and
get_value, which queries the found nodes that have values to retrieve
them.

TODO
apt_dht_Khashmir/actions.py
apt_dht_Khashmir/db.py
apt_dht_Khashmir/khashmir.py
apt_dht_Khashmir/knode.py
apt_dht_Khashmir/node.py

diff --git a/TODO b/TODO
index ce33a7763381df0317bbc3f5e5ed78db22f6a579..70b47d26264011a27ee592d8e1ff8a4dcf2bd93a 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,3 +1,22 @@
+Check packet lengths before sending get_value responses.
+
+The length of the created UDP packet needs to be checked before sending
+to make sure it is not so long that it will get fragmented. This is only
+possible for the get_value RPC request.
+
+
+Clean up the khashmir actions.
+
+The khashmir actions are a mess, and some cleanup is necessary. A lot
+of the actions have most of their processing in common, so this code
+should be put in functions that all can call. Perhaps creating a
+base "RecurringAction" and "StaticAction" would be a good idea,
+as then find_node and find_value could use the first, while get_value
+and store_value could use the second. Perhaps ping and join actions
+should also be created for consistency, and maybe inherit from a
+"SingleNodeAction" base class.
+
+
 Packages.diff files need to be considered.
 
 The Packages.diff/Index files contain hashes of Packages.diff/rred.gz 
@@ -76,20 +95,3 @@ These should be combined (multiplied) to provide a sort order for peers
 available to download from, which can then be used to assign new 
 downloads to peers. Pieces should be downloaded from the best peers 
 first (i.e. piece 0 from the absolute best peer).
-
-
-When looking up values, DHT should return nodes and values.
-
-When a key has multiple values in the DHT, returning a stored value may not
-be sufficient, as then no more nodes can be contacted to get more stored
-values. Instead, return both the stored values and the list of closest
-nodes so that the peer doing the lookup can decide when to stop looking
-(when it has received enough values).
-
-Instead of returning both, a new method could be added, "lookup_value".
-This method will be like "get_value", except that every node will always
-return a list of nodes, as well as the number of values it has for that
-key. Once a querying node has found enough values (or all of them), then
-it would send the "get_value" method to the nodes that have the most
-values. The "get_value" query could also have a new parameter "number",
-which is the maximum number of values to return.
index 05349d4147efe1df81e5c1ccfe35c95c599008a4..c94c78d7a76cc93647fa036fdef004911c33615d 100644 (file)
@@ -41,9 +41,6 @@ class ActionBase:
     def goWithNodes(self, t):
         pass
     
-    
-
-FIND_NODE_TIMEOUT = 15
 
 class FindNode(ActionBase):
     """ find node action merits it's own class as it is a long running stateful process """
@@ -107,32 +104,78 @@ class FindNode(ActionBase):
         self.schedule()
     
 
-get_value_timeout = 15
-class GetValue(FindNode):
-    def __init__(self, caller, target, callback, config, find="findValue"):
-        FindNode.__init__(self, caller, target, callback, config)
-        self.findValue = find
-            
-    """ get value task """
+class FindValue(ActionBase):
     def handleGotNodes(self, dict):
         _krpc_sender = dict['_krpc_sender']
         dict = dict['rsp']
         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
         self.caller.insertNode(n)
+        if dict["id"] in self.found:
+            self.found[dict["id"]].updateNumValues(dict.get('num', 0))
+        l = dict["nodes"]
         if self.finished or self.answered.has_key(dict["id"]):
             # a day late and a dollar short
             return
         self.outstanding = self.outstanding - 1
         self.answered[dict["id"]] = 1
-        # go through nodes
-        # if we have any closer than what we already got, query them
-        if dict.has_key('nodes'):
-            for compact_node in dict['nodes']:
-                node = uncompact(compact_node)
-                n = self.caller.Node(node)
-                if not self.found.has_key(n.id):
-                    self.found[n.id] = n
-        elif dict.has_key('values'):
+        for compact_node in l:
+            node = uncompact(compact_node)
+            n = self.caller.Node(node)
+            if not self.found.has_key(n.id):
+                self.found[n.id] = n
+        self.schedule()
+        
+    def schedule(self):
+        """
+            send messages to new peers, if necessary
+        """
+        if self.finished:
+            return
+        l = self.found.values()
+        l.sort(self.sort)
+        for node in l[:self.config['K']]:
+            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
+                df = node.findValue(self.target, self.caller.node.id)
+                df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
+                self.outstanding = self.outstanding + 1
+                self.queried[node.id] = 1
+            if self.outstanding >= self.config['CONCURRENT_REQS']:
+                break
+        assert self.outstanding >=0
+        if self.outstanding == 0:
+            ## all done!!
+            self.finished=1
+            l = [node for node in self.found.values() if node.num_values > 0]
+            reactor.callLater(0, self.callback, l)
+    
+    def goWithNodes(self, nodes):
+        """
+            this starts the process, our argument is a transaction with t.extras being our list of nodes
+            it's a transaction since we got called from the dispatcher
+        """
+        for node in nodes:
+            if node.id == self.caller.node.id:
+                continue
+            else:
+                self.found[node.id] = node
+        
+        self.schedule()
+
+
+class GetValue(ActionBase):
+    def __init__(self, caller, target, num, callback, config, action="getValue"):
+        ActionBase.__init__(self, caller, target, callback, config)
+        self.num_values = num
+        self.outstanding_gets = 0
+        self.action = action
+        
+    def gotValues(self, dict, node):
+        dict = dict['rsp']
+        self.outstanding -= 1
+        self.caller.insertNode(node)
+        if self.finished:
+            return
+        if dict.has_key('values'):
             def x(y, z=self.results):
                 if not z.has_key(y):
                     z[y] = 1
@@ -141,58 +184,55 @@ class GetValue(FindNode):
                     return None
             z = len(dict['values'])
             v = filter(None, map(x, dict['values']))
-            if(len(v)):
+            if len(v):
                 reactor.callLater(0, self.callback, self.target, v)
-        self.schedule()
-        
-    ## get value
+        if len(self.results) >= self.num_values:
+            self.finished=1
+            reactor.callLater(0, self.callback, self.target, [])
+        else:
+            if not len(self.results) + self.outstanding_gets >= self.num_values:
+                self.schedule()
+    
     def schedule(self):
         if self.finished:
             return
-        l = self.found.values()
-        l.sort(self.sort)
         
-        for node in l[:self.config['K']]:
-            if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
-                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
+        for node in self.nodes:
+            if node.id not in self.queried and node.id != self.caller.node.id and node.num_values > 0:
                 try:
-                    f = getattr(node, self.findValue)
+                    f = getattr(node, self.action)
                 except AttributeError:
-                    log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
+                    log.msg("%s doesn't have a %s method!" % (node, self.action))
                 else:
-                    df = f(self.target, self.caller.node.id)
-                    df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
-                    self.outstanding = self.outstanding + 1
+                    self.outstanding += 1
+                    self.outstanding_gets += node.num_values
+                    df = f(self.target, 0, self.caller.node.id)
+                    df.addCallbacks(self.gotValues, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
                     self.queried[node.id] = 1
-            if self.outstanding >= self.config['CONCURRENT_REQS']:
+            if len(self.results) + self.outstanding_gets >= self.num_values or \
+                self.outstanding >= self.config['CONCURRENT_REQS']:
                 break
         assert self.outstanding >=0
         if self.outstanding == 0:
-            ## all done, didn't find it!!
-            self.finished=1
+            self.finished = 1
             reactor.callLater(0, self.callback, self.target, [])
-
-    ## get value
-    def goWithNodes(self, nodes, found=None):
+                    
+    def goWithNodes(self, nodes, found = None):
         self.results = {}
         if found:
             for n in found:
                 self.results[n] = 1
-        for node in nodes:
-            if node.id == self.caller.node.id:
-                continue
-            else:
-                self.found[node.id] = node
-            
+        self.nodes = nodes
+        self.nodes.sort(self.sort)
         self.schedule()
 
 
 class StoreValue(ActionBase):
-    def __init__(self, caller, target, value, callback, config, store="storeValue"):
+    def __init__(self, caller, target, value, callback, config, action="storeValue"):
         ActionBase.__init__(self, caller, target, callback, config)
         self.value = value
         self.stored = []
-        self.store = store
+        self.action = action
         
     def storedValue(self, t, node):
         self.outstanding -= 1
@@ -225,9 +265,9 @@ class StoreValue(ActionBase):
                 if not node.id == self.caller.node.id:
                     self.outstanding += 1
                     try:
-                        f = getattr(node, self.store)
+                        f = getattr(node, self.action)
                     except AttributeError:
-                        log.msg("%s doesn't have a %s method!" % (node, self.store))
+                        log.msg("%s doesn't have a %s method!" % (node, self.action))
                     else:
                         df = f(self.target, self.value, node.token, self.caller.node.id)
                         df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
index f1a63a8100edfc3887745f6774e2ff76b5c60af6..7d40176a4ffb012fe0b64ec5ae663bda6350c3e7 100644 (file)
@@ -101,6 +101,16 @@ class DB:
             l.append(row[0])
         return l
 
+    def countValues(self, key):
+        """Count the number of values in the database."""
+        c = self.conn.cursor()
+        c.execute("SELECT COUNT(value) as num_values FROM kv WHERE key = ?", (khash(key),))
+        res = 0
+        row = c.fetchone()
+        if row:
+            res = row[0]
+        return res
+
     def storeValue(self, key, value):
         """Store or update a key and value."""
         c = self.conn.cursor()
index a9a78674a048c8e3d6e0f0e3392bcb285bd7bfce..6162a6e5e8ac88fe97bfa3ce9fdebe1f6d4b866a 100644 (file)
@@ -5,7 +5,7 @@ import warnings
 warnings.simplefilter("ignore", DeprecationWarning)
 
 from datetime import datetime, timedelta
-from random import randrange
+from random import randrange, shuffle
 from sha import sha
 import os
 
@@ -17,7 +17,7 @@ from db import DB
 from ktable import KTable
 from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
 from khash import newID, newIDInRange
-from actions import FindNode, GetValue, StoreValue
+from actions import FindNode, FindValue, GetValue, StoreValue
 import krpc
 
 # this is the base class, has base functionality and find node, no key-value mappings
@@ -224,13 +224,25 @@ class KhashmirRead(KhashmirBase):
     _Node = KNodeRead
 
     ## also async
+    def findValue(self, key, callback, errback=None):
+        """ returns the contact info for nodes that have values for the key, from the global table """
+        # get K nodes out of local table/cache
+        nodes = self.table.findNodes(key)
+        d = Deferred()
+        if errback:
+            d.addCallbacks(callback, errback)
+        else:
+            d.addCallback(callback)
+
+        # create our search state
+        state = FindValue(self, key, d.callback, self.config)
+        reactor.callLater(0, state.goWithNodes, nodes)
+
     def valueForKey(self, key, callback, searchlocal = 1):
         """ returns the values found for key in global table
             callback will be called with a list of values for each peer that returns unique values
             final callback will be an empty list - probably should change to 'more coming' arg
         """
-        nodes = self.table.findNodes(key)
-        
         # get locals
         if searchlocal:
             l = self.store.retrieveValues(key)
@@ -238,23 +250,35 @@ class KhashmirRead(KhashmirBase):
                 reactor.callLater(0, callback, key, l)
         else:
             l = []
-        
-        # create our search state
-        state = GetValue(self, key, callback, self.config)
-        reactor.callLater(0, state.goWithNodes, nodes, l)
+
+        def _getValueForKey(nodes, key=key, local_values=l, response=callback, table=self.table, config=self.config):
+            # create our search state
+            state = GetValue(table, key, 50 - len(local_values), response, config)
+            reactor.callLater(0, state.goWithNodes, nodes, local_values)
+            
+        # this call is asynch
+        self.findValue(key, _getValueForKey)
 
     #### Remote Interface - called by remote nodes
     def krpc_find_value(self, key, id, _krpc_sender):
         n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
         self.insertNode(n, contacted=0)
     
+        nodes = self.table.findNodes(key)
+        nodes = map(lambda node: node.contactInfo(), nodes)
+        num_values = self.store.countValues(key)
+        return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
+
+    def krpc_get_value(self, key, num, id, _krpc_sender):
+        n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+        self.insertNode(n, contacted=0)
+    
         l = self.store.retrieveValues(key)
-        if len(l) > 0:
+        if num == 0 or num >= len(l):
             return {'values' : l, "id": self.node.id}
         else:
-            nodes = self.table.findNodes(key)
-            nodes = map(lambda node: node.contactInfo(), nodes)
-            return {'nodes' : nodes, "id": self.node.id}
+            shuffle(l)
+            return {'values' : l[:num], "id": self.node.id}
 
 ###  provides a generic write method, you probably don't want to deploy something that allows
 ###  arbitrary value storage
@@ -266,13 +290,13 @@ class KhashmirWrite(KhashmirRead):
             in this implementation, peers respond but don't indicate status to storing values
             a key can have many values
         """
-        def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+        def _storeValueForKey(nodes, key=key, value=value, response=callback, table=self.table, config=self.config):
             if not response:
                 # default callback
                 def _storedValueHandler(key, value, sender):
                     pass
                 response=_storedValueHandler
-            action = StoreValue(self.table, key, value, response, self.config)
+            action = StoreValue(table, key, value, response, config)
             reactor.callLater(0, action.goWithNodes, nodes)
             
         # this call is asynch
index 1ced1a0bb8250e3b154089f7208b845f8a35514c..795b8f0390c1989b8a2407e268ed606b984a587b 100644 (file)
@@ -48,6 +48,12 @@ class KNodeRead(KNodeBase):
         df.addCallback(self.checkSender)
         return df
 
+    def getValue(self, key, num, id):
+        df = self.conn.sendRequest('get_value', {"key" : key, "num": num, "id" : id})
+        df.addErrback(self.errBack)
+        df.addCallback(self.checkSender)
+        return df
+
 class KNodeWrite(KNodeRead):
     def storeValue(self, key, value, token, id):
         df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "token" : token, "id": id})
index 3f04ef2724b8466e14fed017e8618e02878aa29a..6b3c433c607995008744494f9e69462a2c2bd414 100644 (file)
@@ -31,6 +31,7 @@ class Node:
         self.host = host
         self.port = int(port)
         self.token = ''
+        self.num_values = 0
         self._contactInfo = None
     
     def updateLastSeen(self):
@@ -40,6 +41,9 @@ class Node:
     def updateToken(self, token):
         self.token = token
     
+    def updateNumValues(self, num_values):
+        self.num_values = num_values
+    
     def msgFailed(self):
         self.fails = self.fails + 1
         return self.fails