X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_dht_Khashmir%2Fkhashmir.py;h=58956692a09c66fca6f71b4596ea666748bfb8c5;hb=d8b63cce3887dfd61f3a8321d0c45327c4a1808b;hp=af6f0d8817156d0eba8af78feb2c4e320af2346f;hpb=07893ed1ffd3cdcc1a8b51d0854599d61b2b5fac;p=quix0rs-apt-p2p.git diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index af6f0d8..5895669 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -5,7 +5,7 @@ import warnings warnings.simplefilter("ignore", DeprecationWarning) from datetime import datetime, timedelta -from random import randrange +from random import randrange, shuffle from sha import sha import os @@ -17,7 +17,7 @@ from db import DB from ktable import KTable from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID from khash import newID, newIDInRange -from actions import FindNode, GetValue, KeyExpirer, StoreValue +from actions import FindNode, FindValue, GetValue, StoreValue import krpc # this is the base class, has base functionality and find node, no key-value mappings @@ -30,15 +30,15 @@ class KhashmirBase(protocol.Factory): def setup(self, config, cache_dir): self.config = config self.port = config['PORT'] - self.store = DB(os.path.join(cache_dir, '.khashmir.' + str(self.port) + '.db')) + self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db')) self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) + self.token_secrets = [newID()] #self.app = service.Application("krpc") self.udp = krpc.hostbroker(self, config) self.udp.protocol = krpc.KRPC self.listenport = reactor.listenUDP(self.port, self.udp) self._loadRoutingTable() - self.expirer = KeyExpirer(self.store, config) self.refreshTable(force=1) self.next_checkpoint = reactor.callLater(60, self.checkpoint, (1,)) @@ -59,8 +59,12 @@ class KhashmirBase(protocol.Factory): return self._Node(id, host, port) def checkpoint(self, auto=0): + self.token_secrets.insert(0, newID()) + if len(self.token_secrets) > 3: + self.token_secrets.pop() self.store.saveSelfNode(self.node.id) self.store.dumpRoutingTable(self.table.buckets) + self.store.expireValues(self.config['KEY_EXPIRE']) self.refreshTable() if auto: self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), @@ -192,7 +196,6 @@ class KhashmirBase(protocol.Factory): self.next_checkpoint.cancel() except: pass - self.expirer.shutdown() self.store.close() #### Remote Interface - called by remote nodes @@ -210,8 +213,9 @@ class KhashmirBase(protocol.Factory): n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) self.insertNode(n, contacted=0) nodes = self.table.findNodes(target) - nodes = map(lambda node: node.senderDict(), nodes) - return {"nodes" : nodes, "id" : self.node.id} + nodes = map(lambda node: node.contactInfo(), nodes) + token = sha(self.token_secrets[0] + _krpc_sender[0]).digest() + return {"nodes" : nodes, "token" : token, "id" : self.node.id} ## This class provides read-only access to the DHT, valueForKey @@ -220,13 +224,25 @@ class KhashmirRead(KhashmirBase): _Node = KNodeRead ## also async + def findValue(self, key, callback, errback=None): + """ returns the contact info for nodes that have values for the key, from the global table """ + # get K nodes out of local table/cache + nodes = self.table.findNodes(key) + d = Deferred() + if errback: + d.addCallbacks(callback, errback) + else: + d.addCallback(callback) + + # create our search state + state = FindValue(self, key, d.callback, self.config) + reactor.callLater(0, state.goWithNodes, nodes) + def valueForKey(self, key, callback, searchlocal = 1): """ returns the values found for key in global table callback will be called with a list of values for each peer that returns unique values final callback will be an empty list - probably should change to 'more coming' arg """ - nodes = self.table.findNodes(key) - # get locals if searchlocal: l = self.store.retrieveValues(key) @@ -234,23 +250,35 @@ class KhashmirRead(KhashmirBase): reactor.callLater(0, callback, key, l) else: l = [] - - # create our search state - state = GetValue(self, key, callback, self.config) - reactor.callLater(0, state.goWithNodes, nodes, l) + + def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self): + # create our search state + state = GetValue(self, key, local_values, 50, response, self.config) + reactor.callLater(0, state.goWithNodes, nodes) + + # this call is asynch + self.findValue(key, _getValueForKey) #### Remote Interface - called by remote nodes def krpc_find_value(self, key, id, _krpc_sender): n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) self.insertNode(n, contacted=0) + nodes = self.table.findNodes(key) + nodes = map(lambda node: node.contactInfo(), nodes) + num_values = self.store.countValues(key) + return {'nodes' : nodes, 'num' : num_values, "id": self.node.id} + + def krpc_get_value(self, key, num, id, _krpc_sender): + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted=0) + l = self.store.retrieveValues(key) - if len(l) > 0: + if num == 0 or num >= len(l): return {'values' : l, "id": self.node.id} else: - nodes = self.table.findNodes(key) - nodes = map(lambda node: node.senderDict(), nodes) - return {'nodes' : nodes, "id": self.node.id} + shuffle(l) + return {'values' : l[:num], "id": self.node.id} ### provides a generic write method, you probably don't want to deploy something that allows ### arbitrary value storage @@ -258,28 +286,32 @@ class KhashmirWrite(KhashmirRead): _Node = KNodeWrite ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) def storeValueForKey(self, key, value, callback=None): - """ stores the value for key in the global table, returns immediately, no status + """ stores the value and origination time for key in the global table, returns immediately, no status in this implementation, peers respond but don't indicate status to storing values a key can have many values """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self): if not response: # default callback def _storedValueHandler(key, value, sender): pass response=_storedValueHandler - action = StoreValue(self.table, key, value, response, self.config) + action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config) reactor.callLater(0, action.goWithNodes, nodes) # this call is asynch self.findNode(key, _storeValueForKey) #### Remote Interface - called by remote nodes - def krpc_store_value(self, key, value, id, _krpc_sender): + def krpc_store_value(self, key, value, token, id, _krpc_sender): n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) self.insertNode(n, contacted=0) - self.store.storeValue(key, value) - return {"id" : self.node.id} + for secret in self.token_secrets: + this_token = sha(secret + _krpc_sender[0]).digest() + if token == this_token: + self.store.storeValue(key, value) + return {"id" : self.node.id} + raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one') # the whole shebang, for testing class Khashmir(KhashmirWrite): @@ -289,11 +321,10 @@ class SimpleTests(unittest.TestCase): timeout = 10 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, - 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, - 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, - 'KE_AGE': 3600, 'SPEW': False, } + 'KEY_EXPIRE': 3600, 'SPEW': False, } def setUp(self): krpc.KRPC.noisy = 0 @@ -363,11 +394,10 @@ class MultiTest(unittest.TestCase): timeout = 30 num = 20 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, - 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, - 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, - 'KE_AGE': 3600, 'SPEW': False, } + 'KEY_EXPIRE': 3600, 'SPEW': False, } def _done(self, val): self.done = 1