ripped out xmlrpc, experimented with xmlrpc but with bencode, finally
[quix0rs-apt-p2p.git] / actions.py
1 from time import time
2
3 from const import reactor
4 import const
5
6 from hash import intify
7 from knode import KNode as Node
8 from ktable import KTable, K
9
class ActionBase:
    """ shared bookkeeping for long running asynchronous processes such as finding nodes or values """
    def __init__(self, table, target, callback):
        """
            table    -- object through which the subclasses reach the local node and routing table
            target   -- ID being looked up; intify(target) is kept as self.num
            callback -- callable the subclasses invoke with their results
        """
        self.table = table
        self.target = target
        self.num = intify(target)
        self.callback = callback
        # per-action bookkeeping; FindNode and GetValue key these dicts by node ID
        self.found, self.queried, self.answered = {}, {}, {}
        # outstanding counts requests in flight, finished is a 0/1 flag
        self.outstanding = self.finished = 0

        def closest_first(a, b, num=self.num):
            """ cmp-style comparator: orders nodes by (num xor node.num), smallest first """
            dist_a = num ^ a.num
            dist_b = num ^ b.num
            if dist_a < dist_b:
                return -1
            if dist_a > dist_b:
                return 1
            return 0
        self.sort = closest_first

    def goWithNodes(self, t):
        """ starting point of an action; does nothing here, subclasses override it """
        return None
35     
36     
37
# Timeout for a single find node request (presumably seconds -- TODO confirm).
# NOTE(review): no live code in the visible part of this module reads this value.
FIND_NODE_TIMEOUT = 15
39
class FindNode(ActionBase):
    """ find node action; it gets its own class because it is a long running, stateful process """
    def handleGotNodes(self, dict):
        """
            callback for a find node reply
            dict holds the replying "sender" and the list of "nodes" it reported;
            unseen nodes are added to self.found and more queries are scheduled
        """
        reported = dict["nodes"]
        raw_sender = dict["sender"]
        sender = Node().initWithDict(raw_sender)
        sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
        self.table.table.insertNode(sender)
        if self.finished or sender.id in self.answered:
            # late or duplicate reply: the sender was recorded above, nothing else to do
            return
        self.outstanding -= 1
        self.answered[sender.id] = 1
        for entry in reported:
            peer = Node().initWithDict(entry)
            peer.conn = self.table.airhook.connectionForAddr((peer.host, peer.port))
            # keep the first node object seen for a given ID
            self.found.setdefault(peer.id, peer)
        self.schedule()

    def schedule(self):
        """
            query new peers, if necessary
            finishes with [node] as soon as a node whose ID equals the target shows up
            among the K closest, or with the K closest once no request is outstanding
        """
        if self.finished:
            return
        candidates = self.found.values()
        candidates.sort(self.sort)
        for node in candidates[:K]:
            if node.id == self.target:
                self.finished = 1
                return self.callback([node])
            if node.id not in self.queried and node.id != self.table.node.id:
                # TODO: a per-request timeout (FIND_NODE_TIMEOUT) is not enforced yet
                df = node.findNode(self.target, self.table.node.senderDict())
                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                self.outstanding += 1
                self.queried[node.id] = 1
            if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            # nothing in flight and nobody left to ask: report what we have
            self.finished = 1
            reactor.callFromThread(self.callback, candidates[:K])

    def makeMsgFailed(self, node):
        """ build the errback used for the request that was sent to node """
        def request_failed(err):
            self.table.table.nodeFailed(node)
            self.outstanding -= 1
            self.schedule()
        return request_failed

    def goWithNodes(self, nodes):
        """
            this starts the process; nodes is the initial list of nodes to query,
            our own node is left out
        """
        for node in nodes:
            if node.id != self.table.node.id:
                self.found[node.id] = node

        self.schedule()
105     
106
# Timeout for a single find value request (presumably seconds -- TODO confirm).
# NOTE(review): no live code in the visible part of this module reads this value.
GET_VALUE_TIMEOUT = 15
class GetValue(FindNode):
    """ get value task: same walk as FindNode, but replies may carry values instead of nodes """
    def handleGotNodes(self, dict):
        """
            callback for a find value reply
            a reply with "nodes" extends self.found; otherwise a reply with "values"
            reports every value not seen before to the callback
        """
        raw_sender = dict["sender"]
        sender = Node().initWithDict(raw_sender)
        sender.conn = self.table.airhook.connectionForAddr((sender.host, sender.port))
        self.table.table.insertNode(sender)
        if self.finished or sender.id in self.answered:
            # late or duplicate reply: the sender was recorded above, nothing else to do
            return
        self.outstanding -= 1
        self.answered[sender.id] = 1
        if 'nodes' in dict:
            # remember every node we have not seen yet so schedule() can query it
            for entry in dict['nodes']:
                peer = Node().initWithDict(entry)
                peer.conn = self.table.airhook.connectionForAddr((peer.host, peer.port))
                self.found.setdefault(peer.id, peer)
        elif 'values' in dict:
            seen = self.results
            fresh = []
            for value in dict['values']:
                if value in seen:
                    continue
                seen[value] = 1
                # falsy values are marked as seen but never reported
                if value:
                    fresh.append(value)
            if len(fresh):
                reactor.callFromThread(self.callback, fresh)
        self.schedule()

    ## get value
    def schedule(self):
        """
            query new peers, if necessary
            once no request is outstanding the callback gets an empty list
        """
        if self.finished:
            return
        candidates = self.found.values()
        candidates.sort(self.sort)

        for node in candidates[:K]:
            if node.id not in self.queried and node.id != self.table.node.id:
                # TODO: a per-request timeout (GET_VALUE_TIMEOUT) is not enforced yet
                df = node.findValue(self.target, self.table.node.senderDict())
                df.addCallback(self.handleGotNodes)
                df.addErrback(self.makeMsgFailed(node))
                self.outstanding += 1
                self.queried[node.id] = 1
            if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            # nothing in flight and nobody left to ask
            self.finished = 1
            reactor.callFromThread(self.callback, [])

    ## get value
    def goWithNodes(self, nodes, found=None):
        """
            this starts the process; nodes is the initial list of nodes to query
            (our own node is left out), found lists values that are already known
            and therefore must not be reported again
        """
        self.results = {}
        if found:
            for known in found:
                self.results[known] = 1
        for node in nodes:
            if node.id != self.table.node.id:
                self.found[node.id] = node

        self.schedule()
176
177
class StoreValue(ActionBase):
    """ store value action: sends the value to the given nodes until const.STORE_REDUNDANCY of them have answered """
    def __init__(self, table, target, value, callback):
        """ same arguments as ActionBase, plus the value to store under target """
        ActionBase.__init__(self, table, target, callback)
        self.value = value
        # one entry per successful store reply; handed to the callback when done
        self.stored = []

    def storedValue(self, t, node):
        """ callback for a successful store; t is the reply, node is the node that was asked """
        self.outstanding -= 1
        self.table.insertNode(node)
        if self.finished:
            return
        self.stored.append(t)
        if len(self.stored) >= const.STORE_REDUNDANCY:
            self.finished = 1
            self.callback(self.stored)
        elif len(self.stored) + self.outstanding < const.STORE_REDUNDANCY:
            # the requests still in flight cannot reach the redundancy target on their own
            self.schedule()

    def storeFailed(self, t, node):
        """ errback for a failed store; reports the node as failed and tries further nodes """
        self.table.nodeFailed(node)
        self.outstanding -= 1
        if self.finished:
            return
        self.schedule()

    def schedule(self):
        """
            send store requests to further nodes, if necessary
            at most min(const.CONCURRENT_REQS - outstanding, const.STORE_REDUNDANCY)
            requests are started per call; when the node list is used up and nothing
            is outstanding the callback is invoked exactly once with self.stored
        """
        if self.finished:
            return
        num = const.CONCURRENT_REQS - self.outstanding
        if num > const.STORE_REDUNDANCY:
            num = const.STORE_REDUNDANCY
        # self.finished is re-checked because a reply handled during this loop may
        # already have completed the action
        while num > 0 and self.nodes and not self.finished:
            # NOTE(review): goWithNodes sorts closest-first, so pop() hands out the
            # farthest remaining node first; confirm that this is intended
            node = self.nodes.pop()
            if node.id == self.table.node.id:
                # never send a store request to ourselves; this does not use up a slot
                continue
            num -= 1
            self.outstanding += 1
            df = node.storeValue(self.target, self.value, self.table.node.senderDict())
            df.addCallback(self.storedValue, node=node)
            df.addErrback(self.storeFailed, node=node)
        if not self.finished and not self.nodes and self.outstanding == 0:
            # out of nodes and nothing in flight: report what was stored, once
            self.finished = 1
            self.callback(self.stored)

    def goWithNodes(self, nodes):
        """
            this starts the process; nodes is the list of nodes to store on
            the list is kept, sorted in place and consumed by schedule()
        """
        self.nodes = nodes
        self.nodes.sort(self.sort)
        self.schedule()
228
229
class KeyExpirer:
    """ periodically deletes rows of the kv table whose time is older than const.KE_AGE """
    def __init__(self, store):
        """ store must offer cursor(); the first run is scheduled through reactor.callLater """
        self.store = store
        # NOTE(review): KEINITIAL_DELAY lacks the underscore used by KE_AGE and KE_DELAY;
        # confirm the name matches the const module
        reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)

    def doExpire(self):
        """ work out the cut-off (now minus const.KE_AGE, six decimals) and expire """
        cutoff = time() - const.KE_AGE
        self.cut = "%0.6f" % cutoff
        self._expire()

    def _expire(self):
        """ delete everything older than self.cut, then schedule the next run """
        cursor = self.store.cursor()
        # the cut-off is embedded as a quoted literal in the statement
        cursor.execute("delete from kv where time < '%s';" % self.cut)
        reactor.callLater(const.KE_DELAY, self.doExpire)
244