X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_p2p_Khashmir%2Fkhashmir.py;h=7abed574dee3b59d78115a94f76c3bd9a1d24821;hp=f8c718fc3e549c6bb2b94c71fc3dbfaaefca7afd;hb=dbcf7d0324c5ddee23bf170695e173fdff5a2c0e;hpb=23f841b6360b9968d5dad2ef94f4311670375796 diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index f8c718f..7abed57 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -9,6 +9,7 @@ warnings.simplefilter("ignore", DeprecationWarning) from datetime import datetime, timedelta from random import randrange, shuffle from sha import sha +from copy import copy import os from twisted.internet.defer import Deferred @@ -79,7 +80,7 @@ class KhashmirBase(protocol.Factory): self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) self.token_secrets = [newID()] - self.stats = StatsLogger(self.table, self.store, self.config) + self.stats = StatsLogger(self.table, self.store) # Start listening self.udp = krpc.hostbroker(self, self.stats, config) @@ -161,7 +162,7 @@ class KhashmirBase(protocol.Factory): n = self.Node(NULL_ID, host, port) self.sendJoin(n, callback=callback, errback=errback) - def findNode(self, id, callback, errback=None): + def findNode(self, id, callback): """Find the contact info for the K closest nodes in the global table. @type id: C{string} @@ -169,25 +170,13 @@ class KhashmirBase(protocol.Factory): @type callback: C{method} @param callback: the method to call with the results, it must take 1 parameter, the list of K closest nodes - @type errback: C{method} - @param errback: the method to call if an error occurs - (optional, defaults to doing nothing when an error occurs) """ - # Get K nodes out of local table/cache - nodes = self.table.findNodes(id) - d = Deferred() - if errback: - d.addCallbacks(callback, errback) - else: - d.addCallback(callback) + # Start with our node + nodes = [copy(self.node)] - # If the target ID was found - if len(nodes) == 1 and nodes[0].id == id: - d.callback(nodes) - else: - # Start the finding nodes action - state = FindNode(self, id, d.callback, self.config, self.stats) - reactor.callLater(0, state.goWithNodes, nodes) + # Start the finding nodes action + state = FindNode(self, id, callback, self.config, self.stats) + reactor.callLater(0, state.goWithNodes, nodes) def insertNode(self, node, contacted = True): """Try to insert a node in our local table, pinging oldest contact if necessary. @@ -209,15 +198,16 @@ class KhashmirBase(protocol.Factory): (datetime.now() - old.lastSeen) > timedelta(seconds=self.config['MIN_PING_INTERVAL'])): - def _staleNodeHandler(err, oldnode = old, newnode = node, self = self): + def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()): """The pinged node never responded, so replace it.""" log.msg("ping failed (%s) %s/%s" % (self.config['PORT'], oldnode.host, oldnode.port)) log.err(err) + self.stats.completedAction('ping', start) self.table.replaceStaleNode(oldnode, newnode) - def _notStaleNodeHandler(dict, old=old, self=self): + def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()): """Got a pong from the old node, so update it.""" - dict = dict['rsp'] + self.stats.completedAction('ping', start) if dict['id'] == old.id: self.table.justSeenNode(old.id) @@ -240,17 +230,19 @@ class KhashmirBase(protocol.Factory): (optional, defaults to calling the callback with None) """ - def _pongHandler(dict, node=node, self=self, callback=callback): + def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()): """Node responded properly, callback with response.""" - n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + self.stats.completedAction('join', start) self.insertNode(n) if callback: - callback((dict['rsp']['ip_addr'], dict['rsp']['port'])) + callback((dict['ip_addr'], dict['port'])) - def _defaultPong(err, node=node, self=self, callback=callback, errback=errback): + def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()): """Error occurred, fail node and errback or callback with error.""" log.msg("join failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port)) log.err(err) + self.stats.completedAction('join', start) self.table.nodeFailed(node) if errback: errback() @@ -261,7 +253,7 @@ class KhashmirBase(protocol.Factory): df = node.join(self.node.id) df.addCallbacks(_pongHandler, _defaultPong) - def findCloseNodes(self, callback=lambda a: None, errback = None): + def findCloseNodes(self, callback=lambda a: None): """Perform a findNode on the ID one away from our own. This will allow us to populate our table with nodes on our network @@ -272,12 +264,9 @@ class KhashmirBase(protocol.Factory): @param callback: the method to call with the results, it must take 1 parameter, the list of K closest nodes (optional, defaults to doing nothing with the results) - @type errback: C{method} - @param errback: the method to call if an error occurs - (optional, defaults to doing nothing when an error occurs) """ id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) - self.findNode(id, callback, errback) + self.findNode(id, callback) def refreshTable(self, force = False): """Check all the buckets for those that need refreshing. @@ -309,7 +298,7 @@ class KhashmirBase(protocol.Factory): return self.stats.formatHTML() #{ Remote interface - def krpc_ping(self, id, _krpc_sender): + def krpc_ping(self, id, _krpc_sender = None): """Pong with our ID. @type id: C{string} @@ -317,12 +306,13 @@ class KhashmirBase(protocol.Factory): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) return {"id" : self.node.id} - def krpc_join(self, id, _krpc_sender): + def krpc_join(self, id, _krpc_sender = None): """Add the node by responding with its address and port. @type id: C{string} @@ -330,12 +320,15 @@ class KhashmirBase(protocol.Factory): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) + else: + _krpc_sender = ('127.0.0.1', self.port) return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id} - def krpc_find_node(self, target, id, _krpc_sender): + def krpc_find_node(self, id, target, _krpc_sender = None): """Find the K closest nodes to the target in the local routing table. @type target: C{string} @@ -345,8 +338,11 @@ class KhashmirBase(protocol.Factory): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) + else: + _krpc_sender = ('127.0.0.1', self.port) nodes = self.table.findNodes(target) nodes = map(lambda node: node.contactInfo(), nodes) @@ -360,7 +356,7 @@ class KhashmirRead(KhashmirBase): _Node = KNodeRead #{ Local interface - def findValue(self, key, callback, errback=None): + def findValue(self, key, callback): """Get the nodes that have values for the key from the global table. @type key: C{string} @@ -368,20 +364,12 @@ class KhashmirRead(KhashmirBase): @type callback: C{method} @param callback: the method to call with the results, it must take 1 parameter, the list of nodes with values - @type errback: C{method} - @param errback: the method to call if an error occurs - (optional, defaults to doing nothing when an error occurs) """ - # Get K nodes out of local table/cache - nodes = self.table.findNodes(key) - d = Deferred() - if errback: - d.addCallbacks(callback, errback) - else: - d.addCallback(callback) - + # Start with ourself + nodes = [copy(self.node)] + # Search for others starting with the locally found ones - state = FindValue(self, key, d.callback, self.config, self.stats) + state = FindValue(self, key, callback, self.config, self.stats) reactor.callLater(0, state.goWithNodes, nodes) def valueForKey(self, key, callback, searchlocal = True): @@ -398,24 +386,25 @@ class KhashmirRead(KhashmirBase): @type searchlocal: C{boolean} @param searchlocal: whether to also look for any local values """ - # Get any local values - if searchlocal: - l = self.store.retrieveValues(key) - if len(l) > 0: - reactor.callLater(0, callback, key, l) - else: - l = [] - def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self): + def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal): """Use the found nodes to send requests for values to.""" - state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats) + # Get any local values + if searchlocal: + l = self.store.retrieveValues(key) + if len(l) > 0: + node = copy(self.node) + node.updateNumValues(len(l)) + nodes = nodes + [node] + + state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats) reactor.callLater(0, state.goWithNodes, nodes) # First lookup nodes that have values for the key self.findValue(key, _getValueForKey) #{ Remote interface - def krpc_find_value(self, key, id, _krpc_sender): + def krpc_find_value(self, id, key, _krpc_sender = None): """Find the number of values stored locally for the key, and the K closest nodes. @type key: C{string} @@ -425,15 +414,16 @@ class KhashmirRead(KhashmirBase): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) nodes = self.table.findNodes(key) nodes = map(lambda node: node.contactInfo(), nodes) num_values = self.store.countValues(key) return {'nodes' : nodes, 'num' : num_values, "id": self.node.id} - def krpc_get_value(self, key, num, id, _krpc_sender): + def krpc_get_value(self, id, key, num, _krpc_sender = None): """Retrieve the values stored locally for the key. @type key: C{string} @@ -446,8 +436,9 @@ class KhashmirRead(KhashmirBase): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) l = self.store.retrieveValues(key) if num == 0 or num >= len(l): @@ -492,7 +483,7 @@ class KhashmirWrite(KhashmirRead): self.findNode(key, _storeValueForKey) #{ Remote interface - def krpc_store_value(self, key, value, token, id, _krpc_sender): + def krpc_store_value(self, id, key, value, token, _krpc_sender = None): """Store the value locally with the key. @type key: C{string} @@ -505,8 +496,12 @@ class KhashmirWrite(KhashmirRead): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) + else: + _krpc_sender = ('127.0.0.1', self.port) + for secret in self.token_secrets: this_token = sha(secret + _krpc_sender[0]).digest() if token == this_token: @@ -523,11 +518,12 @@ class Khashmir(KhashmirWrite): class SimpleTests(unittest.TestCase): timeout = 10 - DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + DHT_DEFAULTS = {'PORT': 9977, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': False, } def setUp(self): @@ -596,11 +592,12 @@ class MultiTest(unittest.TestCase): timeout = 30 num = 20 - DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + DHT_DEFAULTS = {'PORT': 9977, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': False, } def _done(self, val):