]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - actions.py
only send the ID along in khashmir messages, don't send the host and
[quix0rs-apt-p2p.git] / actions.py
1 ## Copyright 2002-2003 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from time import time
5
6 from const import reactor
7 import const
8
9 from khash import intify
10 from knode import KNode as Node
11 from ktable import KTable, K
12
13 class ActionBase:
14     """ base class for some long running asynchronous proccesses like finding nodes or values """
15     def __init__(self, table, target, callback):
16         self.table = table
17         self.target = target
18         self.num = intify(target)
19         self.found = {}
20         self.queried = {}
21         self.answered = {}
22         self.callback = callback
23         self.outstanding = 0
24         self.finished = 0
25     
26         def sort(a, b, num=self.num):
27             """ this function is for sorting nodes relative to the ID we are looking for """
28             x, y = num ^ a.num, num ^ b.num
29             if x > y:
30                 return 1
31             elif x < y:
32                 return -1
33             return 0
34         self.sort = sort
35         
36     def goWithNodes(self, t):
37         pass
38     
39     
40
41 FIND_NODE_TIMEOUT = 15
42
43 class FindNode(ActionBase):
44     """ find node action merits it's own class as it is a long running stateful process """
45     def handleGotNodes(self, dict):
46         _krpc_sender = dict['_krpc_sender']
47         dict = dict['rsp']
48         l = dict["nodes"]
49         sender = {'id' : dict["id"]}
50         sender['port'] = _krpc_sender[1]        
51         sender['host'] = _krpc_sender[0]        
52         sender = Node().initWithDict(sender)
53         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
54         self.table.table.insertNode(sender)
55         if self.finished or self.answered.has_key(sender.id):
56             # a day late and a dollar short
57             return
58         self.outstanding = self.outstanding - 1
59         self.answered[sender.id] = 1
60         for node in l:
61             n = Node().initWithDict(node)
62             n.conn = self.table.udp.connectionForAddr((n.host, n.port))
63             if not self.found.has_key(n.id):
64                 self.found[n.id] = n
65         self.schedule()
66         
67     def schedule(self):
68         """
69             send messages to new peers, if necessary
70         """
71         if self.finished:
72             return
73         l = self.found.values()
74         l.sort(self.sort)
75         for node in l[:K]:
76             if node.id == self.target:
77                 self.finished=1
78                 return self.callback([node])
79             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
80                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
81                 df = node.findNode(self.target, self.table.node.id)
82                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
83                 self.outstanding = self.outstanding + 1
84                 self.queried[node.id] = 1
85             if self.outstanding >= const.CONCURRENT_REQS:
86                 break
87         assert(self.outstanding) >=0
88         if self.outstanding == 0:
89             ## all done!!
90             self.finished=1
91             reactor.callFromThread(self.callback, l[:K])
92     
93     def makeMsgFailed(self, node):
94         def defaultGotNodes(err, self=self, node=node):
95             print ">>> find failed %s/%s" % (node.host, node.port)
96             self.table.table.nodeFailed(node)
97             self.outstanding = self.outstanding - 1
98             self.schedule()
99         return defaultGotNodes
100     
101     def goWithNodes(self, nodes):
102         """
103             this starts the process, our argument is a transaction with t.extras being our list of nodes
104             it's a transaction since we got called from the dispatcher
105         """
106         for node in nodes:
107             if node.id == self.table.node.id:
108                 continue
109             else:
110                 self.found[node.id] = node
111         
112         self.schedule()
113     
114
115 GET_VALUE_TIMEOUT = 15
116 class GetValue(FindNode):
117     """ get value task """
118     def handleGotNodes(self, dict):
119         _krpc_sender = dict['_krpc_sender']
120         dict = dict['rsp']
121         sender = {'id' : dict["id"]}
122         sender['port'] = _krpc_sender[1]
123         sender['host'] = _krpc_sender[0]                
124         sender = Node().initWithDict(sender)
125         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
126         self.table.table.insertNode(sender)
127         if self.finished or self.answered.has_key(sender.id):
128             # a day late and a dollar short
129             return
130         self.outstanding = self.outstanding - 1
131         self.answered[sender.id] = 1
132         # go through nodes
133         # if we have any closer than what we already got, query them
134         if dict.has_key('nodes'):
135             for node in dict['nodes']:
136                 n = Node().initWithDict(node)
137                 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
138                 if not self.found.has_key(n.id):
139                     self.found[n.id] = n
140         elif dict.has_key('values'):
141             def x(y, z=self.results):
142                 if not z.has_key(y):
143                     z[y] = 1
144                     return y
145                 else:
146                     return None
147             z = len(dict['values'])
148             v = filter(None, map(x, dict['values']))
149             if(len(v)):
150                 reactor.callFromThread(self.callback, v)
151         self.schedule()
152         
153     ## get value
154     def schedule(self):
155         if self.finished:
156             return
157         l = self.found.values()
158         l.sort(self.sort)
159         
160         for node in l[:K]:
161             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
162                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
163                 df = node.findValue(self.target, self.table.node.id)
164                 df.addCallback(self.handleGotNodes)
165                 df.addErrback(self.makeMsgFailed(node))
166                 self.outstanding = self.outstanding + 1
167                 self.queried[node.id] = 1
168             if self.outstanding >= const.CONCURRENT_REQS:
169                 break
170         assert(self.outstanding) >=0
171         if self.outstanding == 0:
172             ## all done, didn't find it!!
173             self.finished=1
174             reactor.callFromThread(self.callback,[])
175
176     ## get value
177     def goWithNodes(self, nodes, found=None):
178         self.results = {}
179         if found:
180             for n in found:
181                 self.results[n] = 1
182         for node in nodes:
183             if node.id == self.table.node.id:
184                 continue
185             else:
186                 self.found[node.id] = node
187             
188         self.schedule()
189
190
191 class StoreValue(ActionBase):
192     def __init__(self, table, target, value, callback):
193         ActionBase.__init__(self, table, target, callback)
194         self.value = value
195         self.stored = []
196     
197     def storedValue(self, t, node):
198         self.outstanding -= 1
199         self.table.insertNode(node)
200         if self.finished:
201             return
202         self.stored.append(t)
203         if len(self.stored) >= const.STORE_REDUNDANCY:
204             self.finished=1
205             self.callback(self.stored)
206         else:
207             if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
208                 self.schedule()
209             
210     def storeFailed(self, t, node):
211         print ">>> store failed %s/%s" % (node.host, node.port)
212         self.table.nodeFailed(node)
213         self.outstanding -= 1
214         if self.finished:
215             return
216         self.schedule()
217         
218     def schedule(self):
219         if self.finished:
220             return
221         num = const.CONCURRENT_REQS - self.outstanding
222         if num > const.STORE_REDUNDANCY:
223             num = const.STORE_REDUNDANCY
224         for i in range(num):
225             try:
226                 node = self.nodes.pop()
227             except IndexError:
228                 if self.outstanding == 0:
229                     self.finished = 1
230                     self.callback(self.stored)
231             else:
232                 if not node.id == self.table.node.id:
233                     self.outstanding += 1
234                     df = node.storeValue(self.target, self.value, self.table.node.id)
235                     df.addCallback(self.storedValue, node=node)
236                     df.addErrback(self.storeFailed, node=node)
237                     
238     def goWithNodes(self, nodes):
239         self.nodes = nodes
240         self.nodes.sort(self.sort)
241         self.schedule()
242
243
244 class KeyExpirer:
245     def __init__(self, store):
246         self.store = store
247         reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
248     
249     def doExpire(self):
250         self.cut = "%0.6f" % (time() - const.KE_AGE)
251         self._expire()
252     
253     def _expire(self):
254         c = self.store.cursor()
255         s = "delete from kv where time < '%s';" % self.cut
256         c.execute(s)
257         reactor.callLater(const.KE_DELAY, self.doExpire)