From eab6f7ede51301780bcc43f0ee17c717538edd16 Mon Sep 17 00:00:00 2001 From: burris Date: Mon, 2 Sep 2002 08:14:41 +0000 Subject: [PATCH] moved actions into their own file --- actions.py | 167 +++++++++++++++++++++++++++++++++++++++++++++++++++ khashmir.py | 170 +--------------------------------------------------- 2 files changed, 169 insertions(+), 168 deletions(-) create mode 100644 actions.py diff --git a/actions.py b/actions.py new file mode 100644 index 0000000..052f6a4 --- /dev/null +++ b/actions.py @@ -0,0 +1,167 @@ +from const import reactor + +from hash import intify +from knode import KNode as Node +from ktable import KTable, K +# concurrent FIND_NODE/VALUE requests! +N = 3 + +class ActionBase: + """ base class for some long running asynchronous proccesses like finding nodes or values """ + def __init__(self, table, target, callback): + self.table = table + self.target = target + self.int = intify(target) + self.found = {} + self.queried = {} + self.answered = {} + self.callback = callback + self.outstanding = 0 + self.finished = 0 + + def sort(a, b, int=self.int): + """ this function is for sorting nodes relative to the ID we are looking for """ + x, y = int ^ a.int, int ^ b.int + if x > y: + return 1 + elif x < y: + return -1 + return 0 + self.sort = sort + + def goWithNodes(self, t): + pass + + + +FIND_NODE_TIMEOUT = 15 + +class FindNode(ActionBase): + """ find node action merits it's own class as it is a long running stateful process """ + def handleGotNodes(self, args): + l, sender = args + if self.finished or self.answered.has_key(sender['id']): + # a day late and a dollar short + return + self.outstanding = self.outstanding - 1 + self.answered[sender['id']] = 1 + for node in l: + if not self.found.has_key(node['id']): + n = Node(node['id'], node['host'], node['port']) + self.found[n.id] = n + self.table.insertNode(n) + self.schedule() + + def schedule(self): + """ + send messages to new peers, if necessary + """ + if self.finished: + return + l = self.found.values() + l.sort(self.sort) + + for node in l[:K]: + if node.id == self.target: + self.finished=1 + return self.callback([node]) + if not self.queried.has_key(node.id) and node.id != self.table.node.id: + #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT + df = node.findNode(self.target, self.table.node.senderDict()) + df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding >= N: + break + assert(self.outstanding) >=0 + if self.outstanding == 0: + ## all done!! + self.finished=1 + reactor.callFromThread(self.callback, l[:K]) + + def defaultGotNodes(self, t): + if self.finished: + return + self.outstanding = self.outstanding - 1 + self.schedule() + + + def goWithNodes(self, nodes): + """ + this starts the process, our argument is a transaction with t.extras being our list of nodes + it's a transaction since we got called from the dispatcher + """ + for node in nodes: + if node.id == self.table.node.id: + continue + self.found[node.id] = node + #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT + df = node.findNode(self.target, self.table.node.senderDict()) + df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding == 0: + self.callback(nodes) + + +GET_VALUE_TIMEOUT = 15 +class GetValue(FindNode): + """ get value task """ + def handleGotNodes(self, args): + l, sender = args + l = l[0] + if self.finished or self.answered.has_key(sender['id']): + # a day late and a dollar short + return + self.outstanding = self.outstanding - 1 + self.answered[sender['id']] = 1 + # go through nodes + # if we have any closer than what we already got, query them + if l.has_key('nodes'): + for node in l['nodes']: + if not self.found.has_key(node['id']): + n = Node(node['id'], node['host'], node['port']) + self.found[n.id] = n + self.table.insertNode(n) + elif l.has_key('values'): + ## done + self.finished = 1 + return self.callback(l['values']) + self.schedule() + + ## get value + def schedule(self): + if self.finished: + return + l = self.found.values() + l.sort(self.sort) + + for node in l[:K]: + if not self.queried.has_key(node.id) and node.id != self.table.node.id: + #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT + df = node.getValue(node, self.target) + df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding >= N: + break + assert(self.outstanding) >=0 + if self.outstanding == 0: + ## all done, didn't find it!! + self.finished=1 + reactor.callFromThread(self.callback,[]) + + ## get value + def goWithNodes(self, nodes): + for node in nodes: + if node.id == self.table.node.id: + continue + self.found[node.id] = node + #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT + df = node.findNode(self.target, self.table.node.senderDict()) + df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 + if self.outstanding == 0: + reactor.callFromThread(self.callback, []) + diff --git a/khashmir.py b/khashmir.py index 32dab3e..748ce21 100644 --- a/khashmir.py +++ b/khashmir.py @@ -6,8 +6,9 @@ import time from ktable import KTable, K from knode import KNode as Node -from hash import newID, intify +from hash import newID +from actions import FindNode, GetValue from twisted.web import xmlrpc from twisted.internet.defer import Deferred from twisted.python import threadable @@ -19,9 +20,6 @@ from bsddb3._db import DBNotFoundError # don't ping unless it's been at least this many seconds since we've heard from a peer MAX_PING_INTERVAL = 60 * 15 # fifteen minutes -# concurrent FIND_NODE/VALUE requests! -N = 3 - # this is the main class! @@ -216,170 +214,6 @@ class Khashmir(xmlrpc.XMLRPC): def _storedValueHandler(self, sender): pass - - - - -class ActionBase: - """ base class for some long running asynchronous proccesses like finding nodes or values """ - def __init__(self, table, target, callback): - self.table = table - self.target = target - self.int = intify(target) - self.found = {} - self.queried = {} - self.answered = {} - self.callback = callback - self.outstanding = 0 - self.finished = 0 - - def sort(a, b, int=self.int): - """ this function is for sorting nodes relative to the ID we are looking for """ - x, y = int ^ a.int, int ^ b.int - if x > y: - return 1 - elif x < y: - return -1 - return 0 - self.sort = sort - - def goWithNodes(self, t): - pass - - - -FIND_NODE_TIMEOUT = 15 - -class FindNode(ActionBase): - """ find node action merits it's own class as it is a long running stateful process """ - def handleGotNodes(self, args): - l, sender = args - if self.finished or self.answered.has_key(sender['id']): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[sender['id']] = 1 - for node in l: - if not self.found.has_key(node['id']): - n = Node(node['id'], node['host'], node['port']) - self.found[n.id] = n - self.table.insertNode(n) - self.schedule() - - def schedule(self): - """ - send messages to new peers, if necessary - """ - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - - for node in l[:K]: - if node.id == self.target: - self.finished=1 - return self.callback([node]) - if not self.queried.has_key(node.id) and node.id != self.table.node.id: - #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding >= N: - break - assert(self.outstanding) >=0 - if self.outstanding == 0: - ## all done!! - self.finished=1 - reactor.callFromThread(self.callback, l[:K]) - - def defaultGotNodes(self, t): - if self.finished: - return - self.outstanding = self.outstanding - 1 - self.schedule() - - - def goWithNodes(self, nodes): - """ - this starts the process, our argument is a transaction with t.extras being our list of nodes - it's a transaction since we got called from the dispatcher - """ - for node in nodes: - if node.id == self.table.node.id: - continue - self.found[node.id] = node - #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding == 0: - self.callback(nodes) - - -GET_VALUE_TIMEOUT = 15 -class GetValue(FindNode): - """ get value task """ - def handleGotNodes(self, args): - l, sender = args - l = l[0] - if self.finished or self.answered.has_key(sender['id']): - # a day late and a dollar short - return - self.outstanding = self.outstanding - 1 - self.answered[sender['id']] = 1 - # go through nodes - # if we have any closer than what we already got, query them - if l.has_key('nodes'): - for node in l['nodes']: - if not self.found.has_key(node['id']): - n = Node(node['id'], node['host'], node['port']) - self.found[n.id] = n - self.table.insertNode(n) - elif l.has_key('values'): - ## done - self.finished = 1 - return self.callback(l['values']) - self.schedule() - - ## get value - def schedule(self): - if self.finished: - return - l = self.found.values() - l.sort(self.sort) - - for node in l[:K]: - if not self.queried.has_key(node.id) and node.id != self.table.node.id: - #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT - df = node.getValue(node, self.target) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding >= N: - break - assert(self.outstanding) >=0 - if self.outstanding == 0: - ## all done, didn't find it!! - self.finished=1 - reactor.callFromThread(self.callback,[]) - - ## get value - def goWithNodes(self, nodes): - for node in nodes: - if node.id == self.table.node.id: - continue - self.found[node.id] = node - #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.senderDict()) - df.addCallbacks(self.handleGotNodes, self.defaultGotNodes) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 - if self.outstanding == 0: - reactor.callFromThread(self.callback, []) - - #------ -- 2.39.5