X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=actions.py;h=013a9a7544dcd79189efb010a0afe3cab9c430d8;hb=d1507ff7d3cca77f48dffe460097f6ab3cd5d567;hp=18b9cddf82ae1dd016468eaf6b5ff5479af3067d;hpb=4310c7b8d0f5ba7795b3b4ed6f918329f9b710c0;p=quix0rs-apt-p2p.git diff --git a/actions.py b/actions.py index 18b9cdd..013a9a7 100644 --- a/actions.py +++ b/actions.py @@ -1,11 +1,12 @@ +## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved +# see LICENSE.txt for license information + from time import time -from const import reactor -import const +from twisted.internet import reactor -from hash import intify -from knode import KNode as Node -from ktable import KTable, K +import const +from khash import intify class ActionBase: """ base class for some long running asynchronous proccesses like finding nodes or values """ @@ -43,10 +44,11 @@ class FindNode(ActionBase): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] l = dict["nodes"] - sender = dict["sender"] + sender = {'id' : dict["id"]} sender['port'] = _krpc_sender[1] - sender = Node().initWithDict(sender) - sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port)) + sender['host'] = _krpc_sender[0] + sender = self.table.Node().initWithDict(sender) + sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short @@ -54,8 +56,8 @@ class FindNode(ActionBase): self.outstanding = self.outstanding - 1 self.answered[sender.id] = 1 for node in l: - n = Node().initWithDict(node) - n.conn = self.table.airhook.connectionForAddr((n.host, n.port)) + n = self.table.Node().initWithDict(node) + n.conn = self.table.udp.connectionForAddr((n.host, n.port)) if not self.found.has_key(n.id): self.found[n.id] = n self.schedule() @@ -68,13 +70,13 @@ class FindNode(ActionBase): return l = self.found.values() l.sort(self.sort) - for node in l[:K]: + for node in l[:const.K]: if node.id == self.target: self.finished=1 return self.callback([node]) if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT - df = node.findNode(self.target, self.table.node.senderDict()) + df = node.findNode(self.target, self.table.node.id) df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node)) self.outstanding = self.outstanding + 1 self.queried[node.id] = 1 @@ -84,11 +86,11 @@ class FindNode(ActionBase): if self.outstanding == 0: ## all done!! self.finished=1 - reactor.callFromThread(self.callback, l[:K]) + reactor.callLater(0, self.callback, l[:const.K]) def makeMsgFailed(self, node): def defaultGotNodes(err, self=self, node=node): - print ">>> find failed" + print ">>> find failed %s/%s" % (node.host, node.port), err self.table.table.nodeFailed(node) self.outstanding = self.outstanding - 1 self.schedule() @@ -108,16 +110,21 @@ class FindNode(ActionBase): self.schedule() -GET_VALUE_TIMEOUT = 15 +get_value_timeout = 15 class GetValue(FindNode): + def __init__(self, table, target, callback, find="findValue"): + FindNode.__init__(self, table, target, callback) + self.findValue = find + """ get value task """ def handleGotNodes(self, dict): _krpc_sender = dict['_krpc_sender'] dict = dict['rsp'] - sender = dict["sender"] - sender['port'] = _krpc_sender[1] - sender = Node().initWithDict(sender) - sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port)) + sender = {'id' : dict["id"]} + sender['port'] = _krpc_sender[1] + sender['host'] = _krpc_sender[0] + sender = self.table.Node().initWithDict(sender) + sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port)) self.table.table.insertNode(sender) if self.finished or self.answered.has_key(sender.id): # a day late and a dollar short @@ -128,8 +135,8 @@ class GetValue(FindNode): # if we have any closer than what we already got, query them if dict.has_key('nodes'): for node in dict['nodes']: - n = Node().initWithDict(node) - n.conn = self.table.airhook.connectionForAddr((n.host, n.port)) + n = self.table.Node().initWithDict(node) + n.conn = self.table.udp.connectionForAddr((n.host, n.port)) if not self.found.has_key(n.id): self.found[n.id] = n elif dict.has_key('values'): @@ -142,7 +149,7 @@ class GetValue(FindNode): z = len(dict['values']) v = filter(None, map(x, dict['values'])) if(len(v)): - reactor.callFromThread(self.callback, v) + reactor.callLater(0, self.callback, v) self.schedule() ## get value @@ -152,21 +159,26 @@ class GetValue(FindNode): l = self.found.values() l.sort(self.sort) - for node in l[:K]: + for node in l[:const.K]: if (not self.queried.has_key(node.id)) and node.id != self.table.node.id: #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT - df = node.findValue(self.target, self.table.node.senderDict()) - df.addCallback(self.handleGotNodes) - df.addErrback(self.makeMsgFailed(node)) - self.outstanding = self.outstanding + 1 - self.queried[node.id] = 1 + try: + f = getattr(node, self.findValue) + except AttributeError: + print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue) + else: + df = f(self.target, self.table.node.id) + df.addCallback(self.handleGotNodes) + df.addErrback(self.makeMsgFailed(node)) + self.outstanding = self.outstanding + 1 + self.queried[node.id] = 1 if self.outstanding >= const.CONCURRENT_REQS: break assert(self.outstanding) >=0 if self.outstanding == 0: ## all done, didn't find it!! self.finished=1 - reactor.callFromThread(self.callback,[]) + reactor.callLater(0, self.callback,[]) ## get value def goWithNodes(self, nodes, found=None): @@ -184,11 +196,12 @@ class GetValue(FindNode): class StoreValue(ActionBase): - def __init__(self, table, target, value, callback): + def __init__(self, table, target, value, callback, store="storeValue"): ActionBase.__init__(self, table, target, callback) self.value = value self.stored = [] - + self.store = store + def storedValue(self, t, node): self.outstanding -= 1 self.table.insertNode(node) @@ -201,15 +214,17 @@ class StoreValue(ActionBase): else: if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY: self.schedule() - + return t + def storeFailed(self, t, node): - print ">>> store failed" + print ">>> store failed %s/%s" % (node.host, node.port) self.table.nodeFailed(node) self.outstanding -= 1 if self.finished: - return + return t self.schedule() - + return t + def schedule(self): if self.finished: return @@ -226,9 +241,14 @@ class StoreValue(ActionBase): else: if not node.id == self.table.node.id: self.outstanding += 1 - df = node.storeValue(self.target, self.value, self.table.node.senderDict()) - df.addCallback(self.storedValue, node=node) - df.addErrback(self.storeFailed, node=node) + try: + f = getattr(node, self.store) + except AttributeError: + print ">>> %s doesn't have a %s method!" % (node, self.store) + else: + df = f(self.target, self.value, self.table.node.id) + df.addCallback(self.storedValue, node=node) + df.addErrback(self.storeFailed, node=node) def goWithNodes(self, nodes): self.nodes = nodes