From 06adc6b3734c1a9a243c8a0a1172b09170bfddcc Mon Sep 17 00:00:00 2001 From: burris Date: Thu, 8 Jul 2004 04:02:59 +0000 Subject: [PATCH] broke up khashmir class into base, read, and write classes, since having generic read/write methods on deployed DHTs is not a great idea actions now take an "action" method, since it's not always "storeValueForKey" buckets now get refreshed nodes that change their ID are now handled properly --- actions.py | 58 +++++++----- const.py | 2 +- khashmir.py | 231 +++++++++++++++++++++++------------------------ knode.py | 21 +++-- test_khashmir.py | 29 +----- 5 files changed, 164 insertions(+), 177 deletions(-) diff --git a/actions.py b/actions.py index 6527d2c..e25c82b 100644 --- a/actions.py +++ b/actions.py @@ -7,7 +7,6 @@ from const import reactor import const from khash import intify -from knode import KNode as Node from ktable import KTable, K class ActionBase: @@ -49,7 +48,7 @@ class FindNode(ActionBase): sender = {'id' : dict["id"]} sender['port'] = _krpc_sender[1] sender['host'] = _krpc_sender[0] - sender = Node().initWithDict(sender) + sender = self.table.Node().initWithDict(sender) sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): @@ -58,7 +57,7 @@ class FindNode(ActionBase): self.outstanding = self.outstanding - 1 self.answered[sender.id] = 1 for node in l: - n = Node().initWithDict(node) + n = self.table.Node().initWithDict(node) n.conn = self.table.udp.connectionForAddr((n.host, n.port)) if not self.found.has_key(n.id): self.found[n.id] = n @@ -92,7 +91,7 @@ class FindNode(ActionBase): def makeMsgFailed(self, node): def defaultGotNodes(err, self=self, node=node): - print ">>> find failed %s/%s" % (node.host, node.port) + print ">>> find failed %s/%s" % (node.host, node.port), err self.table.table.nodeFailed(node) self.outstanding = self.outstanding - 1 self.schedule() @@ -112,8 +111,12 @@ class FindNode(ActionBase): self.schedule() -GET_VALUE_TIMEOUT = 15 +get_value_timeout = 15 class GetValue(FindNode): + def __init__(self, table, target, callback, find="findValue"): + FindNode.__init__(self, table, target, callback) + self.findValue = find + """ get value task """ def handleGotNodes(self, dict): _krpc_sender = dict['_krpc_sender'] @@ -121,7 +124,7 @@ class GetValue(FindNode): sender = {'id' : dict["id"]} sender['port'] = _krpc_sender[1] sender['host'] = _krpc_sender[0] - sender = Node().initWithDict(sender) + sender = self.table.Node().initWithDict(sender) sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): @@ -133,7 +136,7 @@ class GetValue(FindNode): # if we have any closer than what we already got, query them if dict.has_key('nodes'): for node in dict['nodes']: - n = Node().initWithDict(node) + n = self.table.Node().initWithDict(node) n.conn = self.table.udp.connectionForAddr((n.host, n.port)) if not self.found.has_key(n.id): self.found[n.id] = n @@ -160,11 +163,16 @@ class GetValue(FindNode): for node in l[:K]: if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT - df = node.findValue(self.target, self.table.node.id) - df.addCallback(self.handleGotNodes) - df.addErrback(self.makeMsgFailed(node)) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 + try: + f = getattr(node, self.findValue) + except AttributeError: + print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue) + else: + df = f(self.target, self.table.node.id) + df.addCallback(self.handleGotNodes) + df.addErrback(self.makeMsgFailed(node)) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 if self.outstanding >= const.CONCURRENT_REQS: break assert(self.outstanding) >=0 @@ -189,11 +197,12 @@ class GetValue(FindNode): class StoreValue(ActionBase): - def __init__(self, table, target, value, callback): + def __init__(self, table, target, value, callback, store="storeValue"): ActionBase.__init__(self, table, target, callback) self.value = value self.stored = [] - + self.store = store + def storedValue(self, t, node): self.outstanding -= 1 self.table.insertNode(node) @@ -206,15 +215,17 @@ class StoreValue(ActionBase): else: if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY: self.schedule() - + return t + def storeFailed(self, t, node): print ">>> store failed %s/%s" % (node.host, node.port) self.table.nodeFailed(node) self.outstanding -= 1 if self.finished: - return + return t self.schedule() - + return t + def schedule(self): if self.finished: return @@ -231,13 +242,14 @@ class StoreValue(ActionBase): else: if not node.id == self.table.node.id: self.outstanding += 1 - if type(self.value) == type([]): - df = node.storeValues(self.target, self.value, self.table.node.id) + try: + f = getattr(node, self.store) + except AttributeError: + print ">>> %s doesn't have a %s method!" % (node, self.store) else: - df = node.storeValue(self.target, self.value, self.table.node.id) - - df.addCallback(self.storedValue, node=node) - df.addErrback(self.storeFailed, node=node) + df = f(self.target, self.value, self.table.node.id) + df.addCallback(self.storedValue, node=node) + df.addErrback(self.storeFailed, node=node) def goWithNodes(self, nodes): self.nodes = nodes diff --git a/const.py b/const.py index 42d060b..fc5d1c0 100644 --- a/const.py +++ b/const.py @@ -47,7 +47,7 @@ MAX_FAILURES = 3 MIN_PING_INTERVAL = 60 * 15 # fifteen minutes # refresh buckets that haven't been touched in this long -BUCKET_STALENESS = 60 # one hour +BUCKET_STALENESS = 60 * 60 # one hour ### KEY EXPIRER diff --git a/khashmir.py b/khashmir.py index d773a45..46946eb 100644 --- a/khashmir.py +++ b/khashmir.py @@ -9,7 +9,7 @@ import time from sha import sha from ktable import KTable, K -from knode import KNode as Node +from knode import * from khash import newID, newIDInRange @@ -24,14 +24,17 @@ from twisted.web import server threadable.init() import sys +from random import randrange + import sqlite ## find this at http://pysqlite.sourceforge.net/ class KhashmirDBExcept(Exception): pass -# this is the main class! -class Khashmir(protocol.Factory): +# this is the base class, has base functionality and find node, no key-value mappings +class KhashmirBase(protocol.Factory): __slots__ = ('listener', 'node', 'table', 'store', 'app', 'last', 'protocol') + _Node = KNodeBase def __init__(self, host, port, db='khashmir.db'): self.setup(host, port, db) @@ -40,16 +43,21 @@ class Khashmir(protocol.Factory): self.port = port self.node = self._loadSelfNode(host, port) self.table = KTable(self.node) - self.app = service.Application("krpc") + #self.app = service.Application("krpc") self.udp = krpc.hostbroker(self) self.udp.protocol = krpc.KRPC self.listenport = reactor.listenUDP(port, self.udp) self.last = time.time() self._loadRoutingTable() KeyExpirer(store=self.store) - #self.refreshTable(force=1) + self.refreshTable(force=1) reactor.callLater(60, self.checkpoint, (1,)) - + + def Node(self): + n = self._Node() + n.table = self.table + return n + def __del__(self): self.listenport.stopListening() @@ -60,21 +68,20 @@ class Khashmir(protocol.Factory): id = c.fetchone()[0] else: id = newID() - return Node().init(id, host, port) + return self._Node().init(id, host, port) def _saveSelfNode(self): - self.store.autocommit = 0 c = self.store.cursor() c.execute('delete from self where num = 0;') c.execute("insert into self values (0, %s);", sqlite.encode(self.node.id)) self.store.commit() - self.store.autocommit = 1 def checkpoint(self, auto=0): self._saveSelfNode() self._dumpRoutingTable() + self.refreshTable() if auto: - reactor.callLater(const.CHECKPOINT_INTERVAL, self.checkpoint) + reactor.callLater(randrange(int(const.CHECKPOINT_INTERVAL * .9), int(const.CHECKPOINT_INTERVAL * 1.1)), self.checkpoint, (1,)) def _findDB(self, db): import os @@ -88,14 +95,13 @@ class Khashmir(protocol.Factory): def _loadDB(self, db): try: self.store = sqlite.connect(db=db) - self.store.autocommit = 1 + #self.store.autocommit = 0 except: import traceback raise KhashmirDBExcept, "Couldn't open DB", traceback.exc_traceback def _createNewDB(self, db): self.store = sqlite.connect(db=db) - self.store.autocommit = 1 s = """ create table kv (key binary, value binary, time timestamp, primary key (key, value)); create index kv_key on kv(key); @@ -107,19 +113,18 @@ class Khashmir(protocol.Factory): """ c = self.store.cursor() c.execute(s) + self.store.commit() def _dumpRoutingTable(self): """ save routing table nodes to the database """ - self.store.autocommit = 0; c = self.store.cursor() c.execute("delete from nodes where id not NULL;") for bucket in self.table.buckets: for node in bucket.l: c.execute("insert into nodes values (%s, %s, %s);", (sqlite.encode(node.id), node.host, node.port)) self.store.commit() - self.store.autocommit = 1; def _loadRoutingTable(self): """ @@ -129,7 +134,7 @@ class Khashmir(protocol.Factory): c = self.store.cursor() c.execute("select * from nodes;") for rec in c.fetchall(): - n = Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) + n = self.Node().initWithDict({'id':rec[0], 'host':rec[1], 'port':int(rec[2])}) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.table.insertNode(n, contacted=0) @@ -140,7 +145,7 @@ class Khashmir(protocol.Factory): """ ping this node and add the contact info to the table on pong! """ - n =Node().init(const.NULL_ID, host, port) + n =self.Node().init(const.NULL_ID, host, port) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.sendPing(n, callback=callback) @@ -161,46 +166,6 @@ class Khashmir(protocol.Factory): state = FindNode(self, id, d.callback) reactor.callFromThread(state.goWithNodes, nodes) - - ## also async - def valueForKey(self, key, callback, searchlocal = 1): - """ returns the values found for key in global table - callback will be called with a list of values for each peer that returns unique values - final callback will be an empty list - probably should change to 'more coming' arg - """ - nodes = self.table.findNodes(key) - - # get locals - if searchlocal: - l = self.retrieveValues(key) - if len(l) > 0: - reactor.callLater(0, callback, (l)) - else: - l = [] - - # create our search state - state = GetValue(self, key, callback) - reactor.callFromThread(state.goWithNodes, nodes, l) - - ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) - def storeValueForKey(self, key, value, callback=None): - """ stores the value for key in the global table, returns immediately, no status - in this implementation, peers respond but don't indicate status to storing values - a key can have many values - """ - def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): - if not response: - # default callback - def _storedValueHandler(sender): - pass - response=_storedValueHandler - action = StoreValue(self.table, key, value, response) - reactor.callFromThread(action.goWithNodes, nodes) - - # this call is asynch - self.findNode(key, _storeValueForKey) - - def insertNode(self, n, contacted=1): """ insert a node in our local table, pinging oldest contact in bucket, if necessary @@ -238,17 +203,13 @@ class Khashmir(protocol.Factory): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] sender = {'id' : dict['id']} - if node.id != const.NULL_ID and node.id != sender['id']: - # whoah, got response from different peer than we were expecting - self.table.invalidateNode(node) - else: - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - table.insertNode(n) - if callback: - callback() + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = self.Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + table.insertNode(n) + if callback: + callback() def _defaultPong(err, node=node, table=self.table, callback=callback): table.nodeFailed(node) if callback: @@ -277,25 +238,21 @@ class Khashmir(protocol.Factory): id = newIDInRange(bucket.min, bucket.max) self.findNode(id, callback) + def stats(self): + """ + Returns (num_contacts, num_nodes) + num_contacts: number contacts in our routing table + num_nodes: number of nodes estimated in the entire dht + """ + num_contacts = reduce(lambda a, b: a + len(b.l), self.table.buckets, 0) + num_nodes = const.K * (2**(len(self.table.buckets) - 1)) + return (num_contacts, num_nodes) - def retrieveValues(self, key): - c = self.store.cursor() - c.execute("select value from kv where key = %s;", sqlite.encode(key)) - t = c.fetchone() - l = [] - while t: - l.append(t['value']) - t = c.fetchone() - return l - - ##### - ##### INCOMING MESSAGE HANDLERS - def krpc_ping(self, id, _krpc_sender): sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] - n = Node().initWithDict(sender) + n = self.Node().initWithDict(sender) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) return {"id" : self.node.id} @@ -306,52 +263,50 @@ class Khashmir(protocol.Factory): sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] - n = Node().initWithDict(sender) + n = self.Node().initWithDict(sender) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) return {"nodes" : nodes, "id" : self.node.id} - - def krpc_store_value(self, key, value, id, _krpc_sender): - t = "%0.6f" % time.time() - c = self.store.cursor() - try: - c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) - except sqlite.IntegrityError, reason: - # update last insert time - c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) - sender = {'id' : id} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) - return {"id" : self.node.id} - ## multiple values per key - def krpc_store_values(self, key, values, id, _krpc_sender): - t = "%0.6f" % time.time() + +## This class provides read-only access to the DHT, valueForKey +## you probably want to use this mixin and provide your own write methods +class KhashmirRead(KhashmirBase): + _Node = KNodeRead + def retrieveValues(self, key): c = self.store.cursor() - key = sqlite.encode(key) - for value in values: - value = sqlite.encode(value) - try: - c.execute("insert into kv values (%s, %s, %s);", key, value, t) - except sqlite.IntegrityError, reason: - # update last insert time - c.execute("update kv set time = %s where key = %s and value = %s;", (t, key, value)) - sender = {'id' : id} - sender['host'] = _krpc_sender[0] - sender['port'] = _krpc_sender[1] - n = Node().initWithDict(sender) - n.conn = self.udp.connectionForAddr((n.host, n.port)) - self.insertNode(n, contacted=0) - return {"id" : self.node.id} + c.execute("select value from kv where key = %s;", sqlite.encode(key)) + t = c.fetchone() + l = [] + while t: + l.append(t['value']) + t = c.fetchone() + return l + ## also async + def valueForKey(self, key, callback, searchlocal = 1): + """ returns the values found for key in global table + callback will be called with a list of values for each peer that returns unique values + final callback will be an empty list - probably should change to 'more coming' arg + """ + nodes = self.table.findNodes(key) + + # get locals + if searchlocal: + l = self.retrieveValues(key) + if len(l) > 0: + reactor.callLater(0, callback, (l)) + else: + l = [] + + # create our search state + state = GetValue(self, key, callback) + reactor.callFromThread(state.goWithNodes, nodes, l) def krpc_find_value(self, key, id, _krpc_sender): sender = {'id' : id} sender['host'] = _krpc_sender[0] sender['port'] = _krpc_sender[1] - n = Node().initWithDict(sender) + n = self.Node().initWithDict(sender) n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) @@ -363,3 +318,45 @@ class Khashmir(protocol.Factory): nodes = map(lambda node: node.senderDict(), nodes) return {'nodes' : nodes, "id": self.node.id} +### provides a generic write method, you probably don't want to deploy something that allows +### arbitrary value storage +class KhashmirWrite(KhashmirRead): + _Node = KNodeWrite + ## async, callback indicates nodes we got a response from (but no guarantee they didn't drop it on the floor) + def storeValueForKey(self, key, value, callback=None): + """ stores the value for key in the global table, returns immediately, no status + in this implementation, peers respond but don't indicate status to storing values + a key can have many values + """ + def _storeValueForKey(nodes, key=key, value=value, response=callback , table=self.table): + if not response: + # default callback + def _storedValueHandler(sender): + pass + response=_storedValueHandler + action = StoreValue(self.table, key, value, response) + reactor.callFromThread(action.goWithNodes, nodes) + + # this call is asynch + self.findNode(key, _storeValueForKey) + + def krpc_store_value(self, key, value, id, _krpc_sender): + t = "%0.6f" % time.time() + c = self.store.cursor() + try: + c.execute("insert into kv values (%s, %s, %s);", (sqlite.encode(key), sqlite.encode(value), t)) + except sqlite.IntegrityError, reason: + # update last insert time + c.execute("update kv set time = %s where key = %s and value = %s;", (t, sqlite.encode(key), sqlite.encode(value))) + self.store.commit() + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = self.Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"id" : self.node.id} + +# the whole shebang, for testing +class Khashmir(KhashmirWrite): + _Node = KNodeWrite diff --git a/knode.py b/knode.py index 44341ae..46ab780 100644 --- a/knode.py +++ b/knode.py @@ -1,4 +1,4 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information from node import Node @@ -10,7 +10,7 @@ class IDChecker: def __init__(id): self.id = id -class KNode(Node): +class KNodeBase(Node): def checkSender(self, dict): try: senderid = dict['rsp']['id'] @@ -20,7 +20,8 @@ class KNode(Node): else: if self.id != NULL_ID and senderid != self.id: print "Got response from different node than expected." - raise Exception, "Got response from different node than expected." + self.table.invalidateNode(self) + return dict def errBack(self, err): @@ -37,6 +38,15 @@ class KNode(Node): df.addErrback(self.errBack) df.addCallback(self.checkSender) return df + +class KNodeRead(KNodeBase): + def findValue(self, key, id): + df = self.conn.sendRequest('find_value', {"key" : key, "id" : id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df + +class KNodeWrite(KNodeRead): def storeValue(self, key, value, id): df = self.conn.sendRequest('store_value', {"key" : key, "value" : value, "id": id}) df.addErrback(self.errBack) @@ -47,8 +57,3 @@ class KNode(Node): df.addErrback(self.errBack) df.addCallback(self.checkSender) return df - def findValue(self, key, id): - df = self.conn.sendRequest('find_value', {"key" : key, "id" : id}) - df.addErrback(self.errBack) - df.addCallback(self.checkSender) - return df diff --git a/test_khashmir.py b/test_khashmir.py index 83ddde3..3f4908f 100644 --- a/test_khashmir.py +++ b/test_khashmir.py @@ -78,7 +78,7 @@ class MultiTest(TestCase): def setUp(self): self.l = [] - self.startport = 4044 + self.startport = 4088 for i in range(self.num): self.l.append(Khashmir('127.0.0.1', self.startport + i, '/tmp/%s.test' % (self.startport + i))) reactor.iterate() @@ -139,33 +139,6 @@ class MultiTest(TestCase): reactor.iterate() - K = khash.newID() - l = map(lambda a: newID(), range(8)) - for a in range(3): - self.done = 0 - def _scb(val): - self.done = 1 - self.l[randrange(0, self.num)].storeValueForKey(K, l, _scb) - while not self.done: - reactor.iterate() - - - c = [] - def _rcb(val): - if not val: - self.done = 1 - self.assertEqual(self.got, 1) - for n in val: - c.remove(n) - if not c: - self.got = 1 - for x in range(3): - self.got = 0 - self.done = 0 - c = copy(l) - self.l[randrange(0, self.num)].valueForKey(K, _rcb) - while not self.done: - reactor.iterate() -- 2.39.5