7822579733b55a9ec1e0234767dcf1d422daf74b
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from twisted.internet import reactor
5 from twisted.python import log
6
7 from khash import intify
8 from util import uncompact
9
10 class ActionBase:
11     """ base class for some long running asynchronous proccesses like finding nodes or values """
12     def __init__(self, caller, target, callback, config):
13         self.caller = caller
14         self.target = target
15         self.config = config
16         self.num = intify(target)
17         self.found = {}
18         self.queried = {}
19         self.answered = {}
20         self.callback = callback
21         self.outstanding = 0
22         self.finished = 0
23     
24         def sort(a, b, num=self.num):
25             """ this function is for sorting nodes relative to the ID we are looking for """
26             x, y = num ^ a.num, num ^ b.num
27             if x > y:
28                 return 1
29             elif x < y:
30                 return -1
31             return 0
32         self.sort = sort
33         
34     def actionFailed(self, err, node):
35         log.msg("action %s failed (%s) %s/%s" % (self.__class__.__name__, self.config['PORT'], node.host, node.port))
36         log.err(err)
37         self.caller.table.nodeFailed(node)
38         self.outstanding = self.outstanding - 1
39         self.schedule()
40     
41     def goWithNodes(self, t):
42         pass
43     
44     
45
46 FIND_NODE_TIMEOUT = 15
47
48 class FindNode(ActionBase):
49     """ find node action merits it's own class as it is a long running stateful process """
50     def handleGotNodes(self, dict):
51         _krpc_sender = dict['_krpc_sender']
52         dict = dict['rsp']
53         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
54         self.caller.insertNode(n)
55         if dict["id"] in self.found:
56             self.found[dict["id"]].updateToken(dict.get('token', ''))
57         l = dict["nodes"]
58         if self.finished or self.answered.has_key(dict["id"]):
59             # a day late and a dollar short
60             return
61         self.outstanding = self.outstanding - 1
62         self.answered[dict["id"]] = 1
63         for compact_node in l:
64             node = uncompact(compact_node)
65             n = self.caller.Node(node)
66             if not self.found.has_key(n.id):
67                 self.found[n.id] = n
68         self.schedule()
69         
70     def schedule(self):
71         """
72             send messages to new peers, if necessary
73         """
74         if self.finished:
75             return
76         l = self.found.values()
77         l.sort(self.sort)
78         for node in l[:self.config['K']]:
79             if node.id == self.target:
80                 self.finished=1
81                 return self.callback([node])
82             if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
83                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
84                 df = node.findNode(self.target, self.caller.node.id)
85                 df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
86                 self.outstanding = self.outstanding + 1
87                 self.queried[node.id] = 1
88             if self.outstanding >= self.config['CONCURRENT_REQS']:
89                 break
90         assert self.outstanding >=0
91         if self.outstanding == 0:
92             ## all done!!
93             self.finished=1
94             reactor.callLater(0, self.callback, l[:self.config['K']])
95     
96     def goWithNodes(self, nodes):
97         """
98             this starts the process, our argument is a transaction with t.extras being our list of nodes
99             it's a transaction since we got called from the dispatcher
100         """
101         for node in nodes:
102             if node.id == self.caller.node.id:
103                 continue
104             else:
105                 self.found[node.id] = node
106         
107         self.schedule()
108     
109
110 get_value_timeout = 15
111 class GetValue(FindNode):
112     def __init__(self, caller, target, callback, config, find="findValue"):
113         FindNode.__init__(self, caller, target, callback, config)
114         self.findValue = find
115             
116     """ get value task """
117     def handleGotNodes(self, dict):
118         _krpc_sender = dict['_krpc_sender']
119         dict = dict['rsp']
120         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
121         self.caller.insertNode(n)
122         if self.finished or self.answered.has_key(dict["id"]):
123             # a day late and a dollar short
124             return
125         self.outstanding = self.outstanding - 1
126         self.answered[dict["id"]] = 1
127         # go through nodes
128         # if we have any closer than what we already got, query them
129         if dict.has_key('nodes'):
130             for compact_node in dict['nodes']:
131                 node = uncompact(compact_node)
132                 n = self.caller.Node(node)
133                 if not self.found.has_key(n.id):
134                     self.found[n.id] = n
135         elif dict.has_key('values'):
136             def x(y, z=self.results):
137                 if not z.has_key(y):
138                     z[y] = 1
139                     return y
140                 else:
141                     return None
142             z = len(dict['values'])
143             v = filter(None, map(x, dict['values']))
144             if(len(v)):
145                 reactor.callLater(0, self.callback, self.target, v)
146         self.schedule()
147         
148     ## get value
149     def schedule(self):
150         if self.finished:
151             return
152         l = self.found.values()
153         l.sort(self.sort)
154         
155         for node in l[:self.config['K']]:
156             if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
157                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
158                 try:
159                     f = getattr(node, self.findValue)
160                 except AttributeError:
161                     log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
162                 else:
163                     df = f(self.target, self.caller.node.id)
164                     df.addCallbacks(self.handleGotNodes, self.actionFailed, errbackArgs = (node, ))
165                     self.outstanding = self.outstanding + 1
166                     self.queried[node.id] = 1
167             if self.outstanding >= self.config['CONCURRENT_REQS']:
168                 break
169         assert self.outstanding >=0
170         if self.outstanding == 0:
171             ## all done, didn't find it!!
172             self.finished=1
173             reactor.callLater(0, self.callback, self.target, [])
174
175     ## get value
176     def goWithNodes(self, nodes, found=None):
177         self.results = {}
178         if found:
179             for n in found:
180                 self.results[n] = 1
181         for node in nodes:
182             if node.id == self.caller.node.id:
183                 continue
184             else:
185                 self.found[node.id] = node
186             
187         self.schedule()
188
189
190 class StoreValue(ActionBase):
191     def __init__(self, caller, target, value, callback, config, store="storeValue"):
192         ActionBase.__init__(self, caller, target, callback, config)
193         self.value = value
194         self.stored = []
195         self.store = store
196         
197     def storedValue(self, t, node):
198         self.outstanding -= 1
199         self.caller.insertNode(node)
200         if self.finished:
201             return
202         self.stored.append(t)
203         if len(self.stored) >= self.config['STORE_REDUNDANCY']:
204             self.finished=1
205             self.callback(self.target, self.value, self.stored)
206         else:
207             if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
208                 self.schedule()
209         return t
210     
211     def schedule(self):
212         if self.finished:
213             return
214         num = self.config['CONCURRENT_REQS'] - self.outstanding
215         if num > self.config['STORE_REDUNDANCY']:
216             num = self.config['STORE_REDUNDANCY']
217         for i in range(num):
218             try:
219                 node = self.nodes.pop()
220             except IndexError:
221                 if self.outstanding == 0:
222                     self.finished = 1
223                     self.callback(self.target, self.value, self.stored)
224             else:
225                 if not node.id == self.caller.node.id:
226                     self.outstanding += 1
227                     try:
228                         f = getattr(node, self.store)
229                     except AttributeError:
230                         log.msg("%s doesn't have a %s method!" % (node, self.store))
231                     else:
232                         df = f(self.target, self.value, node.token, self.caller.node.id)
233                         df.addCallbacks(self.storedValue, self.actionFailed, callbackArgs = (node, ), errbackArgs = (node, ))
234                     
235     def goWithNodes(self, nodes):
236         self.nodes = nodes
237         self.nodes.sort(self.sort)
238         self.schedule()
239
240
241 class KeyExpirer:
242     def __init__(self, store, config):
243         self.store = store
244         self.config = config
245         self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
246     
247     def doExpire(self):
248         self.store.expireValues(self.config['KE_AGE'])
249         self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
250         
251     def shutdown(self):
252         try:
253             self.next_expire.cancel()
254         except:
255             pass