b8f5c8ccd2c2ff75d3fe5d2ff83d7eb44a091dbe
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from twisted.internet import reactor
5 from twisted.python import log
6
7 from khash import intify
8 from util import uncompact
9
class ActionBase:
    """ base class for long-running asynchronous processes such as finding nodes or values """

    def __init__(self, caller, target, callback, config):
        """
            Record the collaborators and reset the bookkeeping for one action.

            caller   -- object the action works on behalf of
            target   -- key being looked for; intify(target) is kept as self.num
            callback -- stored as self.callback for subclasses to invoke
            config   -- mapping of settings read by subclasses
        """
        self.caller = caller
        self.target = target
        self.config = config
        self.num = intify(target)
        self.callback = callback
        # bookkeeping dictionaries, all empty to begin with
        self.found, self.queried, self.answered = {}, {}, {}
        # number of requests in flight, and the "we are done" flag
        self.outstanding = 0
        self.finished = 0

        def sort(a, b, num=self.num):
            """ cmp-style ordering of nodes by XOR distance of their .num from the target """
            dist_a = num ^ a.num
            dist_b = num ^ b.num
            # 1 when a is farther than b, -1 when it is closer, 0 on a tie
            return (dist_a > dist_b) - (dist_a < dist_b)
        self.sort = sort

    def goWithNodes(self, t):
        """ start the action; the base class does nothing, subclasses override this """
36     
37     
38
# Timeout for a find-node request, presumably in seconds (TODO confirm).
# No executable code visible in this file reads it.
FIND_NODE_TIMEOUT = 15
40
class FindNode(ActionBase):
    """ find node action merits its own class as it is a long running stateful process """

    def handleGotNodes(self, dict):
        """
            Callback for a successful find-node response.

            dict -- the response mapping.  dict['_krpc_sender'] is indexed
                    [0] and [1] and handed to caller.Node together with the
                    responder's id (presumably host and port -- confirm
                    against the transport).  dict['rsp'] holds the
                    responder's "id" and, normally, a "nodes" list of
                    compact node descriptions.

            The parameter keeps its historical name even though it shadows
            the builtin; it is rebound to locals straight away.
        """
        sender = dict['_krpc_sender']
        rsp = dict['rsp']
        responder_id = rsp["id"]
        self.caller.insertNode(self.caller.Node(responder_id, sender[0], sender[1]))
        if self.finished or responder_id in self.answered:
            # a day late and a dollar short
            return
        self.outstanding -= 1
        self.answered[responder_id] = 1
        # A reply without "nodes" is treated as an empty list.  Raising here
        # would skip schedule() and, because the errback attached with
        # addCallbacks() does not see callback errors, stall the lookup.
        for compact_node in rsp.get("nodes", ()):
            n = self.caller.Node(uncompact(compact_node))
            if n.id not in self.found:
                self.found[n.id] = n
        self.schedule()

    def schedule(self):
        """
            send messages to new peers, if necessary

            Looks at the config['K'] found nodes closest to the target (XOR
            distance on .num).  If one of them is the target itself the
            callback is invoked immediately with [node].  Otherwise unqueried
            nodes are sent findNode requests until config['CONCURRENT_REQS']
            are outstanding.  When nothing is outstanding the action finishes
            and the callback is scheduled with the closest nodes.
        """
        if self.finished:
            return
        closest = sorted(self.found.values(), key=lambda n: self.num ^ n.num)
        closest = closest[:self.config['K']]
        for node in closest:
            if node.id == self.target:
                self.finished = 1
                return self.callback([node])
            if node.id not in self.queried and node.id != self.caller.node.id:
                # TODO: no per-request timeout (FIND_NODE_TIMEOUT) is applied here
                df = node.findNode(self.target, self.caller.node.id)
                # Count the request before attaching the callbacks so that a
                # deferred which has already fired cannot drive `outstanding`
                # negative.
                self.outstanding += 1
                self.queried[node.id] = 1
                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
            if self.outstanding >= self.config['CONCURRENT_REQS']:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0 and not self.finished:
            ## all done!!
            self.finished = 1
            reactor.callLater(0, self.callback, closest)

    def makeMsgFailed(self, node):
        """
            Build the errback used for the request sent to `node`.

            The returned function logs the failure, reports the node to
            caller.table.nodeFailed, frees the request slot and reschedules.
        """
        def defaultGotNodes(err, self=self, node=node):
            log.msg("find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
            log.err(err)
            self.caller.table.nodeFailed(node)
            self.outstanding = self.outstanding - 1
            self.schedule()
        return defaultGotNodes

    def goWithNodes(self, nodes):
        """
            this starts the process; our argument is the initial list of nodes

            Every node except our own is recorded as found, then the first
            requests are scheduled.
        """
        for node in nodes:
            if node.id != self.caller.node.id:
                self.found[node.id] = node

        self.schedule()
108     
109
# Timeout for a get-value request, presumably in seconds (TODO confirm).
# No executable code visible in this file reads it.
GET_VALUE_TIMEOUT = 15
# Backward-compatible alias for the original lower-case spelling.
get_value_timeout = GET_VALUE_TIMEOUT
class GetValue(FindNode):
    """ get value task """

    def __init__(self, caller, target, callback, config, find="findValue"):
        """
            Same arguments as FindNode, plus:

            find -- name of the node method used to send each request
        """
        FindNode.__init__(self, caller, target, callback, config)
        self.findValue = find
        # values already reported to the callback; goWithNodes() resets it
        self.results = {}

    def handleGotNodes(self, dict):
        """
            Callback for a successful get-value response.

            dict -- the response mapping.  dict['_krpc_sender'] is indexed
                    [0] and [1] and handed to caller.Node together with the
                    responder's id (presumably host and port -- confirm
                    against the transport).  dict['rsp'] holds the
                    responder's "id" and either "nodes" (compact node
                    descriptions to consider querying) or "values".

            Values not seen before are passed to the callback as
            callback(target, new_values) on a later reactor turn.

            The parameter keeps its historical name even though it shadows
            the builtin; it is rebound to locals straight away.
        """
        sender = dict['_krpc_sender']
        rsp = dict['rsp']
        responder_id = rsp["id"]
        self.caller.insertNode(self.caller.Node(responder_id, sender[0], sender[1]))
        if self.finished or responder_id in self.answered:
            # a day late and a dollar short
            return
        self.outstanding -= 1
        self.answered[responder_id] = 1
        # go through nodes
        # if we have any closer than what we already got, query them
        if 'nodes' in rsp:
            for compact_node in rsp['nodes']:
                n = self.caller.Node(uncompact(compact_node))
                if n.id not in self.found:
                    self.found[n.id] = n
        elif 'values' in rsp:
            # Report each value only once.  An explicit loop is used so that
            # a new value which happens to be falsy is not silently dropped.
            fresh = []
            for value in rsp['values']:
                if value not in self.results:
                    self.results[value] = 1
                    fresh.append(value)
            if fresh:
                reactor.callLater(0, self.callback, self.target, fresh)
        self.schedule()

    ## get value
    def schedule(self):
        """
            Send get-value requests to new peers, if necessary.

            Unqueried nodes among the config['K'] found nodes closest to the
            target are sent requests until config['CONCURRENT_REQS'] are
            outstanding.  When nothing is outstanding the action finishes and
            callback(target, []) is scheduled.
        """
        if self.finished:
            return
        closest = sorted(self.found.values(), key=lambda n: self.num ^ n.num)

        for node in closest[:self.config['K']]:
            if node.id not in self.queried and node.id != self.caller.node.id:
                # TODO: no per-request timeout is applied here
                try:
                    f = getattr(node, self.findValue)
                except AttributeError:
                    log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
                else:
                    df = f(self.target, self.caller.node.id)
                    # Count the request before attaching the callbacks so
                    # that a deferred which has already fired cannot drive
                    # `outstanding` negative.
                    self.outstanding += 1
                    self.queried[node.id] = 1
                    # NOTE(review): chained this way, the errback also runs
                    # when handleGotNodes itself raises; if that happens after
                    # it has decremented `outstanding`, the slot is released
                    # twice.
                    df.addCallback(self.handleGotNodes)
                    df.addErrback(self.makeMsgFailed(node))
            if self.outstanding >= self.config['CONCURRENT_REQS']:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0 and not self.finished:
            ## all done, didn't find it!!
            self.finished = 1
            reactor.callLater(0, self.callback, self.target, [])

    ## get value
    def goWithNodes(self, nodes, found=None):
        """
            Start the search.

            nodes -- initial nodes; every one except our own is recorded
            found -- optional values already known, which will not be
                     reported to the callback again
        """
        self.results = {}
        if found:
            for n in found:
                self.results[n] = 1
        for node in nodes:
            if node.id != self.caller.node.id:
                self.found[node.id] = node

        self.schedule()
189
190
class StoreValue(ActionBase):
    """ store a value on several nodes, stopping once enough of them have accepted it """

    def __init__(self, caller, target, value, callback, config, store="storeValue"):
        """
            Same arguments as ActionBase, plus:

            value -- the value to store under `target`
            store -- name of the node method used to send each request
        """
        ActionBase.__init__(self, caller, target, callback, config)
        self.value = value
        self.stored = []
        self.store = store
        # candidate nodes still to be tried; filled in by goWithNodes()
        self.nodes = []

    def storedValue(self, t, node):
        """
            Callback for a successful store on `node`; `t` is the result.

            Finishes with callback(target, value, stored) once
            config['STORE_REDUNDANCY'] stores have succeeded; otherwise sends
            more requests if those still outstanding cannot reach that count.
        """
        self.outstanding -= 1
        self.caller.insertNode(node)
        if self.finished:
            return
        self.stored.append(t)
        if len(self.stored) >= self.config['STORE_REDUNDANCY']:
            self.finished = 1
            self.callback(self.target, self.value, self.stored)
        elif len(self.stored) + self.outstanding < self.config['STORE_REDUNDANCY']:
            self.schedule()
        return t

    def storeFailed(self, t, node):
        """ Errback for a failed store on `node`: report it, free the slot and try more nodes. """
        log.msg("store failed %s/%s" % (node.host, node.port))
        # NOTE(review): FindNode reports failures through
        # caller.table.nodeFailed; confirm that caller.nodeFailed exists.
        self.caller.nodeFailed(node)
        self.outstanding -= 1
        if not self.finished:
            self.schedule()
        return t

    def schedule(self):
        """
            Send store requests, at most config['STORE_REDUNDANCY'] per call
            and never more than config['CONCURRENT_REQS'] outstanding.

            When the candidate list is exhausted and nothing is outstanding
            the action finishes, calling the callback exactly once with
            whatever was stored.
        """
        if self.finished:
            return
        num = min(self.config['CONCURRENT_REQS'] - self.outstanding,
                  self.config['STORE_REDUNDANCY'])
        sent = 0
        while sent < num and self.nodes and not self.finished:
            # NOTE(review): the list is sorted by ascending distance, so
            # pop() tries the farthest candidate first -- confirm intended.
            node = self.nodes.pop()
            if node.id == self.caller.node.id:
                continue
            try:
                f = getattr(node, self.store)
            except AttributeError:
                log.msg("%s doesn't have a %s method!" % (node, self.store))
                continue
            df = f(self.target, self.value, self.caller.node.id)
            # Only count a request once it has really been issued, and do so
            # before attaching the callbacks in case the deferred has
            # already fired.
            self.outstanding += 1
            sent += 1
            # addCallbacks keeps an exception raised inside storedValue from
            # also running storeFailed, which would release the slot twice.
            df.addCallbacks(self.storedValue, self.storeFailed,
                            callbackKeywords={'node': node},
                            errbackKeywords={'node': node})
        if not self.nodes and self.outstanding == 0 and not self.finished:
            self.finished = 1
            self.callback(self.target, self.value, self.stored)

    def goWithNodes(self, nodes):
        """ Start storing on `nodes`; a sorted copy is used so the caller's list is left untouched. """
        self.nodes = sorted(nodes, key=lambda n: self.num ^ n.num)
        self.schedule()
250
251
class KeyExpirer:
    """ periodically asks a store to expire its old values """

    def __init__(self, store, config):
        """
            store  -- object providing expireValues(age)
            config -- mapping with 'KEINITIAL_DELAY' (delay before the first
                      run), 'KE_DELAY' (delay between runs) and 'KE_AGE'
                      (passed to store.expireValues)
        """
        self.store = store
        self.config = config
        self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)

    def doExpire(self):
        """ Expire values now and schedule the next run. """
        self.store.expireValues(self.config['KE_AGE'])
        self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)

    def shutdown(self):
        """ Cancel the pending run, if any; safe to call more than once. """
        # Ask the delayed call whether it is still pending instead of
        # swallowing every exception that cancel() might raise.
        if self.next_expire.active():
            self.next_expire.cancel()