]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - actions.py
major cleanup, updated for twisted
[quix0rs-apt-p2p.git] / actions.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from time import time
5
6 from const import reactor
7 import const
8
9 from khash import intify
10 from knode import KNode as Node
11 from ktable import KTable, K
12
13 class ActionBase:
14     """ base class for some long running asynchronous proccesses like finding nodes or values """
15     def __init__(self, table, target, callback):
16         self.table = table
17         self.target = target
18         self.num = intify(target)
19         self.found = {}
20         self.queried = {}
21         self.answered = {}
22         self.callback = callback
23         self.outstanding = 0
24         self.finished = 0
25     
26         def sort(a, b, num=self.num):
27             """ this function is for sorting nodes relative to the ID we are looking for """
28             x, y = num ^ a.num, num ^ b.num
29             if x > y:
30                 return 1
31             elif x < y:
32                 return -1
33             return 0
34         self.sort = sort
35         
36     def goWithNodes(self, t):
37         pass
38     
39     
40
41 FIND_NODE_TIMEOUT = 15
42
43 class FindNode(ActionBase):
44     """ find node action merits it's own class as it is a long running stateful process """
45     def handleGotNodes(self, dict):
46         _krpc_sender = dict['_krpc_sender']
47         dict = dict['rsp']
48         l = dict["nodes"]
49         sender = dict["sender"]
50         sender['port'] = _krpc_sender[1]        
51         sender = Node().initWithDict(sender)
52         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
53         self.table.table.insertNode(sender)
54         if self.finished or self.answered.has_key(sender.id):
55             # a day late and a dollar short
56             return
57         self.outstanding = self.outstanding - 1
58         self.answered[sender.id] = 1
59         for node in l:
60             n = Node().initWithDict(node)
61             n.conn = self.table.udp.connectionForAddr((n.host, n.port))
62             if not self.found.has_key(n.id):
63                 self.found[n.id] = n
64         self.schedule()
65         
66     def schedule(self):
67         """
68             send messages to new peers, if necessary
69         """
70         if self.finished:
71             return
72         l = self.found.values()
73         l.sort(self.sort)
74         for node in l[:K]:
75             if node.id == self.target:
76                 self.finished=1
77                 return self.callback([node])
78             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
79                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
80                 df = node.findNode(self.target, self.table.node.senderDict())
81                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
82                 self.outstanding = self.outstanding + 1
83                 self.queried[node.id] = 1
84             if self.outstanding >= const.CONCURRENT_REQS:
85                 break
86         assert(self.outstanding) >=0
87         if self.outstanding == 0:
88             ## all done!!
89             self.finished=1
90             reactor.callFromThread(self.callback, l[:K])
91     
92     def makeMsgFailed(self, node):
93         def defaultGotNodes(err, self=self, node=node):
94             print ">>> find failed %s/%s" % (node.host, node.port)
95             self.table.table.nodeFailed(node)
96             self.outstanding = self.outstanding - 1
97             self.schedule()
98         return defaultGotNodes
99     
100     def goWithNodes(self, nodes):
101         """
102             this starts the process, our argument is a transaction with t.extras being our list of nodes
103             it's a transaction since we got called from the dispatcher
104         """
105         for node in nodes:
106             if node.id == self.table.node.id:
107                 continue
108             else:
109                 self.found[node.id] = node
110         
111         self.schedule()
112     
113
114 GET_VALUE_TIMEOUT = 15
115 class GetValue(FindNode):
116     """ get value task """
117     def handleGotNodes(self, dict):
118         _krpc_sender = dict['_krpc_sender']
119         dict = dict['rsp']
120         sender = dict["sender"]
121         sender['port'] = _krpc_sender[1]        
122         sender = Node().initWithDict(sender)
123         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
124         self.table.table.insertNode(sender)
125         if self.finished or self.answered.has_key(sender.id):
126             # a day late and a dollar short
127             return
128         self.outstanding = self.outstanding - 1
129         self.answered[sender.id] = 1
130         # go through nodes
131         # if we have any closer than what we already got, query them
132         if dict.has_key('nodes'):
133             for node in dict['nodes']:
134                 n = Node().initWithDict(node)
135                 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
136                 if not self.found.has_key(n.id):
137                     self.found[n.id] = n
138         elif dict.has_key('values'):
139             def x(y, z=self.results):
140                 if not z.has_key(y):
141                     z[y] = 1
142                     return y
143                 else:
144                     return None
145             z = len(dict['values'])
146             v = filter(None, map(x, dict['values']))
147             if(len(v)):
148                 reactor.callFromThread(self.callback, v)
149         self.schedule()
150         
151     ## get value
152     def schedule(self):
153         if self.finished:
154             return
155         l = self.found.values()
156         l.sort(self.sort)
157         
158         for node in l[:K]:
159             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
160                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
161                 df = node.findValue(self.target, self.table.node.senderDict())
162                 df.addCallback(self.handleGotNodes)
163                 df.addErrback(self.makeMsgFailed(node))
164                 self.outstanding = self.outstanding + 1
165                 self.queried[node.id] = 1
166             if self.outstanding >= const.CONCURRENT_REQS:
167                 break
168         assert(self.outstanding) >=0
169         if self.outstanding == 0:
170             ## all done, didn't find it!!
171             self.finished=1
172             reactor.callFromThread(self.callback,[])
173
174     ## get value
175     def goWithNodes(self, nodes, found=None):
176         self.results = {}
177         if found:
178             for n in found:
179                 self.results[n] = 1
180         for node in nodes:
181             if node.id == self.table.node.id:
182                 continue
183             else:
184                 self.found[node.id] = node
185             
186         self.schedule()
187
188
189 class StoreValue(ActionBase):
190     def __init__(self, table, target, value, callback):
191         ActionBase.__init__(self, table, target, callback)
192         self.value = value
193         self.stored = []
194     
195     def storedValue(self, t, node):
196         self.outstanding -= 1
197         self.table.insertNode(node)
198         if self.finished:
199             return
200         self.stored.append(t)
201         if len(self.stored) >= const.STORE_REDUNDANCY:
202             self.finished=1
203             self.callback(self.stored)
204         else:
205             if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
206                 self.schedule()
207             
208     def storeFailed(self, t, node):
209         print ">>> store failed %s/%s" % (node.host, node.port)
210         self.table.nodeFailed(node)
211         self.outstanding -= 1
212         if self.finished:
213             return
214         self.schedule()
215         
216     def schedule(self):
217         if self.finished:
218             return
219         num = const.CONCURRENT_REQS - self.outstanding
220         if num > const.STORE_REDUNDANCY:
221             num = const.STORE_REDUNDANCY
222         for i in range(num):
223             try:
224                 node = self.nodes.pop()
225             except IndexError:
226                 if self.outstanding == 0:
227                     self.finished = 1
228                     self.callback(self.stored)
229             else:
230                 if not node.id == self.table.node.id:
231                     self.outstanding += 1
232                     df = node.storeValue(self.target, self.value, self.table.node.senderDict())
233                     df.addCallback(self.storedValue, node=node)
234                     df.addErrback(self.storeFailed, node=node)
235                     
236     def goWithNodes(self, nodes):
237         self.nodes = nodes
238         self.nodes.sort(self.sort)
239         self.schedule()
240
241
242 class KeyExpirer:
243     def __init__(self, store):
244         self.store = store
245         reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
246     
247     def doExpire(self):
248         self.cut = "%0.6f" % (time() - const.KE_AGE)
249         self._expire()
250     
251     def _expire(self):
252         c = self.store.cursor()
253         s = "delete from kv where time < '%s';" % self.cut
254         c.execute(s)
255         reactor.callLater(const.KE_DELAY, self.doExpire)