## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
# see LICENSE.txt for license information
-from time import time
-
from twisted.internet import reactor
-import const
from khash import intify
class ActionBase:
""" base class for some long running asynchronous proccesses like finding nodes or values """
- def __init__(self, table, target, callback):
+ def __init__(self, table, target, callback, config):
self.table = table
self.target = target
+ self.config = config
self.num = intify(target)
self.found = {}
self.queried = {}
return
l = self.found.values()
l.sort(self.sort)
- for node in l[:const.K]:
+ for node in l[:self.config['K']]:
if node.id == self.target:
self.finished=1
return self.callback([node])
df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
- if self.outstanding >= const.CONCURRENT_REQS:
+ if self.outstanding >= self.config['CONCURRENT_REQS']:
break
assert(self.outstanding) >=0
if self.outstanding == 0:
## all done!!
self.finished=1
- reactor.callLater(0, self.callback, l[:const.K])
+ reactor.callLater(0, self.callback, l[:self.config['K']])
def makeMsgFailed(self, node):
def defaultGotNodes(err, self=self, node=node):
get_value_timeout = 15
class GetValue(FindNode):
- def __init__(self, table, target, callback, find="findValue"):
- FindNode.__init__(self, table, target, callback)
+ def __init__(self, table, target, callback, config, find="findValue"):
+ FindNode.__init__(self, table, target, callback, config)
self.findValue = find
""" get value task """
z = len(dict['values'])
v = filter(None, map(x, dict['values']))
if(len(v)):
- reactor.callLater(0, self.callback, v)
+ reactor.callLater(0, self.callback, self.target, v)
self.schedule()
## get value
l = self.found.values()
l.sort(self.sort)
- for node in l[:const.K]:
+ for node in l[:self.config['K']]:
if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
#xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
try:
df.addErrback(self.makeMsgFailed(node))
self.outstanding = self.outstanding + 1
self.queried[node.id] = 1
- if self.outstanding >= const.CONCURRENT_REQS:
+ if self.outstanding >= self.config['CONCURRENT_REQS']:
break
assert(self.outstanding) >=0
if self.outstanding == 0:
## all done, didn't find it!!
self.finished=1
- reactor.callLater(0, self.callback,[])
+ reactor.callLater(0, self.callback, self.target, [])
## get value
def goWithNodes(self, nodes, found=None):
class StoreValue(ActionBase):
- def __init__(self, table, target, value, callback, store="storeValue"):
- ActionBase.__init__(self, table, target, callback)
+ def __init__(self, table, target, value, callback, config, store="storeValue"):
+ ActionBase.__init__(self, table, target, callback, config)
self.value = value
self.stored = []
self.store = store
if self.finished:
return
self.stored.append(t)
- if len(self.stored) >= const.STORE_REDUNDANCY:
+ if len(self.stored) >= self.config['STORE_REDUNDANCY']:
self.finished=1
- self.callback(self.stored)
+ self.callback(self.target, self.value, self.stored)
else:
- if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
+ if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
self.schedule()
return t
def schedule(self):
if self.finished:
return
- num = const.CONCURRENT_REQS - self.outstanding
- if num > const.STORE_REDUNDANCY:
- num = const.STORE_REDUNDANCY
+ num = self.config['CONCURRENT_REQS'] - self.outstanding
+ if num > self.config['STORE_REDUNDANCY']:
+ num = self.config['STORE_REDUNDANCY']
for i in range(num):
try:
node = self.nodes.pop()
except IndexError:
if self.outstanding == 0:
self.finished = 1
- self.callback(self.stored)
+ self.callback(self.target, self.value, self.stored)
else:
if not node.id == self.table.node.id:
self.outstanding += 1
class KeyExpirer:
- def __init__(self, store):
+ def __init__(self, store, config):
self.store = store
- reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
+ self.config = config
+ self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
def doExpire(self):
- self.cut = "%0.6f" % (time() - const.KE_AGE)
- self._expire()
-
- def _expire(self):
- c = self.store.cursor()
- s = "delete from kv where time < '%s';" % self.cut
- c.execute(s)
- reactor.callLater(const.KE_DELAY, self.doExpire)
+ self.store.expireValues(self.config['KE_AGE'])
+ self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
+
+ def shutdown(self):
+ try:
+ self.next_expire.cancel()
+ except:
+ pass