From 2c1a51b121f07cc7d629eeeb3fe728b72660649f Mon Sep 17 00:00:00 2001 From: burris Date: Tue, 15 Jun 2004 16:57:05 +0000 Subject: [PATCH] now if you provide a list to storeValueForKey it will use the krpc method "store_values" to store multiple values under the same key in one swoop --- actions.py | 8 ++++-- const.py | 4 +-- khashmir.py | 24 +++++++++++++++-- knode.py | 5 ++++ krpc.py | 2 +- test_khashmir.py | 70 +++++++++++++++++++++++++++++++++++------------- 6 files changed, 88 insertions(+), 25 deletions(-) diff --git a/actions.py b/actions.py index 1afdeb1..6527d2c 100644 --- a/actions.py +++ b/actions.py @@ -1,4 +1,4 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information from time import time @@ -231,7 +231,11 @@ class StoreValue(ActionBase): else: if not node.id == self.table.node.id: self.outstanding += 1 - df = node.storeValue(self.target, self.value, self.table.node.id) + if type(self.value) == type([]): + df = node.storeValues(self.target, self.value, self.table.node.id) + else: + df = node.storeValue(self.target, self.value, self.table.node.id) + df.addCallback(self.storedValue, node=node) df.addErrback(self.storeFailed, node=node) diff --git a/const.py b/const.py index 1a42485..42d060b 100644 --- a/const.py +++ b/const.py @@ -55,7 +55,7 @@ BUCKET_STALENESS = 60 # one hour KEINITIAL_DELAY = 15 # 15 seconds - to clean out old stuff in persistent db # time between expirer runs -KE_DELAY = 60 * 60 # 1 hour +KE_DELAY = 60 * 20 # 20 minutes # expire entries older than this -KE_AGE = 60 * 60 * 24 # 24 hours +KE_AGE = 60 * 60 # 60 minutes diff --git a/khashmir.py b/khashmir.py index 92619fd..d773a45 100644 --- a/khashmir.py +++ b/khashmir.py @@ -1,4 +1,4 @@ -## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved +## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved # see LICENSE.txt for license information from const import reactor @@ -326,7 +326,27 @@ class Khashmir(protocol.Factory): n.conn = self.udp.connectionForAddr((n.host, n.port)) self.insertNode(n, contacted=0) return {"id" : self.node.id} - + + ## multiple values per key + def krpc_store_values(self, key, values, id, _krpc_sender): + t = "%0.6f" % time.time() + c = self.store.cursor() + key = sqlite.encode(key) + for value in values: + value = sqlite.encode(value) + try: + c.execute("insert into kv values (%s, %s, %s);", key, value, t) + except sqlite.IntegrityError, reason: + # update last insert time + c.execute("update kv set time = %s where key = %s and value = %s;", (t, key, value)) + sender = {'id' : id} + sender['host'] = _krpc_sender[0] + sender['port'] = _krpc_sender[1] + n = Node().initWithDict(sender) + n.conn = self.udp.connectionForAddr((n.host, n.port)) + self.insertNode(n, contacted=0) + return {"id" : self.node.id} + def krpc_find_value(self, key, id, _krpc_sender): sender = {'id' : id} sender['host'] = _krpc_sender[0] diff --git a/knode.py b/knode.py index 791e725..44341ae 100644 --- a/knode.py +++ b/knode.py @@ -42,6 +42,11 @@ class KNode(Node): df.addErrback(self.errBack) df.addCallback(self.checkSender) return df + def storeValues(self, key, value, id): + df = self.conn.sendRequest('store_values', {"key" : key, "values" : value, "id": id}) + df.addErrback(self.errBack) + df.addCallback(self.checkSender) + return df def findValue(self, key, id): df = self.conn.sendRequest('find_value', {"key" : key, "id" : id}) df.addErrback(self.errBack) diff --git a/krpc.py b/krpc.py index 1cf33d5..efdc29f 100644 --- a/krpc.py +++ b/krpc.py @@ -15,7 +15,7 @@ from traceback import format_exception import khash as hash -KRPC_TIMEOUT = 60 +KRPC_TIMEOUT = 20 KRPC_ERROR = 1 KRPC_ERROR_METHOD_UNKNOWN = 2 diff --git a/test_khashmir.py b/test_khashmir.py index d0dfc82..83ddde3 100644 --- a/test_khashmir.py +++ b/test_khashmir.py @@ -1,6 +1,7 @@ from unittest import * from khashmir import * import khash +from copy import copy from whrandom import randrange @@ -71,7 +72,7 @@ class SimpleTests(TestCase): class MultiTest(TestCase): - num = 5 + num = 20 def _done(self, val): self.done = 1 @@ -115,23 +116,56 @@ class MultiTest(TestCase): K = khash.newID() V = khash.newID() - self.done = 0 - def _cb(val): - self.done = 1 - self.l[randrange(0, self.num)].storeValueForKey(K, V, _cb) - while not self.done: - reactor.iterate() + for a in range(3): + self.done = 0 + def _scb(val): + self.done = 1 + self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb) + while not self.done: + reactor.iterate() + + + def _rcb(val): + if not val: + self.done = 1 + self.assertEqual(self.got, 1) + elif V in val: + self.got = 1 + for x in range(3): + self.got = 0 + self.done = 0 + self.l[randrange(0, self.num)].valueForKey(K, _rcb) + while not self.done: + reactor.iterate() - self.got = 0 - self.done = 0 - def _cb(val): - if not val: + K = khash.newID() + l = map(lambda a: newID(), range(8)) + for a in range(3): + self.done = 0 + def _scb(val): self.done = 1 - self.assertEqual(self.got, 1) - elif V in val: - self.got = 1 - - self.l[randrange(0, self.num)].valueForKey(K, _cb) - while not self.done: - reactor.iterate() + self.l[randrange(0, self.num)].storeValueForKey(K, l, _scb) + while not self.done: + reactor.iterate() + + + c = [] + def _rcb(val): + if not val: + self.done = 1 + self.assertEqual(self.got, 1) + for n in val: + c.remove(n) + if not c: + self.got = 1 + for x in range(3): + self.got = 0 + self.done = 0 + c = copy(l) + self.l[randrange(0, self.num)].valueForKey(K, _rcb) + while not self.done: + reactor.iterate() + + + -- 2.39.5