-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
from time import time
else:
if not node.id == self.table.node.id:
self.outstanding += 1
- df = node.storeValue(self.target, self.value, self.table.node.id)
+ if type(self.value) == type([]):
+ df = node.storeValues(self.target, self.value, self.table.node.id)
+ else:
+ df = node.storeValue(self.target, self.value, self.table.node.id)
+
df.addCallback(self.storedValue, node=node)
df.addErrback(self.storeFailed, node=node)
KEINITIAL_DELAY = 15 # 15 seconds - to clean out old stuff in persistent db
# time between expirer runs
-KE_DELAY = 60 * 60 # 1 hour
+KE_DELAY = 60 * 20 # 20 minutes
# expire entries older than this
-KE_AGE = 60 * 60 * 24 # 24 hours
+KE_AGE = 60 * 60 # 60 minutes
-## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
+## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
from const import reactor
n.conn = self.udp.connectionForAddr((n.host, n.port))
self.insertNode(n, contacted=0)
return {"id" : self.node.id}
-
+
+ ## multiple values per key
+ def krpc_store_values(self, key, values, id, _krpc_sender):
+ t = "%0.6f" % time.time()
+ c = self.store.cursor()
+ key = sqlite.encode(key)
+ for value in values:
+ value = sqlite.encode(value)
+ try:
+ c.execute("insert into kv values (%s, %s, %s);", key, value, t)
+ except sqlite.IntegrityError, reason:
+ # update last insert time
+ c.execute("update kv set time = %s where key = %s and value = %s;", (t, key, value))
+ sender = {'id' : id}
+ sender['host'] = _krpc_sender[0]
+ sender['port'] = _krpc_sender[1]
+ n = Node().initWithDict(sender)
+ n.conn = self.udp.connectionForAddr((n.host, n.port))
+ self.insertNode(n, contacted=0)
+ return {"id" : self.node.id}
+
def krpc_find_value(self, key, id, _krpc_sender):
sender = {'id' : id}
sender['host'] = _krpc_sender[0]
df.addErrback(self.errBack)
df.addCallback(self.checkSender)
return df
+ def storeValues(self, key, value, id):
+ df = self.conn.sendRequest('store_values', {"key" : key, "values" : value, "id": id})
+ df.addErrback(self.errBack)
+ df.addCallback(self.checkSender)
+ return df
def findValue(self, key, id):
df = self.conn.sendRequest('find_value', {"key" : key, "id" : id})
df.addErrback(self.errBack)
import khash as hash
-KRPC_TIMEOUT = 60
+KRPC_TIMEOUT = 20
KRPC_ERROR = 1
KRPC_ERROR_METHOD_UNKNOWN = 2
from unittest import *
from khashmir import *
import khash
+from copy import copy
from whrandom import randrange
class MultiTest(TestCase):
- num = 5
+ num = 20
def _done(self, val):
self.done = 1
K = khash.newID()
V = khash.newID()
- self.done = 0
- def _cb(val):
- self.done = 1
- self.l[randrange(0, self.num)].storeValueForKey(K, V, _cb)
- while not self.done:
- reactor.iterate()
+ for a in range(3):
+ self.done = 0
+ def _scb(val):
+ self.done = 1
+ self.l[randrange(0, self.num)].storeValueForKey(K, V, _scb)
+ while not self.done:
+ reactor.iterate()
+
+
+ def _rcb(val):
+ if not val:
+ self.done = 1
+ self.assertEqual(self.got, 1)
+ elif V in val:
+ self.got = 1
+ for x in range(3):
+ self.got = 0
+ self.done = 0
+ self.l[randrange(0, self.num)].valueForKey(K, _rcb)
+ while not self.done:
+ reactor.iterate()
- self.got = 0
- self.done = 0
- def _cb(val):
- if not val:
+ K = khash.newID()
+ l = map(lambda a: newID(), range(8))
+ for a in range(3):
+ self.done = 0
+ def _scb(val):
self.done = 1
- self.assertEqual(self.got, 1)
- elif V in val:
- self.got = 1
-
- self.l[randrange(0, self.num)].valueForKey(K, _cb)
- while not self.done:
- reactor.iterate()
+ self.l[randrange(0, self.num)].storeValueForKey(K, l, _scb)
+ while not self.done:
+ reactor.iterate()
+
+
+ c = []
+ def _rcb(val):
+ if not val:
+ self.done = 1
+ self.assertEqual(self.got, 1)
+ for n in val:
+ c.remove(n)
+ if not c:
+ self.got = 1
+ for x in range(3):
+ self.got = 0
+ self.done = 0
+ c = copy(l)
+ self.l[randrange(0, self.num)].valueForKey(K, _rcb)
+ while not self.done:
+ reactor.iterate()
+
+
+