X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_dht_Khashmir%2Fkhashmir.py;h=eeaab0acf638215dc2ec707d6c0d62daff48d04c;hb=928a2934ab9cfc7386269b756e1ce815059be3a9;hp=a9a78674a048c8e3d6e0f0e3392bcb285bd7bfce;hpb=1fb0d1714e1b0af6dc6abb8adc75a9e31f77fa29;p=quix0rs-apt-p2p.git diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index a9a7867..eeaab0a 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -5,7 +5,7 @@ import warnings warnings.simplefilter("ignore", DeprecationWarning) from datetime import datetime, timedelta -from random import randrange +from random import randrange, shuffle from sha import sha import os @@ -17,7 +17,7 @@ from db import DB from ktable import KTable from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID from khash import newID, newIDInRange -from actions import FindNode, GetValue, StoreValue +from actions import FindNode, FindValue, GetValue, StoreValue import krpc # this is the base class, has base functionality and find node, no key-value mappings @@ -224,13 +224,25 @@ class KhashmirRead(KhashmirBase): _Node = KNodeRead ## also async + def findValue(self, key, callback, errback=None): + """ returns the contact info for nodes that have values for the key, from the global table """ + # get K nodes out of local table/cache + nodes = self.table.findNodes(key) + d = Deferred() + if errback: + d.addCallbacks(callback, errback) + else: + d.addCallback(callback) + + # create our search state + state = FindValue(self, key, d.callback, self.config) + reactor.callLater(0, state.goWithNodes, nodes) + def valueForKey(self, key, callback, searchlocal = 1): """ returns the values found for key in global table callback will be called with a list of values for each peer that returns unique values final callback will be an empty list - probably should change to 'more coming' arg """ - nodes = self.table.findNodes(key) - # get locals if searchlocal: l = self.store.retrieveValues(key) @@ -238,23 +250,35 @@ class KhashmirRead(KhashmirBase): reactor.callLater(0, callback, key, l) else: l = [] - - # create our search state - state = GetValue(self, key, callback, self.config) - reactor.callLater(0, state.goWithNodes, nodes, l) + + def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self): + # create our search state + state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config) + reactor.callLater(0, state.goWithNodes, nodes) + + # this call is asynch + self.findValue(key, _getValueForKey) #### Remote Interface - called by remote nodes def krpc_find_value(self, key, id, _krpc_sender): n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) self.insertNode(n, contacted=0) + nodes = self.table.findNodes(key) + nodes = map(lambda node: node.contactInfo(), nodes) + num_values = self.store.countValues(key) + return {'nodes' : nodes, 'num' : num_values, "id": self.node.id} + + def krpc_get_value(self, key, num, id, _krpc_sender): + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted=0) + l = self.store.retrieveValues(key) - if len(l) > 0: + if num == 0 or num >= len(l): return {'values' : l, "id": self.node.id} else: - nodes = self.table.findNodes(key) - nodes = map(lambda node: node.contactInfo(), nodes) - return {'nodes' : nodes, "id": self.node.id} + shuffle(l) + return {'values' : l[:num], "id": self.node.id} ### provides a generic write method, you probably don't want to deploy something that allows ### arbitrary value storage @@ -266,13 +290,13 @@ class KhashmirWrite(KhashmirRead): in this implementation, peers respond but don't indicate status to storing values a key can have many values """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self): if not response: # default callback def _storedValueHandler(key, value, sender): pass response=_storedValueHandler - action = StoreValue(self.table, key, value, response, self.config) + action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config) reactor.callLater(0, action.goWithNodes, nodes) # this call is asynch @@ -298,7 +322,8 @@ class SimpleTests(unittest.TestCase): timeout = 10 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, - 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, + 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, + 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEY_EXPIRE': 3600, 'SPEW': False, } @@ -371,7 +396,8 @@ class MultiTest(unittest.TestCase): num = 20 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, - 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, + 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, + 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEY_EXPIRE': 3600, 'SPEW': False, }