+Check packet lengths before sending get_value responses.
+
+The length of the created UDP packet needs to be checked before sending
+to make sure it is not so long that it will get fragmented. Such an
+oversized packet is only possible in the response to a get_value RPC
+request.
+
+
+Clean up the khashmir actions.
+
+The khashmir actions are a mess, and some cleanup is necessary. A lot
+of the actions have most of their processing in common, so the shared
+code should be moved into functions that all of them can call. Perhaps
+creating "RecurringAction" and "StaticAction" base classes would be a
+good idea, as then find_node and find_value could use the first, while
+get_value and store_value could use the second. Perhaps ping and join
+actions should also be created for consistency, and maybe inherit from
+a "SingleNodeAction" base class.
+
+
Packages.diff files need to be considered.
The Packages.diff/Index files contain hashes of Packages.diff/rred.gz
available to download from, which can then be used to assign new
downloads to peers. Pieces should be downloaded from the best peers
first (i.e. piece 0 from the absolute best peer).
-
-
-When looking up values, DHT should return nodes and values.
-
-When a key has multiple values in the DHT, returning a stored value may not
-be sufficient, as then no more nodes can be contacted to get more stored
-values. Instead, return both the stored values and the list of closest
-nodes so that the peer doing the lookup can decide when to stop looking
-(when it has received enough values).
-
-Instead of returning both, a new method could be added, "lookup_value".
-This method will be like "get_value", except that every node will always
-return a list of nodes, as well as the number of values it has for that
-key. Once a querying node has found enough values (or all of them), then
-it would send the "get_value" method to the nodes that have the most
-values. The "get_value" query could also have a new parameter "number",
-which is the maximum number of values to return.
def goWithNodes(self, t):
pass
-
-
-FIND_NODE_TIMEOUT = 15
class FindNode(ActionBase):
""" find node action merits it's own class as it is a long running stateful process """
self.schedule()
-get_value_timeout = 15
-class GetValue(FindNode):
- def __init__(self, caller, target, callback, config, find="findValue"):
- FindNode.__init__(self, caller, target, callback, config)
- self.findValue = find
-
- """ get value task """
+class FindValue(ActionBase):
def handleGotNodes(self, dict):
_krpc_sender = dict['_krpc_sender']
dict = dict['rsp']
n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
self.caller.insertNode(n)
+ if dict["id"] in self.found:
+ self.found[dict["id"]].updateNumValues(dict.get('num', 0))
+ l = dict["nodes"]
if self.finished or self.answered.has_key(dict["id"]):
# a day late and a dollar short
return
self.outstanding = self.outstanding - 1
self.answered[dict["id"]] = 1
- # go through nodes
- # if we have any closer than what we already got, query them
- if dict.has_key('nodes'):
- for compact_node in dict['nodes']:
- node = uncompact(compact_node)
- n = self.caller.Node(node)
- if not self.found.has_key(n.id):
- self.found[n.id] = n
- elif dict.has_key('values'):
+ for compact_node in l:
+ node = uncompact(compact_node)
+ n = self.caller.Node(node)
+ if not self.found.has_key(n.id):
+ self.found[n.id] = n
+ self.schedule()
+
+ def schedule(self):
+ """
+ send messages to new peers, if necessary
+ """
+ if self.finished:
+ return
+ l = self.found.values()
+ l.sort(self.sort)
+ for node in l[:self.config['K']]:
+ if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
+ df = node.findValue(self.target, self.caller.node.id)
+ df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
+ self.outstanding = self.outstanding + 1
+ self.queried[node.id] = 1
+ if self.outstanding >= self.config['CONCURRENT_REQS']:
+ break
+ assert self.outstanding >=0
+ if self.outstanding == 0:
+ ## all done!!
+ self.finished=1
+ l = [node for node in self.found.values() if node.num_values > 0]
+ reactor.callLater(0, self.callback, l)
+
+ def goWithNodes(self, nodes):
+ """
+        this starts the process, our argument is the list of nodes to start from
+        (our own node is skipped, every other node is added to the found set)
+ """
+ for node in nodes:
+ if node.id == self.caller.node.id:
+ continue
+ else:
+ self.found[node.id] = node
+
+ self.schedule()
+
+
+class GetValue(ActionBase):
+ def __init__(self, caller, target, num, callback, config, action="getValue"):
+ ActionBase.__init__(self, caller, target, callback, config)
+ self.num_values = num
+ self.outstanding_gets = 0
+ self.action = action
+
+ def gotValues(self, dict, node):
+ dict = dict['rsp']
+ self.outstanding -= 1
+ self.caller.insertNode(node)
+ if self.finished:
+ return
+ if dict.has_key('values'):
def x(y, z=self.results):
if not z.has_key(y):
z[y] = 1
return None
z = len(dict['values'])
v = filter(None, map(x, dict['values']))
- if(len(v)):
+ if len(v):
reactor.callLater(0, self.callback, self.target, v)
- self.schedule()
-
- ## get value
+ if len(self.results) >= self.num_values:
+ self.finished=1
+ reactor.callLater(0, self.callback, self.target, [])
+ else:
+ if not len(self.results) + self.outstanding_gets >= self.num_values:
+ self.schedule()
+
def schedule(self):
if self.finished:
return
- l = self.found.values()
- l.sort(self.sort)
- for node in l[:self.config['K']]:
- if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
- #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
+ for node in self.nodes:
+ if node.id not in self.queried and node.id != self.caller.node.id and node.num_values > 0:
try:
- f = getattr(node, self.findValue)
+ f = getattr(node, self.action)
except AttributeError:
- log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
+ log.msg("%s doesn't have a %s method!" % (node, self.action))
else:
- df = f(self.target, self.caller.node.id)
- df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
- self.outstanding = self.outstanding + 1
+ self.outstanding += 1
+ self.outstanding_gets += node.num_values
+ df = f(self.target, 0, self.caller.node.id)
+ df.addCallbacks(self.gotValues, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
self.queried[node.id] = 1
- if self.outstanding >= self.config['CONCURRENT_REQS']:
+ if len(self.results) + self.outstanding_gets >= self.num_values or \
+ self.outstanding >= self.config['CONCURRENT_REQS']:
break
assert self.outstanding >=0
if self.outstanding == 0:
- ## all done, didn't find it!!
- self.finished=1
+ self.finished = 1
reactor.callLater(0, self.callback, self.target, [])
-
- ## get value
- def goWithNodes(self, nodes, found=None):
+
+ def goWithNodes(self, nodes, found = None):
self.results = {}
if found:
for n in found:
self.results[n] = 1
- for node in nodes:
- if node.id == self.caller.node.id:
- continue
- else:
- self.found[node.id] = node
-
+ self.nodes = nodes
+ self.nodes.sort(self.sort)
self.schedule()
class StoreValue(ActionBase):
- def __init__(self, caller, target, value, callback, config, store="storeValue"):
+ def __init__(self, caller, target, value, callback, config, action="storeValue"):
ActionBase.__init__(self, caller, target, callback, config)
self.value = value
self.stored = []
- self.store = store
+ self.action = action
def storedValue(self, t, node):
self.outstanding -= 1
if not node.id == self.caller.node.id:
self.outstanding += 1
try:
- f = getattr(node, self.store)
+ f = getattr(node, self.action)
except AttributeError:
- log.msg("%s doesn't have a %s method!" % (node, self.store))
+ log.msg("%s doesn't have a %s method!" % (node, self.action))
else:
df = f(self.target, self.value, node.token, self.caller.node.id)
df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
l.append(row[0])
return l
+ def countValues(self, key):
+        """Count the number of values stored for a key in the database."""
+ c = self.conn.cursor()
+ c.execute("SELECT COUNT(value) as num_values FROM kv WHERE key = ?", (khash(key),))
+ res = 0
+ row = c.fetchone()
+ if row:
+ res = row[0]
+ return res
+
def storeValue(self, key, value):
"""Store or update a key and value."""
c = self.conn.cursor()
warnings.simplefilter("ignore", DeprecationWarning)
from datetime import datetime, timedelta
-from random import randrange
+from random import randrange, shuffle
from sha import sha
import os
from ktable import KTable
from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID
from khash import newID, newIDInRange
-from actions import FindNode, GetValue, StoreValue
+from actions import FindNode, FindValue, GetValue, StoreValue
import krpc
# this is the base class, has base functionality and find node, no key-value mappings
_Node = KNodeRead
## also async
+ def findValue(self, key, callback, errback=None):
+ """ returns the contact info for nodes that have values for the key, from the global table """
+ # get K nodes out of local table/cache
+ nodes = self.table.findNodes(key)
+ d = Deferred()
+ if errback:
+ d.addCallbacks(callback, errback)
+ else:
+ d.addCallback(callback)
+
+ # create our search state
+ state = FindValue(self, key, d.callback, self.config)
+ reactor.callLater(0, state.goWithNodes, nodes)
+
def valueForKey(self, key, callback, searchlocal = 1):
""" returns the values found for key in global table
callback will be called with a list of values for each peer that returns unique values
final callback will be an empty list - probably should change to 'more coming' arg
"""
- nodes = self.table.findNodes(key)
-
# get locals
if searchlocal:
l = self.store.retrieveValues(key)
reactor.callLater(0, callback, key, l)
else:
l = []
-
- # create our search state
- state = GetValue(self, key, callback, self.config)
- reactor.callLater(0, state.goWithNodes, nodes, l)
+
+ def _getValueForKey(nodes, key=key, local_values=l, response=callback, table=self.table, config=self.config):
+ # create our search state
+ state = GetValue(table, key, 50 - len(local_values), response, config)
+ reactor.callLater(0, state.goWithNodes, nodes, local_values)
+
+ # this call is asynch
+ self.findValue(key, _getValueForKey)
#### Remote Interface - called by remote nodes
def krpc_find_value(self, key, id, _krpc_sender):
n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
self.insertNode(n, contacted=0)
+ nodes = self.table.findNodes(key)
+ nodes = map(lambda node: node.contactInfo(), nodes)
+ num_values = self.store.countValues(key)
+ return {'nodes' : nodes, 'num' : num_values, "id": self.node.id}
+
+ def krpc_get_value(self, key, num, id, _krpc_sender):
+ n = self.Node(id, _krpc_sender[0], _krpc_sender[1])
+ self.insertNode(n, contacted=0)
+
l = self.store.retrieveValues(key)
- if len(l) > 0:
+ if num == 0 or num >= len(l):
return {'values' : l, "id": self.node.id}
else:
- nodes = self.table.findNodes(key)
- nodes = map(lambda node: node.contactInfo(), nodes)
- return {'nodes' : nodes, "id": self.node.id}
+ shuffle(l)
+ return {'values' : l[:num], "id": self.node.id}
### provides a generic write method, you probably don't want to deploy something that allows
### arbitrary value storage
in this implementation, peers respond but don't indicate status to storing values
a key can have many values
"""
- def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table):
+ def _storeValueForKey(nodes, key=key, value=value, response=callback, table=self.table, config=self.config):
if not response:
# default callback
def _storedValueHandler(key, value, sender):
pass
response=_storedValueHandler
- action = StoreValue(self.table, key, value, response, self.config)
+ action = StoreValue(table, key, value, response, config)
reactor.callLater(0, action.goWithNodes, nodes)
# this call is asynch
df.addCallback(self.checkSender)
return df
+ def getValue(self, key, num, id):
+ df = self.conn.sendRequest('get_value', {"key" : key, "num": num, "id" : id})
+ df.addErrback(self.errBack)
+ df.addCallback(self.checkSender)
+ return df
+
class KNodeWrite(KNodeRead):
def storeValue(self, key, value, token, id):
df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "token" : token, "id": id})
self.host = host
self.port = int(port)
self.token = ''
+ self.num_values = 0
self._contactInfo = None
def updateLastSeen(self):
def updateToken(self, token):
self.token = token
+ def updateNumValues(self, num_values):
+ self.num_values = num_values
+
def msgFailed(self):
self.fails = self.fails + 1
return self.fails