Use function for sending krpc responses, and add spew parameter.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from twisted.internet import reactor
5
6 from khash import intify
7
8 class ActionBase:
9     """ base class for some long running asynchronous proccesses like finding nodes or values """
10     def __init__(self, caller, target, callback, config):
11         self.caller = caller
12         self.target = target
13         self.config = config
14         self.num = intify(target)
15         self.found = {}
16         self.queried = {}
17         self.answered = {}
18         self.callback = callback
19         self.outstanding = 0
20         self.finished = 0
21     
22         def sort(a, b, num=self.num):
23             """ this function is for sorting nodes relative to the ID we are looking for """
24             x, y = num ^ a.num, num ^ b.num
25             if x > y:
26                 return 1
27             elif x < y:
28                 return -1
29             return 0
30         self.sort = sort
31         
32     def goWithNodes(self, t):
33         pass
34     
35     
36
37 FIND_NODE_TIMEOUT = 15
38
39 class FindNode(ActionBase):
40     """ find node action merits it's own class as it is a long running stateful process """
41     def handleGotNodes(self, dict):
42         _krpc_sender = dict['_krpc_sender']
43         dict = dict['rsp']
44         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
45         self.caller.insertNode(n)
46         l = dict["nodes"]
47         if self.finished or self.answered.has_key(dict["id"]):
48             # a day late and a dollar short
49             return
50         self.outstanding = self.outstanding - 1
51         self.answered[dict["id"]] = 1
52         for node in l:
53             n = self.caller.Node(node)
54             if not self.found.has_key(n.id):
55                 self.found[n.id] = n
56         self.schedule()
57         
58     def schedule(self):
59         """
60             send messages to new peers, if necessary
61         """
62         if self.finished:
63             return
64         l = self.found.values()
65         l.sort(self.sort)
66         for node in l[:self.config['K']]:
67             if node.id == self.target:
68                 self.finished=1
69                 return self.callback([node])
70             if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
71                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
72                 df = node.findNode(self.target, self.caller.node.id)
73                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
74                 self.outstanding = self.outstanding + 1
75                 self.queried[node.id] = 1
76             if self.outstanding >= self.config['CONCURRENT_REQS']:
77                 break
78         assert self.outstanding >=0
79         if self.outstanding == 0:
80             ## all done!!
81             self.finished=1
82             reactor.callLater(0, self.callback, l[:self.config['K']])
83     
84     def makeMsgFailed(self, node):
85         def defaultGotNodes(err, self=self, node=node):
86             print ">>> find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port), err
87             self.caller.table.nodeFailed(node)
88             self.outstanding = self.outstanding - 1
89             self.schedule()
90         return defaultGotNodes
91     
92     def goWithNodes(self, nodes):
93         """
94             this starts the process, our argument is a transaction with t.extras being our list of nodes
95             it's a transaction since we got called from the dispatcher
96         """
97         for node in nodes:
98             if node.id == self.caller.node.id:
99                 continue
100             else:
101                 self.found[node.id] = node
102         
103         self.schedule()
104     
105
106 get_value_timeout = 15
107 class GetValue(FindNode):
108     def __init__(self, caller, target, callback, config, find="findValue"):
109         FindNode.__init__(self, caller, target, callback, config)
110         self.findValue = find
111             
112     """ get value task """
113     def handleGotNodes(self, dict):
114         _krpc_sender = dict['_krpc_sender']
115         dict = dict['rsp']
116         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
117         self.caller.insertNode(n)
118         if self.finished or self.answered.has_key(dict["id"]):
119             # a day late and a dollar short
120             return
121         self.outstanding = self.outstanding - 1
122         self.answered[dict["id"]] = 1
123         # go through nodes
124         # if we have any closer than what we already got, query them
125         if dict.has_key('nodes'):
126             for node in dict['nodes']:
127                 n = self.caller.Node(node)
128                 if not self.found.has_key(n.id):
129                     self.found[n.id] = n
130         elif dict.has_key('values'):
131             def x(y, z=self.results):
132                 if not z.has_key(y):
133                     z[y] = 1
134                     return y
135                 else:
136                     return None
137             z = len(dict['values'])
138             v = filter(None, map(x, dict['values']))
139             if(len(v)):
140                 reactor.callLater(0, self.callback, self.target, v)
141         self.schedule()
142         
143     ## get value
144     def schedule(self):
145         if self.finished:
146             return
147         l = self.found.values()
148         l.sort(self.sort)
149         
150         for node in l[:self.config['K']]:
151             if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
152                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
153                 try:
154                     f = getattr(node, self.findValue)
155                 except AttributeError:
156                     print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
157                 else:
158                     df = f(self.target, self.caller.node.id)
159                     df.addCallback(self.handleGotNodes)
160                     df.addErrback(self.makeMsgFailed(node))
161                     self.outstanding = self.outstanding + 1
162                     self.queried[node.id] = 1
163             if self.outstanding >= self.config['CONCURRENT_REQS']:
164                 break
165         assert self.outstanding >=0
166         if self.outstanding == 0:
167             ## all done, didn't find it!!
168             self.finished=1
169             reactor.callLater(0, self.callback, self.target, [])
170
171     ## get value
172     def goWithNodes(self, nodes, found=None):
173         self.results = {}
174         if found:
175             for n in found:
176                 self.results[n] = 1
177         for node in nodes:
178             if node.id == self.caller.node.id:
179                 continue
180             else:
181                 self.found[node.id] = node
182             
183         self.schedule()
184
185
186 class StoreValue(ActionBase):
187     def __init__(self, caller, target, value, callback, config, store="storeValue"):
188         ActionBase.__init__(self, caller, target, callback, config)
189         self.value = value
190         self.stored = []
191         self.store = store
192         
193     def storedValue(self, t, node):
194         self.outstanding -= 1
195         self.caller.insertNode(node)
196         if self.finished:
197             return
198         self.stored.append(t)
199         if len(self.stored) >= self.config['STORE_REDUNDANCY']:
200             self.finished=1
201             self.callback(self.target, self.value, self.stored)
202         else:
203             if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
204                 self.schedule()
205         return t
206     
207     def storeFailed(self, t, node):
208         print ">>> store failed %s/%s" % (node.host, node.port)
209         self.caller.nodeFailed(node)
210         self.outstanding -= 1
211         if self.finished:
212             return t
213         self.schedule()
214         return t
215     
216     def schedule(self):
217         if self.finished:
218             return
219         num = self.config['CONCURRENT_REQS'] - self.outstanding
220         if num > self.config['STORE_REDUNDANCY']:
221             num = self.config['STORE_REDUNDANCY']
222         for i in range(num):
223             try:
224                 node = self.nodes.pop()
225             except IndexError:
226                 if self.outstanding == 0:
227                     self.finished = 1
228                     self.callback(self.target, self.value, self.stored)
229             else:
230                 if not node.id == self.caller.node.id:
231                     self.outstanding += 1
232                     try:
233                         f = getattr(node, self.store)
234                     except AttributeError:
235                         print ">>> %s doesn't have a %s method!" % (node, self.store)
236                     else:
237                         df = f(self.target, self.value, self.caller.node.id)
238                         df.addCallback(self.storedValue, node=node)
239                         df.addErrback(self.storeFailed, node=node)
240                     
241     def goWithNodes(self, nodes):
242         self.nodes = nodes
243         self.nodes.sort(self.sort)
244         self.schedule()
245
246
247 class KeyExpirer:
248     def __init__(self, store, config):
249         self.store = store
250         self.config = config
251         self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
252     
253     def doExpire(self):
254         self.store.expireValues(self.config['KE_AGE'])
255         self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
256         
257     def shutdown(self):
258         try:
259             self.next_expire.cancel()
260         except:
261             pass