]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - apt_dht_Khashmir/actions.py
860205cf87851e0ccb43fb8a29a45ce70d9cc0ec
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from twisted.internet import reactor
5
6 from khash import intify
7
class ActionBase:
    """Base class for long-running asynchronous processes such as finding nodes or values.

    Holds the state shared by every action: the routing ``table`` object, the
    ``target`` key (and its integer form ``num``), the ``callback`` to invoke
    with results, the ``config`` mapping, and the bookkeeping used while the
    action runs (``found``, ``queried``, ``answered``, ``outstanding``,
    ``finished``).
    """

    def __init__(self, table, target, callback, config):
        self.table = table
        self.target = target
        self.config = config
        self.callback = callback

        # Integer form of the target, used for XOR distance comparisons.
        self.num = intify(target)

        # Per-action bookkeeping, all keyed by node ID.
        self.found = {}
        self.queried = {}
        self.answered = {}

        # Number of requests in flight, and whether the action has completed.
        self.outstanding = 0
        self.finished = 0

        def sort(a, b, num=self.num):
            """cmp-style comparator ordering nodes by XOR distance to the target ID."""
            dist_a = num ^ a.num
            dist_b = num ^ b.num
            # Yields 1, -1 or 0, exactly like an explicit three-way comparison.
            return (dist_a > dist_b) - (dist_a < dist_b)

        self.sort = sort

    def goWithNodes(self, t):
        """Start the action; the base implementation does nothing (subclasses override it)."""
        pass
34     
35     
36
# Timeout value for find-node requests (presumably seconds -- TODO confirm).
# NOTE(review): not referenced by any live code visible in this file.
FIND_NODE_TIMEOUT = 15
38
class FindNode(ActionBase):
    """Find-node action: a long-running, stateful search for the nodes closest to a target ID.

    Starting from an initial node list (see ``goWithNodes``), the action
    repeatedly queries the closest not-yet-queried nodes it knows about and
    merges the nodes they report into ``self.found``.  It completes when the
    target node itself is seen, or when no requests remain outstanding; the
    callback then receives a list of nodes.

    Config keys read: ``K`` (how many of the closest nodes are considered)
    and ``CONCURRENT_REQS`` (limit on requests in flight).
    """

    def handleGotNodes(self, dict):
        """Process a successful find-node response.

        ``dict`` is the response message: ``dict['_krpc_sender']`` is indexed
        as ``(host, port)`` and ``dict['rsp']`` carries the responder's
        ``id`` and the ``nodes`` it reported.  The parameter name shadows the
        builtin and is kept only for interface compatibility.
        """
        krpc_sender = dict['_krpc_sender']
        rsp = dict['rsp']
        reported = rsp["nodes"]
        sender = self.table.Node().initWithDict({
            'id': rsp["id"],
            'port': krpc_sender[1],
            'host': krpc_sender[0],
        })
        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
        # The responder is recorded in the routing table even if this action
        # no longer needs the answer.
        self.table.table.insertNode(sender)
        if self.finished or sender.id in self.answered:
            # a day late and a dollar short
            return
        self.outstanding -= 1
        self.answered[sender.id] = 1
        for node in reported:
            n = self.table.Node().initWithDict(node)
            n.conn = self.table.udp.connectionForAddr((n.host, n.port))
            if n.id not in self.found:
                self.found[n.id] = n
        self.schedule()

    def schedule(self):
        """Send find-node requests to new peers, if necessary, or finish the action."""
        if self.finished:
            return
        # Same ordering as the cmp-style ``self.sort``: ascending XOR distance
        # between each node and the target.
        closest = sorted(self.found.values(), key=lambda node: self.num ^ node.num)
        closest = closest[:self.config['K']]
        for node in closest:
            if node.id == self.target:
                self.finished = 1
                return self.callback([node])
            if node.id not in self.queried and node.id != self.table.node.id:
                # NOTE: no per-request timeout is set here; FIND_NODE_TIMEOUT
                # is not applied.
                df = node.findNode(self.target, self.table.node.id)
                # Account for the request before attaching callbacks, so a
                # Deferred that has already fired cannot drive ``outstanding``
                # negative when its callback runs synchronously.
                self.outstanding += 1
                self.queried[node.id] = 1
                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                if self.finished:
                    # A synchronously-fired callback already completed the action.
                    return
            if self.outstanding >= self.config['CONCURRENT_REQS']:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done!!
            self.finished = 1
            reactor.callLater(0, self.callback, closest)

    def makeMsgFailed(self, node):
        """Return an errback that records ``node`` as failed and reschedules."""
        def defaultGotNodes(err, self=self, node=node):
            # Single-argument print: same output under Python 2 and Python 3.
            print(">>> find failed %s/%s %s" % (node.host, node.port, err))
            self.table.table.nodeFailed(node)
            self.outstanding -= 1
            self.schedule()
        return defaultGotNodes

    def goWithNodes(self, nodes):
        """Start the search from ``nodes``, an iterable of node objects.

        Our own node (matched by ID) is skipped; every other node is added
        to ``self.found`` before the first round of requests is scheduled.
        """
        own_id = self.table.node.id
        for node in nodes:
            if node.id != own_id:
                self.found[node.id] = node

        self.schedule()
109     
110
# Timeout value for get-value requests (presumably seconds -- TODO confirm).
# NOTE(review): not referenced by any live code visible in this file.
get_value_timeout = 15
class GetValue(FindNode):
    """Get-value task: walk toward the target key, collecting values stored under it.

    Nodes are queried through the method named by ``find`` (default
    ``"findValue"``).  A response either reports more nodes to try or a list
    of values.  Each batch of not-yet-seen values is delivered as
    ``callback(target, values)``; when no requests remain outstanding the
    action finishes with ``callback(target, [])``.
    """

    def __init__(self, table, target, callback, config, find="findValue"):
        FindNode.__init__(self, table, target, callback, config)
        self.findValue = find
        # Values already seen, keyed by value; goWithNodes() resets this.
        self.results = {}

    def handleGotNodes(self, dict):
        """Process a successful response carrying either ``nodes`` or ``values``.

        ``dict`` is the response message: ``dict['_krpc_sender']`` is indexed
        as ``(host, port)`` and ``dict['rsp']`` carries the responder's
        ``id``.  The parameter name shadows the builtin and is kept only for
        interface compatibility.
        """
        krpc_sender = dict['_krpc_sender']
        rsp = dict['rsp']
        sender = self.table.Node().initWithDict({
            'id': rsp["id"],
            'port': krpc_sender[1],
            'host': krpc_sender[0],
        })
        sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
        self.table.table.insertNode(sender)
        if self.finished or sender.id in self.answered:
            # a day late and a dollar short
            return
        self.outstanding -= 1
        self.answered[sender.id] = 1
        if 'nodes' in rsp:
            # Remember any node we have not seen yet; schedule() decides
            # which of them are close enough to query.
            for node in rsp['nodes']:
                n = self.table.Node().initWithDict(node)
                n.conn = self.table.udp.connectionForAddr((n.host, n.port))
                if n.id not in self.found:
                    self.found[n.id] = n
        elif 'values' in rsp:
            fresh = []
            for value in rsp['values']:
                if value not in self.results:
                    self.results[value] = 1
                    # Falsy values are recorded but never forwarded, matching
                    # the previous filter(None, ...) behaviour.
                    if value:
                        fresh.append(value)
            if fresh:
                reactor.callLater(0, self.callback, self.target, fresh)
        self.schedule()

    ## get value
    def schedule(self):
        """Query the closest unqueried nodes, or finish when nothing is outstanding."""
        if self.finished:
            return
        # Same ordering as the cmp-style ``self.sort``: ascending XOR distance
        # between each node and the target.
        closest = sorted(self.found.values(), key=lambda node: self.num ^ node.num)

        for node in closest[:self.config['K']]:
            if node.id not in self.queried and node.id != self.table.node.id:
                # NOTE: no per-request timeout is set here.
                try:
                    f = getattr(node, self.findValue)
                except AttributeError:
                    print(">>> findValue %s doesn't have a %s method!" % (node, self.findValue))
                else:
                    df = f(self.target, self.table.node.id)
                    # Account for the request before attaching callbacks, so a
                    # Deferred that has already fired cannot drive
                    # ``outstanding`` negative when its callback runs
                    # synchronously.
                    self.outstanding += 1
                    self.queried[node.id] = 1
                    df.addCallback(self.handleGotNodes)
                    df.addErrback(self.makeMsgFailed(node))
                    if self.finished:
                        # A synchronously-fired callback already completed the action.
                        return
            if self.outstanding >= self.config['CONCURRENT_REQS']:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done, didn't find it!!
            self.finished = 1
            reactor.callLater(0, self.callback, self.target, [])

    ## get value
    def goWithNodes(self, nodes, found=None):
        """Start the lookup from ``nodes``.

        ``found`` optionally lists values the caller already has; they are
        pre-recorded so they are not reported to the callback again.
        """
        self.results = {}
        if found:
            for value in found:
                self.results[value] = 1
        own_id = self.table.node.id
        for node in nodes:
            if node.id != own_id:
                self.found[node.id] = node

        self.schedule()
194
195
class StoreValue(ActionBase):
    """Store-value action: store ``value`` under ``target`` on several nodes.

    Nodes are contacted through the method named by ``store`` (default
    ``"storeValue"``).  The action finishes, calling
    ``callback(target, value, stored)``, once ``STORE_REDUNDANCY`` stores
    have succeeded or once the node list is exhausted with nothing in flight.

    Config keys read: ``STORE_REDUNDANCY`` and ``CONCURRENT_REQS``.
    """

    def __init__(self, table, target, value, callback, config, store="storeValue"):
        ActionBase.__init__(self, table, target, callback, config)
        self.value = value
        self.stored = []
        self.store = store
        # Candidate nodes still to be tried; filled in by goWithNodes().
        self.nodes = []

    def storedValue(self, t, node):
        """Callback for a successful store on ``node``; ``t`` is the response."""
        self.outstanding -= 1
        self.table.insertNode(node)
        if self.finished:
            return
        self.stored.append(t)
        needed = self.config['STORE_REDUNDANCY']
        if len(self.stored) >= needed:
            self.finished = 1
            self.callback(self.target, self.value, self.stored)
        elif len(self.stored) + self.outstanding < needed:
            # The requests in flight cannot reach the redundancy goal on
            # their own, so try more nodes.
            self.schedule()
        return t

    def storeFailed(self, t, node):
        """Errback for a failed store on ``node``; ``t`` is passed through unchanged."""
        print(">>> store failed %s/%s" % (node.host, node.port))
        self.table.nodeFailed(node)
        self.outstanding -= 1
        # schedule() is a no-op once the action has finished.
        self.schedule()
        return t

    def schedule(self):
        """Dispatch store requests to further nodes, or finish if none remain."""
        if self.finished:
            return
        slots = min(self.config['CONCURRENT_REQS'] - self.outstanding,
                    self.config['STORE_REDUNDANCY'])
        while slots > 0 and self.nodes:
            # NOTE(review): the list is sorted by ascending distance and pop()
            # takes from the end, i.e. the farthest remaining node is tried
            # first -- confirm this is intended.
            node = self.nodes.pop()
            if node.id == self.table.node.id:
                # Never send a store request to ourselves; this does not use
                # up a slot.
                continue
            try:
                f = getattr(node, self.store)
            except AttributeError:
                print(">>> %s doesn't have a %s method!" % (node, self.store))
                continue
            df = f(self.target, self.value, self.table.node.id)
            # Count the request only once it has really been issued, so a
            # node without the store method cannot leak an outstanding count.
            self.outstanding += 1
            slots -= 1
            df.addCallback(self.storedValue, node=node)
            df.addErrback(self.storeFailed, node=node)
            if self.finished:
                # A synchronously-fired callback already completed the action.
                return
        if self.outstanding == 0 and not self.nodes:
            # Nothing in flight and nobody left to ask: report what we have,
            # exactly once.
            self.finished = 1
            self.callback(self.target, self.value, self.stored)

    def goWithNodes(self, nodes):
        """Start storing on ``nodes``, an iterable of candidate node objects."""
        # Work on a sorted copy (ascending XOR distance to the target, the
        # same order as ``self.sort``) so the caller's list is neither
        # reordered nor drained.
        self.nodes = sorted(nodes, key=lambda node: self.num ^ node.num)
        self.schedule()
255
256
class KeyExpirer:
    """Periodically expire old values from a store.

    The first expiry runs ``config['KEINITIAL_DELAY']`` after construction;
    each later one is scheduled ``config['KE_DELAY']`` after the previous
    run.  Every run calls ``store.expireValues(config['KE_AGE'])``.
    """

    def __init__(self, store, config):
        self.store = store
        self.config = config
        # Handle of the pending delayed call, kept so shutdown() can cancel it.
        self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)

    def doExpire(self):
        """Expire old values, then schedule the next run."""
        try:
            self.store.expireValues(self.config['KE_AGE'])
        finally:
            # Reschedule even if this run raised, so one failing expiry does
            # not silently stop all future ones; the error still propagates.
            self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)

    def shutdown(self):
        """Cancel the pending expiry call, if there is one still waiting."""
        call = getattr(self, 'next_expire', None)
        # active() is false once the call has run or been cancelled, which
        # are the cases in which cancel() would raise.
        if call is not None and call.active():
            call.cancel()