dc743de2e7658bcb2e598bc3ccb503964b702c6c
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from time import time
5
6 from twisted.internet import reactor
7
8 from khash import intify
9
10 class ActionBase:
11     """ base class for some long running asynchronous proccesses like finding nodes or values """
12     def __init__(self, table, target, callback, config):
13         self.table = table
14         self.target = target
15         self.config = config
16         self.num = intify(target)
17         self.found = {}
18         self.queried = {}
19         self.answered = {}
20         self.callback = callback
21         self.outstanding = 0
22         self.finished = 0
23     
24         def sort(a, b, num=self.num):
25             """ this function is for sorting nodes relative to the ID we are looking for """
26             x, y = num ^ a.num, num ^ b.num
27             if x > y:
28                 return 1
29             elif x < y:
30                 return -1
31             return 0
32         self.sort = sort
33         
34     def goWithNodes(self, t):
35         pass
36     
37     
38
39 FIND_NODE_TIMEOUT = 15
40
41 class FindNode(ActionBase):
42     """ find node action merits it's own class as it is a long running stateful process """
43     def handleGotNodes(self, dict):
44         _krpc_sender = dict['_krpc_sender']
45         dict = dict['rsp']
46         l = dict["nodes"]
47         sender = {'id' : dict["id"]}
48         sender['port'] = _krpc_sender[1]        
49         sender['host'] = _krpc_sender[0]        
50         sender = self.table.Node().initWithDict(sender)
51         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
52         self.table.table.insertNode(sender)
53         if self.finished or self.answered.has_key(sender.id):
54             # a day late and a dollar short
55             return
56         self.outstanding = self.outstanding - 1
57         self.answered[sender.id] = 1
58         for node in l:
59             n = self.table.Node().initWithDict(node)
60             n.conn = self.table.udp.connectionForAddr((n.host, n.port))
61             if not self.found.has_key(n.id):
62                 self.found[n.id] = n
63         self.schedule()
64         
65     def schedule(self):
66         """
67             send messages to new peers, if necessary
68         """
69         if self.finished:
70             return
71         l = self.found.values()
72         l.sort(self.sort)
73         for node in l[:self.config['K']]:
74             if node.id == self.target:
75                 self.finished=1
76                 return self.callback([node])
77             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
78                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
79                 df = node.findNode(self.target, self.table.node.id)
80                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
81                 self.outstanding = self.outstanding + 1
82                 self.queried[node.id] = 1
83             if self.outstanding >= self.config['CONCURRENT_REQS']:
84                 break
85         assert(self.outstanding) >=0
86         if self.outstanding == 0:
87             ## all done!!
88             self.finished=1
89             reactor.callLater(0, self.callback, l[:self.config['K']])
90     
91     def makeMsgFailed(self, node):
92         def defaultGotNodes(err, self=self, node=node):
93             print ">>> find failed %s/%s" % (node.host, node.port), err
94             self.table.table.nodeFailed(node)
95             self.outstanding = self.outstanding - 1
96             self.schedule()
97         return defaultGotNodes
98     
99     def goWithNodes(self, nodes):
100         """
101             this starts the process, our argument is a transaction with t.extras being our list of nodes
102             it's a transaction since we got called from the dispatcher
103         """
104         for node in nodes:
105             if node.id == self.table.node.id:
106                 continue
107             else:
108                 self.found[node.id] = node
109         
110         self.schedule()
111     
112
113 get_value_timeout = 15
114 class GetValue(FindNode):
115     def __init__(self, table, target, callback, config, find="findValue"):
116         FindNode.__init__(self, table, target, callback, config)
117         self.findValue = find
118             
119     """ get value task """
120     def handleGotNodes(self, dict):
121         _krpc_sender = dict['_krpc_sender']
122         dict = dict['rsp']
123         sender = {'id' : dict["id"]}
124         sender['port'] = _krpc_sender[1]
125         sender['host'] = _krpc_sender[0]                
126         sender = self.table.Node().initWithDict(sender)
127         sender.conn = self.table.udp.connectionForAddr((sender.host, sender.port))
128         self.table.table.insertNode(sender)
129         if self.finished or self.answered.has_key(sender.id):
130             # a day late and a dollar short
131             return
132         self.outstanding = self.outstanding - 1
133         self.answered[sender.id] = 1
134         # go through nodes
135         # if we have any closer than what we already got, query them
136         if dict.has_key('nodes'):
137             for node in dict['nodes']:
138                 n = self.table.Node().initWithDict(node)
139                 n.conn = self.table.udp.connectionForAddr((n.host, n.port))
140                 if not self.found.has_key(n.id):
141                     self.found[n.id] = n
142         elif dict.has_key('values'):
143             def x(y, z=self.results):
144                 if not z.has_key(y):
145                     z[y] = 1
146                     return y
147                 else:
148                     return None
149             z = len(dict['values'])
150             v = filter(None, map(x, dict['values']))
151             if(len(v)):
152                 reactor.callLater(0, self.callback, v)
153         self.schedule()
154         
155     ## get value
156     def schedule(self):
157         if self.finished:
158             return
159         l = self.found.values()
160         l.sort(self.sort)
161         
162         for node in l[:self.config['K']]:
163             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
164                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
165                 try:
166                     f = getattr(node, self.findValue)
167                 except AttributeError:
168                     print ">>> findValue %s doesn't have a %s method!" % (node, self.findValue)
169                 else:
170                     df = f(self.target, self.table.node.id)
171                     df.addCallback(self.handleGotNodes)
172                     df.addErrback(self.makeMsgFailed(node))
173                     self.outstanding = self.outstanding + 1
174                     self.queried[node.id] = 1
175             if self.outstanding >= self.config['CONCURRENT_REQS']:
176                 break
177         assert(self.outstanding) >=0
178         if self.outstanding == 0:
179             ## all done, didn't find it!!
180             self.finished=1
181             reactor.callLater(0, self.callback,[])
182
183     ## get value
184     def goWithNodes(self, nodes, found=None):
185         self.results = {}
186         if found:
187             for n in found:
188                 self.results[n] = 1
189         for node in nodes:
190             if node.id == self.table.node.id:
191                 continue
192             else:
193                 self.found[node.id] = node
194             
195         self.schedule()
196
197
198 class StoreValue(ActionBase):
199     def __init__(self, table, target, value, callback, config, store="storeValue"):
200         ActionBase.__init__(self, table, target, callback, config)
201         self.value = value
202         self.stored = []
203         self.store = store
204         
205     def storedValue(self, t, node):
206         self.outstanding -= 1
207         self.table.insertNode(node)
208         if self.finished:
209             return
210         self.stored.append(t)
211         if len(self.stored) >= self.config['STORE_REDUNDANCY']:
212             self.finished=1
213             self.callback(self.stored)
214         else:
215             if not len(self.stored) + self.outstanding >= self.config['STORE_REDUNDANCY']:
216                 self.schedule()
217         return t
218     
219     def storeFailed(self, t, node):
220         print ">>> store failed %s/%s" % (node.host, node.port)
221         self.table.nodeFailed(node)
222         self.outstanding -= 1
223         if self.finished:
224             return t
225         self.schedule()
226         return t
227     
228     def schedule(self):
229         if self.finished:
230             return
231         num = self.config['CONCURRENT_REQS'] - self.outstanding
232         if num > self.config['STORE_REDUNDANCY']:
233             num = self.config['STORE_REDUNDANCY']
234         for i in range(num):
235             try:
236                 node = self.nodes.pop()
237             except IndexError:
238                 if self.outstanding == 0:
239                     self.finished = 1
240                     self.callback(self.stored)
241             else:
242                 if not node.id == self.table.node.id:
243                     self.outstanding += 1
244                     try:
245                         f = getattr(node, self.store)
246                     except AttributeError:
247                         print ">>> %s doesn't have a %s method!" % (node, self.store)
248                     else:
249                         df = f(self.target, self.value, self.table.node.id)
250                         df.addCallback(self.storedValue, node=node)
251                         df.addErrback(self.storeFailed, node=node)
252                     
253     def goWithNodes(self, nodes):
254         self.nodes = nodes
255         self.nodes.sort(self.sort)
256         self.schedule()
257
258
259 class KeyExpirer:
260     def __init__(self, store, config):
261         self.store = store
262         self.config = config
263         reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)
264     
265     def doExpire(self):
266         self.cut = "%0.6f" % (time() - self.config['KE_AGE'])
267         self._expire()
268     
269     def _expire(self):
270         c = self.store.cursor()
271         s = "delete from kv where time < '%s';" % self.cut
272         c.execute(s)
273         reactor.callLater(self.config['KE_DELAY'], self.doExpire)