X-Git-Url: https://git.mxchange.org/?p=quix0rs-apt-p2p.git;a=blobdiff_plain;f=apt_dht_Khashmir%2Fkhashmir.py;h=126a30ea585b86b45cff8f867ddaa0daa9e99a5f;hp=eaee998e384a6dff148884559e8aba01a148cb5a;hb=d78e24749831bf3803e5fbc217c442d3e93912e9;hpb=7dab2471d589ea439ccbfe794258863c2af858e9 diff --git a/apt_dht_Khashmir/khashmir.py b/apt_dht_Khashmir/khashmir.py index eaee998..126a30e 100644 --- a/apt_dht_Khashmir/khashmir.py +++ b/apt_dht_Khashmir/khashmir.py @@ -1,352 +1,666 @@ ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information -from time import time -from random import randrange +"""The main Khashmir program.""" + +import warnings +warnings.simplefilter("ignore", DeprecationWarning) + +from datetime import datetime, timedelta +from random import randrange, shuffle +from sha import sha import os -import sqlite ## find this at http://pysqlite.sourceforge.net/ from twisted.internet.defer import Deferred -from twisted.internet import protocol -from twisted.internet import reactor +from twisted.internet import protocol, reactor +from twisted.trial import unittest +from db import DB from ktable import KTable from knode import KNodeBase, KNodeRead, KNodeWrite, NULL_ID from khash import newID, newIDInRange -from actions import FindNode, GetValue, KeyExpirer, StoreValue +from actions import FindNode, FindValue, GetValue, StoreValue import krpc -class KhashmirDBExcept(Exception): - pass - -# this is the base class, has base functionality and find node, no key-value mappings class KhashmirBase(protocol.Factory): + """The base Khashmir class, with base functionality and find node, no key-value mappings. + + @type _Node: L{node.Node} + @ivar _Node: the knode implementation to use for this class of DHT + @type config: C{dictionary} + @ivar config: the configuration parameters for the DHT + @type port: C{int} + @ivar port: the port to listen on + @type store: L{db.DB} + @ivar store: the database to store nodes and key/value pairs in + @type node: L{node.Node} + @ivar node: this node + @type table: L{ktable.KTable} + @ivar table: the routing table + @type token_secrets: C{list} of C{string} + @ivar token_secrets: the current secrets to use to create tokens + @type udp: L{krpc.hostbroker} + @ivar udp: the factory for the KRPC protocol + @type listenport: L{twisted.internet.interfaces.IListeningPort} + @ivar listenport: the UDP listening port + @type next_checkpoint: L{twisted.internet.interfaces.IDelayedCall} + @ivar next_checkpoint: the delayed call for the next checkpoint + """ + _Node = KNodeBase + def __init__(self, config, cache_dir='/tmp'): + """Initialize the Khashmir class and call the L{setup} method. + + @type config: C{dictionary} + @param config: the configuration parameters for the DHT + @type cache_dir: C{string} + @param cache_dir: the directory to store all files in + (optional, defaults to the /tmp directory) + """ self.config = None self.setup(config, cache_dir) def setup(self, config, cache_dir): + """Setup all the Khashmir sub-modules. + + @type config: C{dictionary} + @param config: the configuration parameters for the DHT + @type cache_dir: C{string} + @param cache_dir: the directory to store all files in + """ self.config = config - self._findDB(os.path.join(cache_dir, 'khashmir.db')) self.port = config['PORT'] + self.store = DB(os.path.join(cache_dir, 'khashmir.' + str(self.port) + '.db')) self.node = self._loadSelfNode('', self.port) self.table = KTable(self.node, config) - #self.app = service.Application("krpc") - self.udp = krpc.hostbroker(self) + self.token_secrets = [newID()] + + # Start listening + self.udp = krpc.hostbroker(self, config) self.udp.protocol = krpc.KRPC - self.listenport = reactor.listenUDP(port, self.udp) - self.last = time() + self.listenport = reactor.listenUDP(self.port, self.udp) + + # Load the routing table and begin checkpointing self._loadRoutingTable() - KeyExpirer(self.store, config) - self.refreshTable(force=1) - reactor.callLater(60, self.checkpoint, (1,)) + self.refreshTable(force = True) + self.next_checkpoint = reactor.callLater(60, self.checkpoint) - def Node(self): - n = self._Node() + def Node(self, id, host = None, port = None): + """Create a new node. + + @see: L{node.Node.__init__} + """ + n = self._Node(id, host, port) n.table = self.table + n.conn = self.udp.connectionForAddr((n.host, n.port)) return n def __del__(self): + """Stop listening for packets.""" self.listenport.stopListening() def _loadSelfNode(self, host, port): - c = self.store.cursor() - c.execute('select id from self where num = 0;') - if c.rowcount > 0: - id = c.fetchone()[0] - else: + """Create this node, loading any previously saved one.""" + id = self.store.getSelfNode() + if not id: id = newID() - return self._Node().init(id, host, port) + return self._Node(id, host, port) - def _saveSelfNode(self): - c = self.store.cursor() - c.execute('delete from self where num = 0;') - c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id)) - self.store.commit() + def checkpoint(self): + """Perform some periodic maintenance operations.""" + # Create a new token secret + self.token_secrets.insert(0, newID()) + if len(self.token_secrets) > 3: + self.token_secrets.pop() + + # Save some parameters for reloading + self.store.saveSelfNode(self.node.id) + self.store.dumpRoutingTable(self.table.buckets) - def checkpoint(self, auto=0): - self._saveSelfNode() - self._dumpRoutingTable() + # DHT maintenance + self.store.expireValues(self.config['KEY_EXPIRE']) self.refreshTable() - if auto: - reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), - int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), - self.checkpoint, (1,)) - - def _findDB(self, db): - try: - os.stat(db) - except OSError: - self._createNewDB(db) - else: - self._loadDB(db) - def _loadDB(self, db): - try: - self.store = sqlite.connect(db=db) - #self.store.autocommit = 0 - except: - import traceback - raise KhashmirDBExcept, "Couldn't open DB", traceback.format_exc() - - def _createNewDB(self, db): - self.store = sqlite.connect(db=db) - s = """ - create table kv (key binary, value binary, time timestamp, primary key (key, value)); - create index kv_key on kv(key); - create index kv_timestamp on kv(time); - - create table nodes (id binary primary key, host text, port number); - - create table self (num number primary key, id binary); - """ - c = self.store.cursor() - c.execute(s) - self.store.commit() - - def _dumpRoutingTable(self): - """ - save routing table nodes to the database - """ - c = self.store.cursor() - c.execute("delete from nodes where id not NULL;") - for bucket in self.table.buckets: - for node in bucket.l: - c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port)) - self.store.commit() + self.next_checkpoint = reactor.callLater(randrange(int(self.config['CHECKPOINT_INTERVAL'] * .9), + int(self.config['CHECKPOINT_INTERVAL'] * 1.1)), + self.checkpoint) def _loadRoutingTable(self): + """Load the previous routing table nodes from the database. + + It's usually a good idea to call refreshTable(force = True) after + loading the table. """ - load routing table nodes from database - it's usually a good idea to call refreshTable(force=1) after loading the table - """ - c = self.store.cursor() - c.execute("select * from nodes;") - for rec in c.fetchall(): - n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.table.insertNode(n, contacted=0) + nodes = self.store.getRoutingTable() + for rec in nodes: + n = self.Node(rec[0], rec[1], int(rec[2])) + self.table.insertNode(n, contacted = False) - - ####### - ####### LOCAL INTERFACE - use these methods! - def addContact(self, host, port, callback=None): - """ - ping this node and add the contact info to the table on pong! + #{ Local interface + def addContact(self, host, port, callback=None, errback=None): + """Ping this node and add the contact info to the table on pong. + + @type host: C{string} + @param host: the IP address of the node to contact + @type port: C{int} + @param port:the port of the node to contact + @type callback: C{method} + @param callback: the method to call with the results, it must take 1 + parameter, the contact info returned by the node + (optional, defaults to doing nothing with the results) + @type errback: C{method} + @param errback: the method to call if an error occurs + (optional, defaults to calling the callback with None) """ - n =self.Node().init(NULL_ID, host, port) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.sendPing(n, callback=callback) + n = self.Node(NULL_ID, host, port) + self.sendJoin(n, callback=callback, errback=errback) - ## this call is async! def findNode(self, id, callback, errback=None): - """ returns the contact info for node, or the k closest nodes, from the global table """ - # get K nodes out of local table/cache, or the node we want + """Find the contact info for the K closest nodes in the global table. + + @type id: C{string} + @param id: the target ID to find the K closest nodes of + @type callback: C{method} + @param callback: the method to call with the results, it must take 1 + parameter, the list of K closest nodes + @type errback: C{method} + @param errback: the method to call if an error occurs + (optional, defaults to doing nothing when an error occurs) + """ + # Get K nodes out of local table/cache nodes = self.table.findNodes(id) d = Deferred() if errback: d.addCallbacks(callback, errback) else: d.addCallback(callback) - if len(nodes) == 1 and nodes[0].id == id : + + # If the target ID was found + if len(nodes) == 1 and nodes[0].id == id: d.callback(nodes) else: - # create our search state + # Start the finding nodes action state = FindNode(self, id, d.callback, self.config) reactor.callLater(0, state.goWithNodes, nodes) - def insertNode(self, n, contacted=1): - """ - insert a node in our local table, pinging oldest contact in bucket, if necessary + def insertNode(self, node, contacted = True): + """Try to insert a node in our local table, pinging oldest contact if necessary. - If all you have is a host/port, then use addContact, which calls this method after - receiving the PONG from the remote node. The reason for the seperation is we can't insert - a node into the table without it's peer-ID. That means of course the node passed into this - method needs to be a properly formed Node object with a valid ID. + If all you have is a host/port, then use L{addContact}, which calls this + method after receiving the PONG from the remote node. The reason for + the seperation is we can't insert a node into the table without its + node ID. That means of course the node passed into this method needs + to be a properly formed Node object with a valid ID. + + @type node: L{node.Node} + @param node: the new node to try and insert + @type contacted: C{boolean} + @param contacted: whether the new node is known to be good, i.e. + responded to a request (optional, defaults to True) """ - old = self.table.insertNode(n, contacted=contacted) - if old and (time() - old.lastSeen) > self.config['MIN_PING_INTERVAL'] and old.id != self.node.id: - # the bucket is full, check to see if old node is still around and if so, replace it + old = self.table.insertNode(node, contacted=contacted) + if (old and old.id != self.node.id and + (datetime.now() - old.lastSeen) > + timedelta(seconds=self.config['MIN_PING_INTERVAL'])): - ## these are the callbacks used when we ping the oldest node in a bucket - def _staleNodeHandler(oldnode=old, newnode = n): - """ called if the pinged node never responds """ - self.table.replaceStaleNode(old, newnode) + def _staleNodeHandler(oldnode = old, newnode = node): + """The pinged node never responded, so replace it.""" + self.table.replaceStaleNode(oldnode, newnode) def _notStaleNodeHandler(dict, old=old): - """ called when we get a pong from the old node """ + """Got a pong from the old node, so update it.""" dict = dict['rsp'] if dict['id'] == old.id: self.table.justSeenNode(old.id) + # Bucket is full, check to see if old node is still available df = old.ping(self.node.id) df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) - def sendPing(self, node, callback=None): - """ - ping a node + def sendJoin(self, node, callback=None, errback=None): + """Join the DHT by pinging a bootstrap node. + + @type node: L{node.Node} + @param node: the node to send the join to + @type callback: C{method} + @param callback: the method to call with the results, it must take 1 + parameter, the contact info returned by the node + (optional, defaults to doing nothing with the results) + @type errback: C{method} + @param errback: the method to call if an error occurs + (optional, defaults to calling the callback with None) """ - df = node.ping(self.node.id) - ## these are the callbacks we use when we issue a PING - def _pongHandler(dict, node=node, table=self.table, callback=callback): - _krpc_sender = dict['_krpc_sender'] - dict = dict['rsp'] - sender = {'id' : dict['id']} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = self.Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - table.insertNode(n) + + def _pongHandler(dict, node=node, self=self, callback=callback): + """Node responded properly, callback with response.""" + n = self.Node(dict['rsp']['id'], dict['_krpc_sender'][0], dict['_krpc_sender'][1]) + self.insertNode(n) if callback: - callback() - def _defaultPong(err, node=node, table=self.table, callback=callback): + callback((dict['rsp']['ip_addr'], dict['rsp']['port'])) + + def _defaultPong(err, node=node, table=self.table, callback=callback, errback=errback): + """Error occurred, fail node and errback or callback with error.""" table.nodeFailed(node) - if callback: - callback() + if errback: + errback() + elif callback: + callback(None) - df.addCallbacks(_pongHandler,_defaultPong) + df = node.join(self.node.id) + df.addCallbacks(_pongHandler, _defaultPong) - def findCloseNodes(self, callback=lambda a: None): - """ - This does a findNode on the ID one away from our own. - This will allow us to populate our table with nodes on our network closest to our own. - This is called as soon as we start up with an empty table + def findCloseNodes(self, callback=lambda a: None, errback = None): + """Perform a findNode on the ID one away from our own. + + This will allow us to populate our table with nodes on our network + closest to our own. This is called as soon as we start up with an + empty table. + + @type callback: C{method} + @param callback: the method to call with the results, it must take 1 + parameter, the list of K closest nodes + (optional, defaults to doing nothing with the results) + @type errback: C{method} + @param errback: the method to call if an error occurs + (optional, defaults to doing nothing when an error occurs) """ id = self.node.id[:-1] + chr((ord(self.node.id[-1]) + 1) % 256) - self.findNode(id, callback) + self.findNode(id, callback, errback) - def refreshTable(self, force=0): - """ - force=1 will refresh table regardless of last bucket access time + def refreshTable(self, force = False): + """Check all the buckets for those that need refreshing. + + @param force: refresh all buckets regardless of last bucket access time + (optional, defaults to False) """ def callback(nodes): pass for bucket in self.table.buckets: - if force or (time() - bucket.lastAccessed >= self.config['BUCKET_STALENESS']): + if force or (datetime.now() - bucket.lastAccessed > + timedelta(seconds=self.config['BUCKET_STALENESS'])): + # Choose a random ID in the bucket and try and find it id = newIDInRange(bucket.min, bucket.max) self.findNode(id, callback) def stats(self): - """ - Returns (num_contacts, num_nodes) - num_contacts: number contacts in our routing table - num_nodes: number of nodes estimated in the entire dht + """Collect some statistics about the DHT. + + @rtype: (C{int}, C{int}) + @return: the number contacts in our routing table, and the estimated + number of nodes in the entire DHT """ num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0) num_nodes = self.config['K'] * (2**(len(self.table.buckets) - 1)) return (num_contacts, num_nodes) + + def shutdown(self): + """Closes the port and cancels pending later calls.""" + self.listenport.stopListening() + try: + self.next_checkpoint.cancel() + except: + pass + self.store.close() + #{ Remote interface def krpc_ping(self, id, _krpc_sender): - sender = {'id' : id} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = self.Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) + """Pong with our ID. + + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) + return {"id" : self.node.id} + def krpc_join(self, id, _krpc_sender): + """Add the node by responding with its address and port. + + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) + + return {"ip_addr" : _krpc_sender[0], "port" : _krpc_sender[1], "id" : self.node.id} + def krpc_find_node(self, target, id, _krpc_sender): + """Find the K closest nodes to the target in the local routing table. + + @type target: C{string} + @param target: the target ID to find nodes for + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) + nodes = self.table.findNodes(target) - nodes = map(lambda node: node.senderDict(), nodes) - sender = {'id' : id} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = self.Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) - return {"nodes" : nodes, "id" : self.node.id} + nodes = map(lambda node: node.contactInfo(), nodes) + token = sha(self.token_secrets[0] + _krpc_sender[0]).digest() + return {"nodes" : nodes, "token" : token, "id" : self.node.id} -## This class provides read-only access to the DHT, valueForKey -## you probably want to use this mixin and provide your own write methods class KhashmirRead(KhashmirBase): + """The read-only Khashmir class, which can only retrieve (not store) key/value mappings.""" + _Node = KNodeRead - def retrieveValues(self, key): - c = self.store.cursor() - c.execute("select value from kv where key = %s;", sqlite.encode(key)) - t = c.fetchone() - l = [] - while t: - l.append(t['value']) - t = c.fetchone() - return l - ## also async - def valueForKey(self, key, callback, searchlocal = 1): - """ returns the values found for key in global table - callback will be called with a list of values for each peer that returns unique values - final callback will be an empty list - probably should change to 'more coming' arg + + #{ Local interface + def findValue(self, key, callback, errback=None): + """Get the nodes that have values for the key from the global table. + + @type key: C{string} + @param key: the target key to find the values for + @type callback: C{method} + @param callback: the method to call with the results, it must take 1 + parameter, the list of nodes with values + @type errback: C{method} + @param errback: the method to call if an error occurs + (optional, defaults to doing nothing when an error occurs) """ + # Get K nodes out of local table/cache nodes = self.table.findNodes(key) + d = Deferred() + if errback: + d.addCallbacks(callback, errback) + else: + d.addCallback(callback) + + # Search for others starting with the locally found ones + state = FindValue(self, key, d.callback, self.config) + reactor.callLater(0, state.goWithNodes, nodes) + + def valueForKey(self, key, callback, searchlocal = True): + """Get the values found for key in global table. - # get locals + Callback will be called with a list of values for each peer that + returns unique values. The final callback will be an empty list. + + @type key: C{string} + @param key: the target key to get the values for + @type callback: C{method} + @param callback: the method to call with the results, it must take 2 + parameters: the key, and the values found + @type searchlocal: C{boolean} + @param searchlocal: whether to also look for any local values + """ + # Get any local values if searchlocal: - l = self.retrieveValues(key) + l = self.store.retrieveValues(key) if len(l) > 0: - reactor.callLater(0, callback, (l)) + reactor.callLater(0, callback, key, l) else: l = [] - - # create our search state - state = GetValue(self, key, callback, self.config) - reactor.callLater(0, state.goWithNodes, nodes, l) + def _getValueForKey(nodes, key=key, local_values=l, response=callback, self=self): + """Use the found nodes to send requests for values to.""" + state = GetValue(self, key, local_values, self.config['RETRIEVE_VALUES'], response, self.config) + reactor.callLater(0, state.goWithNodes, nodes) + + # First lookup nodes that have values for the key + self.findValue(key, _getValueForKey) + + #{ Remote interface def krpc_find_value(self, key, id, _krpc_sender): - sender = {'id' : id} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = self.Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) + """Find the number of values stored locally for the key, and the K closest nodes. + + @type key: C{string} + @param key: the target key to find the values and nodes for + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) + + nodes = self.table.findNodes(key) + nodes = map(lambda node: node.contactInfo(), nodes) + num_values = self.store.countValues(key) + return {'nodes' : nodes, 'num' : num_values, "id": self.node.id} + + def krpc_get_value(self, key, num, id, _krpc_sender): + """Retrieve the values stored locally for the key. + + @type key: C{string} + @param key: the target key to retrieve the values for + @type num: C{int} + @param num: the maximum number of values to retrieve, or 0 to + retrieve all of them + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) - l = self.retrieveValues(key) - if len(l) > 0: + l = self.store.retrieveValues(key) + if num == 0 or num >= len(l): return {'values' : l, "id": self.node.id} else: - nodes = self.table.findNodes(key) - nodes = map(lambda node: node.senderDict(), nodes) - return {'nodes' : nodes, "id": self.node.id} + shuffle(l) + return {'values' : l[:num], "id": self.node.id} + -### provides a generic write method, you probably don't want to deploy something that allows -### arbitrary value storage class KhashmirWrite(KhashmirRead): + """The read-write Khashmir class, which can store and retrieve key/value mappings.""" + _Node = KNodeWrite - ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) + + #{ Local interface def storeValueForKey(self, key, value, callback=None): - """ stores the value for key in the global table, returns immediately, no status - in this implementation, peers respond but don't indicate status to storing values - a key can have many values + """Stores the value for the key in the global table. + + No status in this implementation, peers respond but don't indicate + status of storing values. + + @type key: C{string} + @param key: the target key to store the value for + @type value: C{string} + @param value: the value to store with the key + @type callback: C{method} + @param callback: the method to call with the results, it must take 3 + parameters: the key, the value stored, and the result of the store + (optional, defaults to doing nothing with the results) """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + def _storeValueForKey(nodes, key=key, value=value, response=callback, self=self): + """Use the returned K closest nodes to store the key at.""" if not response: - # default callback - def _storedValueHandler(sender): + def _storedValueHandler(key, value, sender): + """Default callback that does nothing.""" pass - response=_storedValueHandler - action = StoreValue(self.table, key, value, response, self.config) + response = _storedValueHandler + action = StoreValue(self, key, value, self.config['STORE_REDUNDANCY'], response, self.config) reactor.callLater(0, action.goWithNodes, nodes) - # this call is asynch + # First find the K closest nodes to operate on. self.findNode(key, _storeValueForKey) - def krpc_store_value(self, key, value, id, _krpc_sender): - t = "%0.6f" % time() - c = self.store.cursor() - try: - c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) - except sqlite.IntegrityError, reason: - # update last insert time - c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) - self.store.commit() - sender = {'id' : id} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = self.Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) - return {"id" : self.node.id} + #{ Remote interface + def krpc_store_value(self, key, value, token, id, _krpc_sender): + """Store the value locally with the key. + + @type key: C{string} + @param key: the target key to store the value for + @type value: C{string} + @param value: the value to store with the key + @param token: the token to confirm that this peer contacted us previously + @type id: C{string} + @param id: the node ID of the sender node + @type _krpc_sender: (C{string}, C{int}) + @param _krpc_sender: the sender node's IP address and port + """ + n = self.Node(id, _krpc_sender[0], _krpc_sender[1]) + self.insertNode(n, contacted = False) + for secret in self.token_secrets: + this_token = sha(secret + _krpc_sender[0]).digest() + if token == this_token: + self.store.storeValue(key, value) + return {"id" : self.node.id} + raise krpc.KrpcError, (krpc.KRPC_ERROR_INVALID_TOKEN, 'token is invalid, do a find_nodes to get a fresh one') + -# the whole shebang, for testing class Khashmir(KhashmirWrite): + """The default Khashmir class (currently the read-write L{KhashmirWrite}).""" _Node = KNodeWrite + + +class SimpleTests(unittest.TestCase): + + timeout = 10 + DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, + 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, + 'MAX_FAILURES': 3, + 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KEY_EXPIRE': 3600, 'SPEW': False, } + + def setUp(self): + d = self.DHT_DEFAULTS.copy() + d['PORT'] = 4044 + self.a = Khashmir(d) + d = self.DHT_DEFAULTS.copy() + d['PORT'] = 4045 + self.b = Khashmir(d) + + def tearDown(self): + self.a.shutdown() + self.b.shutdown() + os.unlink(self.a.store.db) + os.unlink(self.b.store.db) + + def testAddContact(self): + self.failUnlessEqual(len(self.a.table.buckets), 1) + self.failUnlessEqual(len(self.a.table.buckets[0].l), 0) + + self.failUnlessEqual(len(self.b.table.buckets), 1) + self.failUnlessEqual(len(self.b.table.buckets[0].l), 0) + + self.a.addContact('127.0.0.1', 4045) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + + self.failUnlessEqual(len(self.a.table.buckets), 1) + self.failUnlessEqual(len(self.a.table.buckets[0].l), 1) + self.failUnlessEqual(len(self.b.table.buckets), 1) + self.failUnlessEqual(len(self.b.table.buckets[0].l), 1) + + def testStoreRetrieve(self): + self.a.addContact('127.0.0.1', 4045) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.got = 0 + self.a.storeValueForKey(sha('foo').digest(), 'foobar') + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + self.a.valueForKey(sha('foo').digest(), self._cb) + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + reactor.iterate() + + def _cb(self, key, val): + if not val: + self.failUnlessEqual(self.got, 1) + elif 'foobar' in val: + self.got = 1 + + +class MultiTest(unittest.TestCase): + + timeout = 30 + num = 20 + DHT_DEFAULTS = {'PORT': 9977, 'K': 8, 'HASH_LENGTH': 160, + 'CHECKPOINT_INTERVAL': 300, 'CONCURRENT_REQS': 4, + 'STORE_REDUNDANCY': 3, 'RETRIEVE_VALUES': -10000, + 'MAX_FAILURES': 3, + 'MIN_PING_INTERVAL': 900,'BUCKET_STALENESS': 3600, + 'KEY_EXPIRE': 3600, 'SPEW': False, } + + def _done(self, val): + self.done = 1 + + def setUp(self): + self.l = [] + self.startport = 4088 + for i in range(self.num): + d = self.DHT_DEFAULTS.copy() + d['PORT'] = self.startport + i + self.l.append(Khashmir(d)) + reactor.iterate() + reactor.iterate() + + for i in self.l: + i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) + i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) + i.addContact('127.0.0.1', self.l[randrange(0,self.num)].port) + reactor.iterate() + reactor.iterate() + reactor.iterate() + + for i in self.l: + self.done = 0 + i.findCloseNodes(self._done) + while not self.done: + reactor.iterate() + for i in self.l: + self.done = 0 + i.findCloseNodes(self._done) + while not self.done: + reactor.iterate() + + def tearDown(self): + for i in self.l: + i.shutdown() + os.unlink(i.store.db) + + reactor.iterate() + + def testStoreRetrieve(self): + for i in range(10): + K = newID() + V = newID() + + for a in range(3): + self.done = 0 + def _scb(key, value, result): + self.done = 1 + self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb) + while not self.done: + reactor.iterate() + + + def _rcb(key, val): + if not val: + self.done = 1 + self.failUnlessEqual(self.got, 1) + elif V in val: + self.got = 1 + for x in range(3): + self.got = 0 + self.done = 0 + self.l[randrange(0, self.num)].valueForKey(K, _rcb) + while not self.done: + reactor.iterate()