Return a token in find_node responses, use it in store_value requests.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from twisted.internet import reactor
5 from twisted.python import log
6
7 from khash import intify
8 from util import uncompact
9
10 class ActionBase:
11     """ base class for some long running asynchronous proccesses like finding nodes or values """
12     def __init__(self, caller, target, callback, config):
13         self.caller = caller
14         self.target = target
15         self.config = config
16         self.num = intify(target)
17         self.found = {}
18         self.queried = {}
19         self.answered = {}
20         self.callback = callback
21         self.outstanding = 0
22         self.finished = 0
23     
24         def sort(a, b, num=self.num):
25             """ this function is for sorting nodes relative to the ID we are looking for """
26             x, y = num ^ a.num, num ^ b.num
27             if x > y:
28                 return 1
29             elif x < y:
30                 return -1
31             return 0
32         self.sort = sort
33         
34     def goWithNodes(self, t):
35         pass
36     
37     
38
39 FIND_NODE_TIMEOUT = 15
40
41 class FindNode(ActionBase):
42     """ find node action merits it's own class as it is a long running stateful process """
43     def handleGotNodes(self, dict):
44         _krpc_sender = dict['_krpc_sender']
45         dict = dict['rsp']
46         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
47         self.caller.insertNode(n)
48         if dict["id"] in self.found:
49             self.found[dict["id"]].updateToken(dict.get('token', ''))
50         l = dict["nodes"]
51         if self.finished or self.answered.has_key(dict["id"]):
52             # a day late and a dollar short
53             return
54         self.outstanding = self.outstanding - 1
55         self.answered[dict["id"]] = 1
56         for compact_node in l:
57             node = uncompact(compact_node)
58             n = self.caller.Node(node)
59             if not self.found.has_key(n.id):
60                 self.found[n.id] = n
61         self.schedule()
62         
63     def schedule(self):
64         """
65             send messages to new peers, if necessary
66         """
67         if self.finished:
68             return
69         l = self.found.values()
70         l.sort(self.sort)
71         for node in l[:self.config['K']]:
72             if node.id == self.target:
73                 self.finished=1
74                 return self.callback([node])
75             if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
76                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
77                 df = node.findNode(self.target, self.caller.node.id)
78                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
79                 self.outstanding = self.outstanding + 1
80                 self.queried[node.id] = 1
81             if self.outstanding >= self.config['CONCURRENT_REQS']:
82                 break
83         assert self.outstanding >=0
84         if self.outstanding == 0:
85             ## all done!!
86             self.finished=1
87             reactor.callLater(0, self.callback, l[:self.config['K']])
88     
89     def makeMsgFailed(self, node):
90         def defaultGotNodes(err, self=self, node=node):
91             log.msg("find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
92             log.err(err)
93             self.caller.table.nodeFailed(node)
94             self.outstanding = self.outstanding - 1
95             self.schedule()
96         return defaultGotNodes
97     
98     def goWithNodes(self, nodes):
99         """
100             this starts the process, our argument is a transaction with t.extras being our list of nodes
101             it's a transaction since we got called from the dispatcher
102         """
103         for node in nodes:
104             if node.id == self.caller.node.id:
105                 continue
106             else:
107                 self.found[node.id] = node
108         
109         self.schedule()
110     
111
112 get_value_timeout = 15
113 class GetValue(FindNode):
114     def __init__(self, caller, target, callback, config, find="findValue"):
115         FindNode.__init__(self, caller, target, callback, config)
116         self.findValue = find
117             
118     """ get value task """
119     def handleGotNodes(self, dict):
120         _krpc_sender = dict['_krpc_sender']
121         dict = dict['rsp']
122         n = self.caller.Node(dict["id"], _krpc_sender[0], _krpc_sender[1])
123         self.caller.insertNode(n)
124         if self.finished or self.answered.has_key(dict["id"]):
125             # a day late and a dollar short
126             return
127         self.outstanding = self.outstanding - 1
128         self.answered[dict["id"]] = 1
129         # go through nodes
130         # if we have any closer than what we already got, query them
131         if dict.has_key('nodes'):
132             for compact_node in dict['nodes']:
133                 node = uncompact(compact_node)
134                 n = self.caller.Node(node)
135                 if not self.found.has_key(n.id):
136                     self.found[n.id] = n
137         elif dict.has_key('values'):
138             def x(y, z=self.results):
139                 if not z.has_key(y):
140                     z[y] = 1
141                     return y
142                 else:
143                     return None
144             z = len(dict['values'])
145             v = filter(None, map(x, dict['values']))
146             if(len(v)):
147                 reactor.callLater(0, self.callback, self.target, v)
148         self.schedule()
149         
150     ## get value
151     def schedule(self):
152         if self.finished:
153             return
154         l = self.found.values()
155         l.sort(self.sort)
156         
157         for node in l[:self.config['K']]:
158             if (not self.queried.has_key(node.id)) and node.id != self.caller.node.id:
159                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
160                 try:
161                     f = getattr(node, self.findValue)
162                 except AttributeError:
163                     log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
164                 else:
165                     df = f(self.target, self.caller.node.id)
166                     df.addCallback(self.handleGotNodes)
167                     df.addErrback(self.makeMsgFailed(node))
168                     self.outstanding = self.outstanding + 1
169                     self.queried[node.id] = 1
170             if self.outstanding >= self.config['CONCURRENT_REQS']:
171                 break
172         assert self.outstanding >=0
173         if self.outstanding == 0:
174             ## all done, didn't find it!!
175             self.finished=1
176             reactor.callLater(0, self.callback, self.target, [])
177
178     ## get value
179     def goWithNodes(self, nodes, found=None):
180         self.results = {}
181         if found:
182             for n in found:
183                 self.results[n] = 1
184         for node in nodes:
185             if node.id == self.caller.node.id:
186                 continue
187             else:
188                 self.found[node.id] = node
189             
190         self.schedule()
191
192
193 class StoreValue(ActionBase):
194     def __init__(self, caller, target, value, callback, config, store="storeValue"):
195         ActionBase.__init__(self, caller, target, callback, config)
196         self.value = value
197         self.stored = []
198         self.store = store
199         
200     def storedValue(self, t, node):
201         self.outstanding -= 1
202         self.caller.insertNode(node)
203         if self.finished:
204             return
205         self.stored.append(t)
206         if len(self.stored) >= self.config['STORE_REDUNDANCY']:
207             self.finished=1
208             self.callback(self.target, self.value, self.stored)
209         else:
210             if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
211                 self.schedule()
212         return t
213     
214     def storeFailed(self, t, node):
215         log.msg("store failed %s/%s" % (node.host, node.port))
216         self.caller.nodeFailed(node)
217         self.outstanding -= 1
218         if self.finished:
219             return t
220         self.schedule()
221         return t
222     
223     def schedule(self):
224         if self.finished:
225             return
226         num = self.config['CONCURRENT_REQS'] - self.outstanding
227         if num > self.config['STORE_REDUNDANCY']:
228             num = self.config['STORE_REDUNDANCY']
229         for i in range(num):
230             try:
231                 node = self.nodes.pop()
232             except IndexError:
233                 if self.outstanding == 0:
234                     self.finished = 1
235                     self.callback(self.target, self.value, self.stored)
236             else:
237                 if not node.id == self.caller.node.id:
238                     self.outstanding += 1
239                     try:
240                         f = getattr(node, self.store)
241                     except AttributeError:
242                         log.msg("%s doesn't have a %s method!" % (node, self.store))
243                     else:
244                         df = f(self.target, self.value, node.token, self.caller.node.id)
245                         df.addCallback(self.storedValue, node=node)
246                         df.addErrback(self.storeFailed, node=node)
247                     
248     def goWithNodes(self, nodes):
249         self.nodes = nodes
250         self.nodes.sort(self.sort)
251         self.schedule()
252
253
254 class KeyExpirer:
255     def __init__(self, store, config):
256         self.store = store
257         self.config = config
258         self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
259     
260     def doExpire(self):
261         self.store.expireValues(self.config['KE_AGE'])
262         self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)
263         
264     def shutdown(self):
265         try:
266             self.next_expire.cancel()
267         except:
268             pass