Break up the find_value into 2 parts (with get_value).
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from twisted.internet import reactor
5 from twisted.python import log
6
7 from khash import intify
8 from util import uncompact
9
10 class ActionBase:
11     """ base class for some long running asynchronous proccesses like finding nodes or values """
12     def __init__(self, caller, target, callback, config):
13         self.caller = caller
14         self.target = target
15         self.config = config
16         self.num = intify(target)
17         self.found = {}
18         self.queried = {}
19         self.answered = {}
20         self.callback = callback
21         self.outstanding = 0
22         self.finished = 0
23     
24         def sort(a, b, num=self.num):
25             """ this function is for sorting nodes relative to the ID we are looking for """
26             x, y = num ^ a.num, num ^ b.num
27             if x > y:
28                 return 1
29             elif x < y:
30                 return -1
31             return 0
32         self.sort = sort
33         
34     def actionFailed(self, err, node):
35         log.msg("action %s failed (%s) %s/%s" % (self.__class__.__name__, self.config['PORT'], node.host, node.port))
36         log.err(err)
37         self.caller.table.nodeFailed(node)
38         self.outstanding = self.outstanding - 1
39         self.schedule()
40     
41     def goWithNodes(self, t):
42         pass
43     
44
45 class FindNode(ActionBase):
46     """ find node action merits it's own class as it is a long running stateful process """
47     def handleGotNodes(self, dict):
48         _krpc_sender = dict['_krpc_sender']
49         dict = dict['rsp']
50         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
51         self.caller.insertNode(n)
52         if dict["id"] in self.found:
53             self.found[dict["id"]].updateToken(dict.get('token', ''))
54         l = dict["nodes"]
55         if self.finished or self.answered.has_key(dict["id"]):
56             # a day late and a dollar short
57             return
58         self.outstanding = self.outstanding - 1
59         self.answered[dict["id"]] = 1
60         for compact_node in l:
61             node = uncompact(compact_node)
62             n = self.caller.Node(node)
63             if not self.found.has_key(n.id):
64                 self.found[n.id] = n
65         self.schedule()
66         
67     def schedule(self):
68         """
69             send messages to new peers, if necessary
70         """
71         if self.finished:
72             return
73         l = self.found.values()
74         l.sort(self.sort)
75         for node in l[:self.config['K']]:
76             if node.id == self.target:
77                 self.finished=1
78                 return self.callback([node])
79             if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
80                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
81                 df = node.findNode(self.target, self.caller.node.id)
82                 df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
83                 self.outstanding = self.outstanding + 1
84                 self.queried[node.id] = 1
85             if self.outstanding >= self.config['CONCURRENT_REQS']:
86                 break
87         assert self.outstanding >=0
88         if self.outstanding == 0:
89             ## all done!!
90             self.finished=1
91             reactor.callLater(0, self.callback, l[:self.config['K']])
92     
93     def goWithNodes(self, nodes):
94         """
95             this starts the process, our argument is a transaction with t.extras being our list of nodes
96             it's a transaction since we got called from the dispatcher
97         """
98         for node in nodes:
99             if node.id == self.caller.node.id:
100                 continue
101             else:
102                 self.found[node.id] = node
103         
104         self.schedule()
105     
106
107 class FindValue(ActionBase):
108     def handleGotNodes(self, dict):
109         _krpc_sender = dict['_krpc_sender']
110         dict = dict['rsp']
111         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
112         self.caller.insertNode(n)
113         if dict["id"] in self.found:
114             self.found[dict["id"]].updateNumValues(dict.get('num', 0))
115         l = dict["nodes"]
116         if self.finished or self.answered.has_key(dict["id"]):
117             # a day late and a dollar short
118             return
119         self.outstanding = self.outstanding - 1
120         self.answered[dict["id"]] = 1
121         for compact_node in l:
122             node = uncompact(compact_node)
123             n = self.caller.Node(node)
124             if not self.found.has_key(n.id):
125                 self.found[n.id] = n
126         self.schedule()
127         
128     def schedule(self):
129         """
130             send messages to new peers, if necessary
131         """
132         if self.finished:
133             return
134         l = self.found.values()
135         l.sort(self.sort)
136         for node in l[:self.config['K']]:
137             if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
138                 df = node.findValue(self.target, self.caller.node.id)
139                 df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
140                 self.outstanding = self.outstanding + 1
141                 self.queried[node.id] = 1
142             if self.outstanding >= self.config['CONCURRENT_REQS']:
143                 break
144         assert self.outstanding >=0
145         if self.outstanding == 0:
146             ## all done!!
147             self.finished=1
148             l = [node for node in self.found.values() if node.num_values > 0]
149             reactor.callLater(0, self.callback, l)
150     
151     def goWithNodes(self, nodes):
152         """
153             this starts the process, our argument is a transaction with t.extras being our list of nodes
154             it's a transaction since we got called from the dispatcher
155         """
156         for node in nodes:
157             if node.id == self.caller.node.id:
158                 continue
159             else:
160                 self.found[node.id] = node
161         
162         self.schedule()
163
164
165 class GetValue(ActionBase):
166     def __init__(self, caller, target, num, callback, config, action="getValue"):
167         ActionBase.__init__(self, caller, target, callback, config)
168         self.num_values = num
169         self.outstanding_gets = 0
170         self.action = action
171         
172     def gotValues(self, dict, node):
173         dict = dict['rsp']
174         self.outstanding -= 1
175         self.caller.insertNode(node)
176         if self.finished:
177             return
178         if dict.has_key('values'):
179             def x(y, z=self.results):
180                 if not z.has_key(y):
181                     z[y] = 1
182                     return y
183                 else:
184                     return None
185             z = len(dict['values'])
186             v = filter(None, map(x, dict['values']))
187             if len(v):
188                 reactor.callLater(0, self.callback, self.target, v)
189         if len(self.results) >= self.num_values:
190             self.finished=1
191             reactor.callLater(0, self.callback, self.target, [])
192         else:
193             if not len(self.results) + self.outstanding_gets >= self.num_values:
194                 self.schedule()
195     
196     def schedule(self):
197         if self.finished:
198             return
199         
200         for node in self.nodes:
201             if node.id not in self.queried and node.id != self.caller.node.id and node.num_values > 0:
202                 try:
203                     f = getattr(node, self.action)
204                 except AttributeError:
205                     log.msg("%s doesn't have a %s method!" % (node, self.action))
206                 else:
207                     self.outstanding += 1
208                     self.outstanding_gets += node.num_values
209                     df = f(self.target, 0, self.caller.node.id)
210                     df.addCallbacks(self.gotValues, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
211                     self.queried[node.id] = 1
212             if len(self.results) + self.outstanding_gets >= self.num_values or \
213                 self.outstanding >= self.config['CONCURRENT_REQS']:
214                 break
215         assert self.outstanding >=0
216         if self.outstanding == 0:
217             self.finished = 1
218             reactor.callLater(0, self.callback, self.target, [])
219                     
220     def goWithNodes(self, nodes, found = None):
221         self.results = {}
222         if found:
223             for n in found:
224                 self.results[n] = 1
225         self.nodes = nodes
226         self.nodes.sort(self.sort)
227         self.schedule()
228
229
230 class StoreValue(ActionBase):
231     def __init__(self, caller, target, value, callback, config, action="storeValue"):
232         ActionBase.__init__(self, caller, target, callback, config)
233         self.value = value
234         self.stored = []
235         self.action = action
236         
237     def storedValue(self, t, node):
238         self.outstanding -= 1
239         self.caller.insertNode(node)
240         if self.finished:
241             return
242         self.stored.append(t)
243         if len(self.stored) >= self.config['STORE_REDUNDANCY']:
244             self.finished=1
245             self.callback(self.target, self.value, self.stored)
246         else:
247             if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
248                 self.schedule()
249         return t
250     
251     def schedule(self):
252         if self.finished:
253             return
254         num = self.config['CONCURRENT_REQS'] - self.outstanding
255         if num > self.config['STORE_REDUNDANCY']:
256             num = self.config['STORE_REDUNDANCY']
257         for i in range(num):
258             try:
259                 node = self.nodes.pop()
260             except IndexError:
261                 if self.outstanding == 0:
262                     self.finished = 1
263                     self.callback(self.target, self.value, self.stored)
264             else:
265                 if not node.id == self.caller.node.id:
266                     self.outstanding += 1
267                     try:
268                         f = getattr(node, self.action)
269                     except AttributeError:
270                         log.msg("%s doesn't have a %s method!" % (node, self.action))
271                     else:
272                         df = f(self.target, self.value, node.token, self.caller.node.id)
273                         df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
274                     
275     def goWithNodes(self, nodes):
276         self.nodes = nodes
277         self.nodes.sort(self.sort)
278         self.schedule()