Remove all the airhook stuff.
[quix0rs-apt-p2p.git] / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from time import time
5
6 from const import reactor
7 import const
8
9 from khash import intify
10 from ktable import KTable, K
11
12 class ActionBase:
13     """ base class for some long running asynchronous proccesses like finding nodes or values """
14     def __init__(self, table, target, callback):
15         self.table = table
16         self.target = target
17         self.num = intify(target)
18         self.found = {}
19         self.queried = {}
20         self.answered = {}
21         self.callback = callback
22         self.outstanding = 0
23         self.finished = 0
24     
25         def sort(a, b, num=self.num):
26             """ this function is for sorting nodes relative to the ID we are looking for """
27             x, y = num ^ a.num, num ^ b.num
28             if x > y:
29                 return 1
30             elif x < y:
31                 return -1
32             return 0
33         self.sort = sort
34         
35     def goWithNodes(self, t):
36         pass
37     
38     
39
40 FIND_NODE_TIMEOUT = 15
41
42 class FindNode(ActionBase):
43     """ find node action merits it's own class as it is a long running stateful process """
44     def handleGotNodes(self, dict):
45         _krpc_sender = dict['_krpc_sender']
46         dict = dict['rsp']
47         l = dict["nodes"]
48         sender = {'id' : dict["id"]}
49         sender['port'] = _krpc_sender[1]        
50         sender['host'] = _krpc_sender[0]        
51         sender = self.table.Node().initWithDict(sender)
52         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
53         self.table.table.insertNode(sender)
54         if self.finished or self.answered.has_key(sender.id):
55             # a day late and a dollar short
56             return
57         self.outstanding = self.outstanding - 1
58         self.answered[sender.id] = 1
59         for node in l:
60             n = self.table.Node().initWithDict(node)
61             n.conn = self.table.udp.connectionForAddr((n.host, n.port))
62             if not self.found.has_key(n.id):
63                 self.found[n.id] = n
64         self.schedule()
65         
66     def schedule(self):
67         """
68             send messages to new peers, if necessary
69         """
70         if self.finished:
71             return
72         l = self.found.values()
73         l.sort(self.sort)
74         for node in l[:K]:
75             if node.id == self.target:
76                 self.finished=1
77                 return self.callback([node])
78             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
79                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
80                 df = node.findNode(self.target, self.table.node.id)
81                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
82                 self.outstanding = self.outstanding + 1
83                 self.queried[node.id] = 1
84             if self.outstanding >= const.CONCURRENT_REQS:
85                 break
86         assert(self.outstanding) >=0
87         if self.outstanding == 0:
88             ## all done!!
89             self.finished=1
90             reactor.callFromThread(self.callback, l[:K])
91     
92     def makeMsgFailed(self, node):
93         def defaultGotNodes(err, self=self, node=node):
94             print ">>> find failed %s/%s" % (node.host, node.port), err
95             self.table.table.nodeFailed(node)
96             self.outstanding = self.outstanding - 1
97             self.schedule()
98         return defaultGotNodes
99     
100     def goWithNodes(self, nodes):
101         """
102             this starts the process, our argument is a transaction with t.extras being our list of nodes
103             it's a transaction since we got called from the dispatcher
104         """
105         for node in nodes:
106             if node.id == self.table.node.id:
107                 continue
108             else:
109                 self.found[node.id] = node
110         
111         self.schedule()
112     
113
114 get_value_timeout = 15
115 class GetValue(FindNode):
116     def __init__(self, table, target, callback, find="findValue"):
117         FindNode.__init__(self, table, target, callback)
118         self.findValue = find
119             
120     """ get value task """
121     def handleGotNodes(self, dict):
122         _krpc_sender = dict['_krpc_sender']
123         dict = dict['rsp']
124         sender = {'id' : dict["id"]}
125         sender['port'] = _krpc_sender[1]
126         sender['host'] = _krpc_sender[0]                
127         sender = self.table.Node().initWithDict(sender)
128         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
129         self.table.table.insertNode(sender)
130         if self.finished or self.answered.has_key(sender.id):
131             # a day late and a dollar short
132             return
133         self.outstanding = self.outstanding - 1
134         self.answered[sender.id] = 1
135         # go through nodes
136         # if we have any closer than what we already got, query them
137         if dict.has_key('nodes'):
138             for node in dict['nodes']:
139                 n = self.table.Node().initWithDict(node)
140                 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
141                 if not self.found.has_key(n.id):
142                     self.found[n.id] = n
143         elif dict.has_key('values'):
144             def x(y, z=self.results):
145                 if not z.has_key(y):
146                     z[y] = 1
147                     return y
148                 else:
149                     return None
150             z = len(dict['values'])
151             v = filter(None, map(x, dict['values']))
152             if(len(v)):
153                 reactor.callFromThread(self.callback, v)
154         self.schedule()
155         
156     ## get value
157     def schedule(self):
158         if self.finished:
159             return
160         l = self.found.values()
161         l.sort(self.sort)
162         
163         for node in l[:K]:
164             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
165                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
166                 try:
167                     f = getattr(node, self.findValue)
168                 except AttributeError:
169                     print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
170                 else:
171                     df = f(self.target, self.table.node.id)
172                     df.addCallback(self.handleGotNodes)
173                     df.addErrback(self.makeMsgFailed(node))
174                     self.outstanding = self.outstanding + 1
175                     self.queried[node.id] = 1
176             if self.outstanding >= const.CONCURRENT_REQS:
177                 break
178         assert(self.outstanding) >=0
179         if self.outstanding == 0:
180             ## all done, didn't find it!!
181             self.finished=1
182             reactor.callFromThread(self.callback,[])
183
184     ## get value
185     def goWithNodes(self, nodes, found=None):
186         self.results = {}
187         if found:
188             for n in found:
189                 self.results[n] = 1
190         for node in nodes:
191             if node.id == self.table.node.id:
192                 continue
193             else:
194                 self.found[node.id] = node
195             
196         self.schedule()
197
198
199 class StoreValue(ActionBase):
200     def __init__(self, table, target, value, callback, store="storeValue"):
201         ActionBase.__init__(self, table, target, callback)
202         self.value = value
203         self.stored = []
204         self.store = store
205         
206     def storedValue(self, t, node):
207         self.outstanding -= 1
208         self.table.insertNode(node)
209         if self.finished:
210             return
211         self.stored.append(t)
212         if len(self.stored) >= const.STORE_REDUNDANCY:
213             self.finished=1
214             self.callback(self.stored)
215         else:
216             if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
217                 self.schedule()
218         return t
219     
220     def storeFailed(self, t, node):
221         print ">>> store failed %s/%s" % (node.host, node.port)
222         self.table.nodeFailed(node)
223         self.outstanding -= 1
224         if self.finished:
225             return t
226         self.schedule()
227         return t
228     
229     def schedule(self):
230         if self.finished:
231             return
232         num = const.CONCURRENT_REQS - self.outstanding
233         if num > const.STORE_REDUNDANCY:
234             num = const.STORE_REDUNDANCY
235         for i in range(num):
236             try:
237                 node = self.nodes.pop()
238             except IndexError:
239                 if self.outstanding == 0:
240                     self.finished = 1
241                     self.callback(self.stored)
242             else:
243                 if not node.id == self.table.node.id:
244                     self.outstanding += 1
245                     try:
246                         f = getattr(node, self.store)
247                     except AttributeError:
248                         print ">>> %s doesn't have a %s method!" % (node, self.store)
249                     else:
250                         df = f(self.target, self.value, self.table.node.id)
251                         df.addCallback(self.storedValue, node=node)
252                         df.addErrback(self.storeFailed, node=node)
253                     
254     def goWithNodes(self, nodes):
255         self.nodes = nodes
256         self.nodes.sort(self.sort)
257         self.schedule()
258
259
260 class KeyExpirer:
261     def __init__(self, store):
262         self.store = store
263         reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
264     
265     def doExpire(self):
266         self.cut = "%0.6f" % (time() - const.KE_AGE)
267         self._expire()
268     
269     def _expire(self):
270         c = self.store.cursor()
271         s = "delete from kv where time < '%s';" % self.cut
272         c.execute(s)
273         reactor.callLater(const.KE_DELAY, self.doExpire)