From 349bb1f96bc7053e28739639eced3f5a784fc87e Mon Sep 17 00:00:00 2001 From: burris Date: Sat, 7 Sep 2002 19:13:28 +0000 Subject: [PATCH] now with auto-expiring of stored keys/values lots of typo bug fixes --- actions.py | 58 +++++++++++++++++++++++++++++++++++++++++++++++++++++ khashmir.py | 35 ++++++++++++++++---------------- 2 files changed, 76 insertions(+), 17 deletions(-) diff --git a/actions.py b/actions.py index a4d9514..c3cea32 100644 --- a/actions.py +++ b/actions.py @@ -1,8 +1,14 @@ +from time import time +from pickle import loads, dumps + +from bsddb3 import db + from const import reactor from hash import intify from knode import KNode as Node from ktable import KTable, K + # concurrent FIND_NODE/VALUE requests! N = 3 @@ -170,3 +176,55 @@ class GetValue(FindNode): if self.outstanding == 0: reactor.callFromThread(self.callback, []) + +KEINITIAL_DELAY = 60 # 1 minute +KE_DELAY = 60 # 1 minute +KE_AGE = 60 * 5 +class KeyExpirer: + def __init__(self, store, itime, kw): + self.store = store + self.itime = itime + self.kw = kw + reactor.callLater(KEINITIAL_DELAY, self.doExpire) + + def doExpire(self): + self.cut = `time() - KE_AGE` + self._expire() + + def _expire(self): + ic = self.itime.cursor() + sc = self.store.cursor() + kc = self.kw.cursor() + irec = None + try: + irec = ic.set_range(self.cut) + except db.DBNotFoundError: + # everything is expired + f = ic.prev + irec = f() + else: + f = ic.next + i = 0 + while irec: + it, h = irec + try: + k, v, lt = loads(self.store[h]) + except KeyError: + ic.delete() + else: + if lt < self.cut: + try: + kc.set_both(k, h) + except db.DBNotFoundError: + print "Database inconsistency! No key->value entry when a store entry was found!" + else: + kc.delete() + self.store.delete(h) + ic.delete() + i = i + 1 + irec = f() + + reactor.callLater(KE_DELAY, self.doExpire) + if(i > 0): + print ">>>KE: done expiring %d" % i + \ No newline at end of file diff --git a/khashmir.py b/khashmir.py index 2b716e6..9e5a26c 100644 --- a/khashmir.py +++ b/khashmir.py @@ -10,7 +10,7 @@ from knode import KNode as Node from hash import newID -from actions import FindNode, GetValue +from actions import FindNode, GetValue, KeyExpirer from twisted.web import xmlrpc from twisted.internet.defer import Deferred from twisted.python import threadable @@ -50,7 +50,8 @@ class Khashmir(xmlrpc.XMLRPC): self.kw.set_flags(db.DB_DUP) self.kw.open(None, None, db.DB_BTREE) - + KeyExpirer(store=self.store, itime=self.itime, kw=self.kw) + def render(self, request): """ Override the built in render so we can have access to the request object! @@ -94,14 +95,20 @@ class Khashmir(xmlrpc.XMLRPC): reactor.callFromThread(state.goWithNodes, nodes) + ## async, but in the current implementation there is no guarantee a store does anything so there is no callback right now - def storeValueForKey(self, key, value): + def storeValueForKey(self, key, value, callback=None): """ stores the value for key in the global table, returns immediately, no status in this implementation, peers respond but don't indicate status to storing values values are stored in peers on a first-come first-served basis this will probably change so more than one value can be stored under a key """ - def _storeValueForKey(nodes, key=key, value=value, response= self._storedValueHandler, default= lambda t: "didn't respond"): + def _storeValueForKey(nodes, key=key, value=value, response=callback , default= lambda t: "didn't respond"): + if not callback: + # default callback - this will get called for each successful store value + def _storedValueHandler(sender): + pass + response=_storedValueHandler for node in nodes: if node.id != self.node.id: df = node.storeValue(key, value, self.node.senderDict()) @@ -134,7 +141,7 @@ class Khashmir(xmlrpc.XMLRPC): self.table.insertNode(old) df = old.ping(self.node.senderDict()) - df.addCallbacks(_notStaleNodeHandler, self._staleNodeHandler) + df.addCallbacks(_notStaleNodeHandler, _staleNodeHandler) def sendPing(self, node): @@ -229,7 +236,7 @@ class Khashmir(xmlrpc.XMLRPC): self.insertNode(n) if self.kw.has_key(key): c = self.kw.cursor() - tup = c.set_range(key) + tup = c.set(key) l = [] while(tup): h1 = tup[1] @@ -242,20 +249,13 @@ class Khashmir(xmlrpc.XMLRPC): nodes = map(lambda node: node.senderDict(), nodes) return {'nodes' : nodes}, self.node.senderDict() - ### - ### message response callbacks - # called when we get a response to store value - def _storedValueHandler(self, sender): - pass - - #------ testing -def test_build_net(quiet=0, peers=64, pause=1): +def test_build_net(quiet=0, peers=8, pause=1): from whrandom import randrange import thread port = 2001 @@ -295,7 +295,7 @@ def test_build_net(quiet=0, peers=64, pause=1): if pause: time.sleep(.5) if pause: - time.sleep(10) + time.sleep(2) # for peer in l: # peer.refreshTable() return l @@ -322,6 +322,7 @@ def test_find_nodes(l, quiet=0): def test_find_value(l, quiet=0): from whrandom import randrange from sha import sha + from hash import newID import time, threading, sys fa = threading.Event() @@ -334,8 +335,8 @@ def test_find_value(l, quiet=0): c = l[randrange(0,n)] d = l[randrange(0,n)] - key = sha(`randrange(0,100000)`).digest() - value = sha(`randrange(0,100000)`).digest() + key = newID() + value = newID() if not quiet: print "inserting value..." sys.stdout.flush() -- 2.39.2