X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=apt_p2p_Khashmir%2Fkhashmir.py;h=5db5ed8ab48015c9c73073f9c8ceab8eac652701;hb=52a1d47b47a4f68ee189d0ae7e67dfb815d7258a;hp=48a0c0c5ee6801d07cfdb163a54d7f89297bea1e;hpb=80f92d033a70cb932d76768e96c681475031d7b7;p=quix0rs-apt-p2p.git diff --git a/apt_p2p_Khashmir/khashmir.py b/apt_p2p_Khashmir/khashmir.py index 48a0c0c..5db5ed8 100644 --- a/apt_p2p_Khashmir/khashmir.py +++ b/apt_p2p_Khashmir/khashmir.py @@ -9,10 +9,12 @@ warnings.simplefilter("ignore", DeprecationWarning) from datetime import datetime, timedelta from random import randrange, shuffle from sha import sha +from copy import copy import os from twisted.internet.defer import Deferred from twisted.internet import protocol, reactor +from twisted.python import log from twisted.trial import unittest from db import DB @@ -78,7 +80,7 @@ class KhashmirBase(protocol.Factory): self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) self.token_secrets = [newID()] - self.stats = StatsLogger(self.table, self.store, self.config) + self.stats = StatsLogger(self.table, self.store) # Start listening self.udp = krpc.hostbroker(self, self.stats, config) @@ -160,7 +162,7 @@ class KhashmirBase(protocol.Factory): n = self.Node(NULL_ID, host, port) self.sendJoin(n, callback=callback, errback=errback) - def findNode(self, id, callback, errback=None): + def findNode(self, id, callback): """Find the contact info for the K closest nodes in the global table. @type id: C{string} @@ -168,25 +170,13 @@ class KhashmirBase(protocol.Factory): @type callback: C{method} @param callback: the method to call with the results, it must take 1 parameter, the list of K closest nodes - @type errback: C{method} - @param errback: the method to call if an error occurs - (optional, defaults to doing nothing when an error occurs) """ - # Get K nodes out of local table/cache - nodes = self.table.findNodes(id) - d = Deferred() - if errback: - d.addCallbacks(callback, errback) - else: - d.addCallback(callback) + # Start with our node + nodes = [copy(self.node)] - # If the target ID was found - if len(nodes) == 1 and nodes[0].id == id: - d.callback(nodes) - else: - # Start the finding nodes action - state = FindNode(self, id, d.callback, self.config, self.stats) - reactor.callLater(0, state.goWithNodes, nodes) + # Start the finding nodes action + state = FindNode(self, id, callback, self.config, self.stats) + reactor.callLater(0, state.goWithNodes, nodes) def insertNode(self, node, contacted = True): """Try to insert a node in our local table, pinging oldest contact if necessary. @@ -208,13 +198,15 @@ class KhashmirBase(protocol.Factory): (datetime.now() - old.lastSeen) > timedelta(seconds=self.config['MIN_PING_INTERVAL'])): - def _staleNodeHandler(oldnode = old, newnode = node): + def _staleNodeHandler(err, oldnode = old, newnode = node, self = self, start = datetime.now()): """The pinged node never responded, so replace it.""" + log.msg("action ping failed on %s/%s: %s" % (oldnode.host, oldnode.port, err.getErrorMessage())) + self.stats.completedAction('ping', start) self.table.replaceStaleNode(oldnode, newnode) - def _notStaleNodeHandler(dict, old=old): + def _notStaleNodeHandler(dict, old = old, self = self, start = datetime.now()): """Got a pong from the old node, so update it.""" - dict = dict['rsp'] + self.stats.completedAction('ping', start) if dict['id'] == old.id: self.table.justSeenNode(old.id) @@ -237,16 +229,19 @@ class KhashmirBase(protocol.Factory): (optional, defaults to calling the callback with None) """ - def _pongHandler(dict, node=node, self=self, callback=callback): + def _pongHandler(dict, node=node, self=self, callback=callback, start = datetime.now()): """Node responded properly, callback with response.""" - n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) - self.insertNode(n) + n = self.Node(dict['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + self.stats.completedAction('join', start) + reactor.callLater(0, self.insertNode, n) if callback: - callback((dict['rsp']['ip_addr'], dict['rsp']['port'])) + callback((dict['ip_addr'], dict['port'])) - def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback): + def _defaultPong(err, node=node, self=self, callback=callback, errback=errback, start = datetime.now()): """Error occurred, fail node and errback or callback with error.""" - table.nodeFailed(node) + log.msg("action join failed on %s/%s: %s" % (node.host, node.port, err.getErrorMessage())) + self.stats.completedAction('join', start) + self.table.nodeFailed(node) if errback: errback() elif callback: @@ -256,7 +251,7 @@ class KhashmirBase(protocol.Factory): df = node.join(self.node.id) df.addCallbacks(_pongHandler, _defaultPong) - def findCloseNodes(self, callback=lambda a: None, errback = None): + def findCloseNodes(self, callback=lambda a: None): """Perform a findNode on the ID one away from our own. This will allow us to populate our table with nodes on our network @@ -267,12 +262,9 @@ class KhashmirBase(protocol.Factory): @param callback: the method to call with the results, it must take 1 parameter, the list of K closest nodes (optional, defaults to doing nothing with the results) - @type errback: C{method} - @param errback: the method to call if an error occurs - (optional, defaults to doing nothing when an error occurs) """ id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) - self.findNode(id, callback, errback) + self.findNode(id, callback) def refreshTable(self, force = False): """Check all the buckets for those that need refreshing. @@ -301,10 +293,10 @@ class KhashmirBase(protocol.Factory): def getStats(self): """Gather the statistics for the DHT.""" - return self.stats.gather() + return self.stats.formatHTML() #{ Remote interface - def krpc_ping(self, id, _krpc_sender): + def krpc_ping(self, id, _krpc_sender = None): """Pong with our ID. @type id: C{string} @@ -312,12 +304,13 @@ class KhashmirBase(protocol.Factory): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + reactor.callLater(0, self.insertNode, n, False) return {"id" : self.node.id} - def krpc_join(self, id, _krpc_sender): + def krpc_join(self, id, _krpc_sender = None): """Add the node by responding with its address and port. @type id: C{string} @@ -325,12 +318,15 @@ class KhashmirBase(protocol.Factory): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + reactor.callLater(0, self.insertNode, n, False) + else: + _krpc_sender = ('127.0.0.1', self.port) return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id} - def krpc_find_node(self, target, id, _krpc_sender): + def krpc_find_node(self, id, target, _krpc_sender = None): """Find the K closest nodes to the target in the local routing table. @type target: C{string} @@ -340,8 +336,11 @@ class KhashmirBase(protocol.Factory): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + reactor.callLater(0, self.insertNode, n, False) + else: + _krpc_sender = ('127.0.0.1', self.port) nodes = self.table.findNodes(target) nodes = map(lambda node: node.contactInfo(), nodes) @@ -355,7 +354,7 @@ class KhashmirRead(KhashmirBase): _Node = KNodeRead #{ Local interface - def findValue(self, key, callback, errback=None): + def findValue(self, key, callback): """Get the nodes that have values for the key from the global table. @type key: C{string} @@ -363,20 +362,12 @@ class KhashmirRead(KhashmirBase): @type callback: C{method} @param callback: the method to call with the results, it must take 1 parameter, the list of nodes with values - @type errback: C{method} - @param errback: the method to call if an error occurs - (optional, defaults to doing nothing when an error occurs) """ - # Get K nodes out of local table/cache - nodes = self.table.findNodes(key) - d = Deferred() - if errback: - d.addCallbacks(callback, errback) - else: - d.addCallback(callback) - + # Start with ourself + nodes = [copy(self.node)] + # Search for others starting with the locally found ones - state = FindValue(self, key, d.callback, self.config, self.stats) + state = FindValue(self, key, callback, self.config, self.stats) reactor.callLater(0, state.goWithNodes, nodes) def valueForKey(self, key, callback, searchlocal = True): @@ -393,24 +384,25 @@ class KhashmirRead(KhashmirBase): @type searchlocal: C{boolean} @param searchlocal: whether to also look for any local values """ - # Get any local values - if searchlocal: - l = self.store.retrieveValues(key) - if len(l) > 0: - reactor.callLater(0, callback, key, l) - else: - l = [] - def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self): + def _getValueForKey(nodes, key=key, response=callback, self=self, searchlocal=searchlocal): """Use the found nodes to send requests for values to.""" - state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config, self.stats) + # Get any local values + if searchlocal: + l = self.store.retrieveValues(key) + if len(l) > 0: + node = copy(self.node) + node.updateNumValues(len(l)) + nodes = nodes + [node] + + state = GetValue(self, key, self.config['RETRIEVE_VALUES'], response, self.config, self.stats) reactor.callLater(0, state.goWithNodes, nodes) # First lookup nodes that have values for the key self.findValue(key, _getValueForKey) #{ Remote interface - def krpc_find_value(self, key, id, _krpc_sender): + def krpc_find_value(self, id, key, _krpc_sender = None): """Find the number of values stored locally for the key, and the K closest nodes. @type key: C{string} @@ -420,15 +412,16 @@ class KhashmirRead(KhashmirBase): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + reactor.callLater(0, self.insertNode, n, False) nodes = self.table.findNodes(key) nodes = map(lambda node: node.contactInfo(), nodes) num_values = self.store.countValues(key) return {'nodes' : nodes, 'num' : num_values, "id": self.node.id} - def krpc_get_value(self, key, num, id, _krpc_sender): + def krpc_get_value(self, id, key, num, _krpc_sender = None): """Retrieve the values stored locally for the key. @type key: C{string} @@ -441,8 +434,9 @@ class KhashmirRead(KhashmirBase): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + reactor.callLater(0, self.insertNode, n, False) l = self.store.retrieveValues(key) if num == 0 or num >= len(l): @@ -487,7 +481,7 @@ class KhashmirWrite(KhashmirRead): self.findNode(key, _storeValueForKey) #{ Remote interface - def krpc_store_value(self, key, value, token, id, _krpc_sender): + def krpc_store_value(self, id, key, value, token, _krpc_sender = None): """Store the value locally with the key. @type key: C{string} @@ -500,8 +494,12 @@ class KhashmirWrite(KhashmirRead): @type _krpc_sender: (C{string}, C{int}) @param _krpc_sender: the sender node's IP address and port """ - n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) - self.insertNode(n, contacted = False) + if _krpc_sender is not None: + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + reactor.callLater(0, self.insertNode, n, False) + else: + _krpc_sender = ('127.0.0.1', self.port) + for secret in self.token_secrets: this_token = sha(secret + _krpc_sender[0]).digest() if token == this_token: @@ -518,11 +516,12 @@ class Khashmir(KhashmirWrite): class SimpleTests(unittest.TestCase): timeout = 10 - DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + DHT_DEFAULTS = {'PORT': 9977, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': False, } def setUp(self): @@ -591,11 +590,12 @@ class MultiTest(unittest.TestCase): timeout = 30 num = 20 - DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + DHT_DEFAULTS = {'PORT': 9977, 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, 'MAX_FAILURES': 3, 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KRPC_TIMEOUT': 14, 'KRPC_INITIAL_DELAY': 2, 'KEY_EXPIRE': 3600, 'SPEW': False, } def _done(self, val):