From: Cameron Dale Date: Sat, 23 Feb 2008 02:44:13 +0000 (-0800) Subject: Break up the find_value into 2 parts (with get_value). X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=commitdiff_plain;h=92bb708a6485b52682fafc1aedc302e9a48cdaec Break up the find_value into 2 parts (with get_value). To better support multiple values per key, the old find_value request is broken up into find_value, which doesn't stop when it finds a node with values but just records the number of values the node has, and get_value, which queries the found nodes that have values to retrieve them. --- diff --git a/TODO b/TODO index ce33a77..70b47d2 100644 --- a/TODO +++ b/TODO @@ -1,3 +1,22 @@ +Check packet lengths before sending get_value responses. + +The length of the created UDP packet needs to be checked before sending +to make sure it is not so long that it will get fragmented. This is only +possible for the get_value RPC request. + + +Clean up the khashmir actions. + +The khashmir actions are a mess, and some cleanup is necessary. A lot +of the actions have most of their processing in common, so this code +should be put in functions that all can call. Perhaps creating a +base "RecurringAction" and "StaticAction" would be a good idea, +as then find_node and find_value could use the first, while get_value +and store_value could use the second. Perhaps ping and join actions +should also be created for consistency, and maybe inherit from a +"SingleNodeAction" base class. + + Packages.diff files need to be considered. The Packages.diff/Index files contain hashes of Packages.diff/rred.gz @@ -76,20 +95,3 @@ These should be combined (multiplied) to provide a sort order for peers available to download from, which can then be used to assign new downloads to peers. Pieces should be downloaded from the best peers first (i.e. piece 0 from the absolute best peer). - - -When looking up values, DHT should return nodes and values. - -When a key has multiple values in the DHT, returning a stored value may not -be sufficient, as then no more nodes can be contacted to get more stored -values. Instead, return both the stored values and the list of closest -nodes so that the peer doing the lookup can decide when to stop looking -(when it has received enough values). - -Instead of returning both, a new method could be added, "lookup_value". -This method will be like "get_value", except that every node will always -return a list of nodes, as well as the number of values it has for that -key. Once a querying node has found enough values (or all of them), then -it would send the "get_value" method to the nodes that have the most -values. The "get_value" query could also have a new parameter "number", -which is the maximum number of values to return. diff --git a/apt_dht_Khashmir/actions.py b/apt_dht_Khashmir/actions.py index 05349d4..c94c78d 100644 --- a/apt_dht_Khashmir/actions.py +++ b/apt_dht_Khashmir/actions.py @@ -41,9 +41,6 @@ class ActionBase: def goWithNodes(self, t): pass - - -FIND_NODE_TIMEOUT = 15 class FindNode(ActionBase): """ find node action merits it's own class as it is a long running stateful process """ @@ -107,32 +104,78 @@ class FindNode(ActionBase): self.schedule() -get_value_timeout = 15 -class GetValue(FindNode): - def __init__(self, caller, target, callback, config, find="findValue"): - FindNode.__init__(self, caller, target, callback, config) - self.findValue = find - - """ get value task """ +class FindValue(ActionBase): def handleGotNodes(self, dict): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1]) self.caller.insertNode(n) + if dict["id"] in self.found: + self.found[dict["id"]].updateNumValues(dict.get('num', 0)) + l = dict["nodes"] if self.finished or self.answered.has_key(dict["id"]): # a day late and a dollar short return self.outstanding = self.outstanding - 1 self.answered[dict["id"]] = 1 - # go through nodes - # if we have any closer than what we already got, query them - if dict.has_key('nodes'): - for compact_node in dict['nodes']: - node = uncompact(compact_node) - n = self.caller.Node(node) - if not self.found.has_key(n.id): - self.found[n.id] = n - elif dict.has_key('values'): + for compact_node in l: + node = uncompact(compact_node) + n = self.caller.Node(node) + if not self.found.has_key(n.id): + self.found[n.id] = n + self.schedule() + + def schedule(self): + """ + send messages to new peers, if necessary + """ + if self.finished: + return + l = self.found.values() + l.sort(self.sort) + for node in l[:self.config['K']]: + if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: + df = node.findValue(self.target, self.caller.node.id) + df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, )) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding >= self.config['CONCURRENT_REQS']: + break + assert self.outstanding >=0 + if self.outstanding == 0: + ## all done!! + self.finished=1 + l = [node for node in self.found.values() if node.num_values > 0] + reactor.callLater(0, self.callback, l) + + def goWithNodes(self, nodes): + """ + this starts the process, our argument is a transaction with t.extras being our list of nodes + it's a transaction since we got called from the dispatcher + """ + for node in nodes: + if node.id == self.caller.node.id: + continue + else: + self.found[node.id] = node + + self.schedule() + + +class GetValue(ActionBase): + def __init__(self, caller, target, num, callback, config, action="getValue"): + ActionBase.__init__(self, caller, target, callback, config) + self.num_values = num + self.outstanding_gets = 0 + self.action = action + + def gotValues(self, dict, node): + dict = dict['rsp'] + self.outstanding -= 1 + self.caller.insertNode(node) + if self.finished: + return + if dict.has_key('values'): def x(y, z=self.results): if not z.has_key(y): z[y] = 1 @@ -141,58 +184,55 @@ class GetValue(FindNode): return None z = len(dict['values']) v = filter(None, map(x, dict['values'])) - if(len(v)): + if len(v): reactor.callLater(0, self.callback, self.target, v) - self.schedule() - - ## get value + if len(self.results) >= self.num_values: + self.finished=1 + reactor.callLater(0, self.callback, self.target, []) + else: + if not len(self.results) + self.outstanding_gets >= self.num_values: + self.schedule() + def schedule(self): if self.finished: return - l = self.found.values() - l.sort(self.sort) - for node in l[:self.config['K']]: - if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id: - #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT + for node in self.nodes: + if node.id not in self.queried and node.id != self.caller.node.id and node.num_values > 0: try: - f = getattr(node, self.findValue) + f = getattr(node, self.action) except AttributeError: - log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue)) + log.msg("%s doesn't have a %s method!" % (node, self.action)) else: - df = f(self.target, self.caller.node.id) - df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, )) - self.outstanding = self.outstanding + 1 + self.outstanding += 1 + self.outstanding_gets += node.num_values + df = f(self.target, 0, self.caller.node.id) + df.addCallbacks(self.gotValues, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, )) self.queried[node.id] = 1 - if self.outstanding >= self.config['CONCURRENT_REQS']: + if len(self.results) + self.outstanding_gets >= self.num_values or \ + self.outstanding >= self.config['CONCURRENT_REQS']: break assert self.outstanding >=0 if self.outstanding == 0: - ## all done, didn't find it!! - self.finished=1 + self.finished = 1 reactor.callLater(0, self.callback, self.target, []) - - ## get value - def goWithNodes(self, nodes, found=None): + + def goWithNodes(self, nodes, found = None): self.results = {} if found: for n in found: self.results[n] = 1 - for node in nodes: - if node.id == self.caller.node.id: - continue - else: - self.found[node.id] = node - + self.nodes = nodes + self.nodes.sort(self.sort) self.schedule() class StoreValue(ActionBase): - def __init__(self, caller, target, value, callback, config, store="storeValue"): + def __init__(self, caller, target, value, callback, config, action="storeValue"): ActionBase.__init__(self, caller, target, callback, config) self.value = value self.stored = [] - self.store = store + self.action = action def storedValue(self, t, node): self.outstanding -= 1 @@ -225,9 +265,9 @@ class StoreValue(ActionBase): if not node.id == self.caller.node.id: self.outstanding += 1 try: - f = getattr(node, self.store) + f = getattr(node, self.action) except AttributeError: - log.msg("%s doesn't have a %s method!" % (node, self.store)) + log.msg("%s doesn't have a %s method!" % (node, self.action)) else: df = f(self.target, self.value, node.token, self.caller.node.id) df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, )) diff --git a/apt_dht_Khashmir/db.py b/apt_dht_Khashmir/db.py index f1a63a8..7d40176 100644 --- a/apt_dht_Khashmir/db.py +++ b/apt_dht_Khashmir/db.py @@ -101,6 +101,16 @@ class DB: l.append(row[0]) return l + def countValues(self, key): + """Count the number of values in the database.""" + c = self.conn.cursor() + c.execute("SELECT COUNT(value) as num_values FROM kv WHERE key = ?", (khash(key),)) + res = 0 + row = c.fetchone() + if row: + res = row[0] + return res + def storeValue(self, key, value): """Store or update a key and value.""" c = self.conn.cursor() diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index a9a7867..6162a6e 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -5,7 +5,7 @@ import warnings warnings.simplefilter("ignore", DeprecationWarning) from datetime import datetime, timedelta -from random import randrange +from random import randrange, shuffle from sha import sha import os @@ -17,7 +17,7 @@ from db import DB from ktable import KTable from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID from khash import newID, newIDInRange -from actions import FindNode, GetValue, StoreValue +from actions import FindNode, FindValue, GetValue, StoreValue import krpc # this is the base class, has base functionality and find node, no key-value mappings @@ -224,13 +224,25 @@ class KhashmirRead(KhashmirBase): _Node = KNodeRead ## also async + def findValue(self, key, callback, errback=None): + """ returns the contact info for nodes that have values for the key, from the global table """ + # get K nodes out of local table/cache + nodes = self.table.findNodes(key) + d = Deferred() + if errback: + d.addCallbacks(callback, errback) + else: + d.addCallback(callback) + + # create our search state + state = FindValue(self, key, d.callback, self.config) + reactor.callLater(0, state.goWithNodes, nodes) + def valueForKey(self, key, callback, searchlocal = 1): """ returns the values found for key in global table callback will be called with a list of values for each peer that returns unique values final callback will be an empty list - probably should change to 'more coming' arg """ - nodes = self.table.findNodes(key) - # get locals if searchlocal: l = self.store.retrieveValues(key) @@ -238,23 +250,35 @@ class KhashmirRead(KhashmirBase): reactor.callLater(0, callback, key, l) else: l = [] - - # create our search state - state = GetValue(self, key, callback, self.config) - reactor.callLater(0, state.goWithNodes, nodes, l) + + def _getValueForKey(nodes, key=key, local_values=l, response=callback, table=self.table, config=self.config): + # create our search state + state = GetValue(table, key, 50 - len(local_values), response, config) + reactor.callLater(0, state.goWithNodes, nodes, local_values) + + # this call is asynch + self.findValue(key, _getValueForKey) #### Remote Interface - called by remote nodes def krpc_find_value(self, key, id, _krpc_sender): n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) self.insertNode(n, contacted=0) + nodes = self.table.findNodes(key) + nodes = map(lambda node: node.contactInfo(), nodes) + num_values = self.store.countValues(key) + return {'nodes' : nodes, 'num' : num_values, "id": self.node.id} + + def krpc_get_value(self, key, num, id, _krpc_sender): + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted=0) + l = self.store.retrieveValues(key) - if len(l) > 0: + if num == 0 or num >= len(l): return {'values' : l, "id": self.node.id} else: - nodes = self.table.findNodes(key) - nodes = map(lambda node: node.contactInfo(), nodes) - return {'nodes' : nodes, "id": self.node.id} + shuffle(l) + return {'values' : l[:num], "id": self.node.id} ### provides a generic write method, you probably don't want to deploy something that allows ### arbitrary value storage @@ -266,13 +290,13 @@ class KhashmirWrite(KhashmirRead): in this implementation, peers respond but don't indicate status to storing values a key can have many values """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + def _storeValueForKey(nodes, key=key, value=value, response=callback, table=self.table, config=self.config): if not response: # default callback def _storedValueHandler(key, value, sender): pass response=_storedValueHandler - action = StoreValue(self.table, key, value, response, self.config) + action = StoreValue(table, key, value, response, config) reactor.callLater(0, action.goWithNodes, nodes) # this call is asynch diff --git a/apt_dht_Khashmir/knode.py b/apt_dht_Khashmir/knode.py index 1ced1a0..795b8f0 100644 --- a/apt_dht_Khashmir/knode.py +++ b/apt_dht_Khashmir/knode.py @@ -48,6 +48,12 @@ class KNodeRead(KNodeBase): df.addCallback(self.checkSender) return df + def getValue(self, key, num, id): + df = self.conn.sendRequest('get_value', {"key" : key, "num": num, "id" : id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df + class KNodeWrite(KNodeRead): def storeValue(self, key, value, token, id): df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "token" : token, "id": id}) diff --git a/apt_dht_Khashmir/node.py b/apt_dht_Khashmir/node.py index 3f04ef2..6b3c433 100644 --- a/apt_dht_Khashmir/node.py +++ b/apt_dht_Khashmir/node.py @@ -31,6 +31,7 @@ class Node: self.host = host self.port = int(port) self.token = '' + self.num_values = 0 self._contactInfo = None def updateLastSeen(self): @@ -40,6 +41,9 @@ class Node: def updateToken(self, token): self.token = token + def updateNumValues(self, num_values): + self.num_values = num_values + def msgFailed(self): self.fails = self.fails + 1 return self.fails