X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_dht_Khashmir%2Fkhashmir.py;h=d3479e661e4f610723ff9a1cc825b19078b9e153;hp=ae11dd7ef07158642637df47810aebb6a8c8ee6e;hb=63c013ac1c397bfc19cbed986a09113efada0eeb;hpb=8297bc9a2aa8132ea1a7363761d9d5c73a1efca2 diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index ae11dd7..d3479e6 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -33,6 +33,7 @@ class KhashmirBase(protocol.Factory): self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db')) self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) + self.token_secrets = [newID()] #self.app = service.Application("krpc") self.udp = krpc.hostbroker(self, config) self.udp.protocol = krpc.KRPC @@ -59,6 +60,9 @@ class KhashmirBase(protocol.Factory): return self._Node(id, host, port) def checkpoint(self, auto=0): + self.token_secrets.insert(0, newID()) + if len(self.token_secrets) > 3: + self.token_secrets.pop() self.store.saveSelfNode(self.node.id) self.store.dumpRoutingTable(self.table.buckets) self.refreshTable() @@ -210,8 +214,9 @@ class KhashmirBase(protocol.Factory): n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) self.insertNode(n, contacted=0) nodes = self.table.findNodes(target) - nodes = map(lambda node: node.senderDict(), nodes) - return {"nodes" : nodes, "id" : self.node.id} + nodes = map(lambda node: node.contactInfo(), nodes) + token = sha(self.token_secrets[0] + _krpc_sender[0]).digest() + return {"nodes" : nodes, "token" : token, "id" : self.node.id} ## This class provides read-only access to the DHT, valueForKey @@ -249,7 +254,7 @@ class KhashmirRead(KhashmirBase): return {'values' : l, "id": self.node.id} else: nodes = self.table.findNodes(key) - nodes = map(lambda node: node.senderDict(), nodes) + nodes = map(lambda node: node.contactInfo(), nodes) return {'nodes' : nodes, "id": self.node.id} ### provides a generic write method, you probably don't want to deploy something that allows @@ -257,29 +262,33 @@ class KhashmirRead(KhashmirBase): class KhashmirWrite(KhashmirRead): _Node = KNodeWrite ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) - def storeValueForKey(self, key, value, originated, callback=None): + def storeValueForKey(self, key, value, callback=None): """ stores the value and origination time for key in the global table, returns immediately, no status in this implementation, peers respond but don't indicate status to storing values a key can have many values """ - def _storeValueForKey(nodes, key=key, value=value, originated=originated, response=callback , table=self.table): + def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): if not response: # default callback def _storedValueHandler(key, value, sender): pass response=_storedValueHandler - action = StoreValue(self.table, key, value, originated, response, self.config) + action = StoreValue(self.table, key, value, response, self.config) reactor.callLater(0, action.goWithNodes, nodes) # this call is asynch self.findNode(key, _storeValueForKey) #### Remote Interface - called by remote nodes - def krpc_store_value(self, key, value, originated, id, _krpc_sender): + def krpc_store_value(self, key, value, token, id, _krpc_sender): n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) self.insertNode(n, contacted=0) - self.store.storeValue(key, value, originated) - return {"id" : self.node.id} + for secret in self.token_secrets: + this_token = sha(secret + _krpc_sender[0]).digest() + if token == this_token: + self.store.storeValue(key, value) + return {"id" : self.node.id} + raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one') # the whole shebang, for testing class Khashmir(KhashmirWrite): @@ -289,7 +298,7 @@ class SimpleTests(unittest.TestCase): timeout = 10 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, - 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, @@ -335,7 +344,7 @@ class SimpleTests(unittest.TestCase): reactor.iterate() reactor.iterate() self.got = 0 - self.a.storeValueForKey(sha('foo').digest(), 'foobar', datetime.utcnow()) + self.a.storeValueForKey(sha('foo').digest(), 'foobar') reactor.iterate() reactor.iterate() reactor.iterate() @@ -363,7 +372,7 @@ class MultiTest(unittest.TestCase): timeout = 30 num = 20 DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, - 'CHECKPOINT_INTERVAL': 900, 'CONCURRENT_REQS': 4, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, 'KEINITIAL_DELAY': 15, 'KE_DELAY': 1200, @@ -417,7 +426,7 @@ class MultiTest(unittest.TestCase): self.done = 0 def _scb(key, value, result): self.done = 1 - self.l[randrange(0, self.num)].storeValueForKey(K, V, datetime.utcnow(), _scb) + self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb) while not self.done: reactor.iterate()