]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - actions.py
f5d3a9fca6d1b7c12ed01ffa85c945791175d3aa
[quix0rs-apt-p2p.git] / actions.py
1 from time import time
2
3 from const import reactor
4 import const
5
6 from hash import intify
7 from knode import KNode as Node
8 from ktable import KTable, K
9
10 class ActionBase:
11     """ base class for some long running asynchronous proccesses like finding nodes or values """
12     def __init__(self, table, target, callback):
13         self.table = table
14         self.target = target
15         self.num = intify(target)
16         self.found = {}
17         self.queried = {}
18         self.answered = {}
19         self.callback = callback
20         self.outstanding = 0
21         self.finished = 0
22     
23         def sort(a, b, num=self.num):
24             """ this function is for sorting nodes relative to the ID we are looking for """
25             x, y = num ^ a.num, num ^ b.num
26             if x > y:
27                 return 1
28             elif x < y:
29                 return -1
30             return 0
31         self.sort = sort
32         
33     def goWithNodes(self, t):
34         pass
35     
36     
37
38 FIND_NODE_TIMEOUT = 15
39
40 class FindNode(ActionBase):
41     """ find node action merits it's own class as it is a long running stateful process """
42     def handleGotNodes(self, dict):
43         _krpc_sender = dict['_krpc_sender']
44         dict = dict['rsp']
45         l = dict["nodes"]
46         sender = dict["sender"]
47         sender['port'] = _krpc_sender[1]        
48         sender = Node().initWithDict(sender)
49         sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
50         self.table.table.insertNode(sender)
51         if self.finished or self.answered.has_key(sender.id):
52             # a day late and a dollar short
53             return
54         self.outstanding = self.outstanding - 1
55         self.answered[sender.id] = 1
56         for node in l:
57             n = Node().initWithDict(node)
58             n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
59             if not self.found.has_key(n.id):
60                 self.found[n.id] = n
61         self.schedule()
62         
63     def schedule(self):
64         """
65             send messages to new peers, if necessary
66         """
67         if self.finished:
68             return
69         l = self.found.values()
70         l.sort(self.sort)
71         for node in l[:K]:
72             if node.id == self.target:
73                 self.finished=1
74                 return self.callback([node])
75             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
76                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
77                 df = node.findNode(self.target, self.table.node.senderDict())
78                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
79                 self.outstanding = self.outstanding + 1
80                 self.queried[node.id] = 1
81             if self.outstanding >= const.CONCURRENT_REQS:
82                 break
83         assert(self.outstanding) >=0
84         if self.outstanding == 0:
85             ## all done!!
86             self.finished=1
87             reactor.callFromThread(self.callback, l[:K])
88     
89     def makeMsgFailed(self, node):
90         def defaultGotNodes(err, self=self, node=node):
91             print ">>> find failed %s/%s" % (node.host, node.port)
92             self.table.table.nodeFailed(node)
93             self.outstanding = self.outstanding - 1
94             self.schedule()
95         return defaultGotNodes
96     
97     def goWithNodes(self, nodes):
98         """
99             this starts the process, our argument is a transaction with t.extras being our list of nodes
100             it's a transaction since we got called from the dispatcher
101         """
102         for node in nodes:
103             if node.id == self.table.node.id:
104                 continue
105             else:
106                 self.found[node.id] = node
107         
108         self.schedule()
109     
110
111 GET_VALUE_TIMEOUT = 15
112 class GetValue(FindNode):
113     """ get value task """
114     def handleGotNodes(self, dict):
115         _krpc_sender = dict['_krpc_sender']
116         dict = dict['rsp']
117         sender = dict["sender"]
118         sender['port'] = _krpc_sender[1]        
119         sender = Node().initWithDict(sender)
120         sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
121         self.table.table.insertNode(sender)
122         if self.finished or self.answered.has_key(sender.id):
123             # a day late and a dollar short
124             return
125         self.outstanding = self.outstanding - 1
126         self.answered[sender.id] = 1
127         # go through nodes
128         # if we have any closer than what we already got, query them
129         if dict.has_key('nodes'):
130             for node in dict['nodes']:
131                 n = Node().initWithDict(node)
132                 n.conn = self.table.airhook.connectionForAddr((n.host, n.port))
133                 if not self.found.has_key(n.id):
134                     self.found[n.id] = n
135         elif dict.has_key('values'):
136             def x(y, z=self.results):
137                 if not z.has_key(y):
138                     z[y] = 1
139                     return y
140                 else:
141                     return None
142             z = len(dict['values'])
143             v = filter(None, map(x, dict['values']))
144             if(len(v)):
145                 reactor.callFromThread(self.callback, v)
146         self.schedule()
147         
148     ## get value
149     def schedule(self):
150         if self.finished:
151             return
152         l = self.found.values()
153         l.sort(self.sort)
154         
155         for node in l[:K]:
156             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
157                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
158                 df = node.findValue(self.target, self.table.node.senderDict())
159                 df.addCallback(self.handleGotNodes)
160                 df.addErrback(self.makeMsgFailed(node))
161                 self.outstanding = self.outstanding + 1
162                 self.queried[node.id] = 1
163             if self.outstanding >= const.CONCURRENT_REQS:
164                 break
165         assert(self.outstanding) >=0
166         if self.outstanding == 0:
167             ## all done, didn't find it!!
168             self.finished=1
169             reactor.callFromThread(self.callback,[])
170
171     ## get value
172     def goWithNodes(self, nodes, found=None):
173         self.results = {}
174         if found:
175             for n in found:
176                 self.results[n] = 1
177         for node in nodes:
178             if node.id == self.table.node.id:
179                 continue
180             else:
181                 self.found[node.id] = node
182             
183         self.schedule()
184
185
186 class StoreValue(ActionBase):
187     def __init__(self, table, target, value, callback):
188         ActionBase.__init__(self, table, target, callback)
189         self.value = value
190         self.stored = []
191     
192     def storedValue(self, t, node):
193         self.outstanding -= 1
194         self.table.insertNode(node)
195         if self.finished:
196             return
197         self.stored.append(t)
198         if len(self.stored) >= const.STORE_REDUNDANCY:
199             self.finished=1
200             self.callback(self.stored)
201         else:
202             if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
203                 self.schedule()
204             
205     def storeFailed(self, t, node):
206         print ">>> store failed %s/%s" % (node.host, node.port)
207         self.table.nodeFailed(node)
208         self.outstanding -= 1
209         if self.finished:
210             return
211         self.schedule()
212         
213     def schedule(self):
214         if self.finished:
215             return
216         num = const.CONCURRENT_REQS - self.outstanding
217         if num > const.STORE_REDUNDANCY:
218             num = const.STORE_REDUNDANCY
219         for i in range(num):
220             try:
221                 node = self.nodes.pop()
222             except IndexError:
223                 if self.outstanding == 0:
224                     self.finished = 1
225                     self.callback(self.stored)
226             else:
227                 if not node.id == self.table.node.id:
228                     self.outstanding += 1
229                     df = node.storeValue(self.target, self.value, self.table.node.senderDict())
230                     df.addCallback(self.storedValue, node=node)
231                     df.addErrback(self.storeFailed, node=node)
232                     
233     def goWithNodes(self, nodes):
234         self.nodes = nodes
235         self.nodes.sort(self.sort)
236         self.schedule()
237
238
239 class KeyExpirer:
240     def __init__(self, store):
241         self.store = store
242         reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
243     
244     def doExpire(self):
245         self.cut = "%0.6f" % (time() - const.KE_AGE)
246         self._expire()
247     
248     def _expire(self):
249         c = self.store.cursor()
250         s = "delete from kv where time < '%s';" % self.cut
251         c.execute(s)
252         reactor.callLater(const.KE_DELAY, self.doExpire)