1 from const import reactor
3 from hash import intify
4 from knode import KNode as Node
5 from ktable import KTable, K
6 # concurrent FIND_NODE/VALUE requests!
10 """ base class for some long running asynchronous processes like finding nodes or values """
11 def __init__(self, table, target, callback):
14 self.int = intify(target)
18 self.callback = callback
22 def sort(a, b, int=self.int):
23 """ this function is for sorting nodes relative to the ID we are looking for """
24 x, y = int ^ a.int, int ^ b.int
32 def goWithNodes(self, t):
37 FIND_NODE_TIMEOUT = 15
39 class FindNode(ActionBase):
40 """ find node action merits its own class as it is a long running stateful process """
41 def handleGotNodes(self, args):
43 if self.finished or self.answered.has_key(sender['id']):
44 # a day late and a dollar short
46 self.outstanding = self.outstanding - 1
47 self.answered[sender['id']] = 1
49 if not self.found.has_key(node['id']):
50 n = Node(node['id'], node['host'], node['port'])
52 self.table.insertNode(n)
57 send messages to new peers, if necessary
61 l = self.found.values()
65 if node.id == self.target:
67 return self.callback([node])
68 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
69 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
70 df = node.findNode(self.target, self.table.node.senderDict())
71 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
72 self.outstanding = self.outstanding + 1
73 self.queried[node.id] = 1
74 if self.outstanding >= N:
76 assert(self.outstanding) >=0
77 if self.outstanding == 0:
80 reactor.callFromThread(self.callback, l[:K])
82 def defaultGotNodes(self, t):
85 self.outstanding = self.outstanding - 1
89 def goWithNodes(self, nodes):
91 this starts the process, our argument is a transaction with t.extras being our list of nodes
92 it's a transaction since we got called from the dispatcher
95 if node.id == self.table.node.id:
97 self.found[node.id] = node
98 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
99 df = node.findNode(self.target, self.table.node.senderDict())
100 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
101 self.outstanding = self.outstanding + 1
102 self.queried[node.id] = 1
103 if self.outstanding == 0:
107 GET_VALUE_TIMEOUT = 15
108 class GetValue(FindNode):
109 """ get value task """
110 def handleGotNodes(self, args):
112 if self.finished or self.answered.has_key(sender['id']):
113 # a day late and a dollar short
115 self.outstanding = self.outstanding - 1
116 self.answered[sender['id']] = 1
118 # if we have any closer than what we already got, query them
119 if l.has_key('nodes'):
120 for node in l['nodes']:
121 if not self.found.has_key(node['id']):
122 n = Node(node['id'], node['host'], node['port'])
124 self.table.insertNode(n)
125 elif l.has_key('values'):
128 reactor.callFromThread(self.callback, l['values'])
135 l = self.found.values()
139 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
140 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
141 df = node.findValue(self.target, self.table.node.senderDict())
142 df.addCallback(self.handleGotNodes)
143 df.addErrback(self.defaultGotNodes)
144 self.outstanding = self.outstanding + 1
145 self.queried[node.id] = 1
146 if self.outstanding >= N:
148 assert(self.outstanding) >=0
149 if self.outstanding == 0:
150 ## all done, didn't find it!!
152 reactor.callFromThread(self.callback,[])
155 def goWithNodes(self, nodes):
157 if node.id == self.table.node.id:
159 self.found[node.id] = node
160 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
161 df = node.findValue(self.target, self.table.node.senderDict())
162 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
163 self.outstanding = self.outstanding + 1
164 self.queried[node.id] = 1
165 if self.outstanding == 0:
166 reactor.callFromThread(self.callback, [])