Remove the originated time from the DHT value storage.
[quix0rs-apt-p2p.git] / apt_dht_Khashmir / actions.py
1 ## Copyright 2002-2004 Andrew Loewenstern, All Rights Reserved
2 # see LICENSE.txt for license information
3
4 from twisted.internet import reactor
5 from twisted.python import log
6
7 from khash import intify
8
class ActionBase:
    """Shared state for long-running asynchronous processes such as finding nodes or values."""

    def __init__(self, caller, target, callback, config):
        # Collaborators and settings supplied by whoever starts the action.
        self.caller, self.target, self.config = caller, target, config
        # Integer form of the target, used for XOR distance comparisons.
        self.num = intify(target)
        # Bookkeeping filled in by subclasses while the action runs.
        self.found, self.queried, self.answered = {}, {}, {}
        self.callback = callback
        self.outstanding = 0
        self.finished = 0

        def sort(a, b, num=self.num):
            """cmp-style comparison ordering nodes by XOR distance from the ID we are looking for."""
            dist_a = num ^ a.num
            dist_b = num ^ b.num
            # Yields 1, -1 or 0 exactly like an explicit if/elif chain.
            return (dist_a > dist_b) - (dist_a < dist_b)
        self.sort = sort

    def goWithNodes(self, t):
        """Start the action; does nothing here and is overridden by subclasses."""
        return None
35     
36     
37
# Presumably the timeout, in seconds, for a single find-node request -- TODO confirm.
# NOTE(review): no code visible in this file reads this value; it is only
# mentioned in a comment.
FIND_NODE_TIMEOUT = 15
39
class FindNode(ActionBase):
    """Iteratively query nodes to find the ones closest to the target ID.

    Find-node merits its own class as it is a long-running, stateful process.
    ``self.found`` maps node id -> node for every node learned so far,
    ``self.queried`` and ``self.answered`` record the ids already asked and
    already heard from, and ``self.outstanding`` counts requests in flight.
    """

    def handleGotNodes(self, dict):
        """Handle one successful find-node reply and continue the search.

        ``dict['_krpc_sender']`` is indexed with [0] and [1] (presumably host
        and port -- confirm against the KRPC layer) and ``dict['rsp']`` holds
        the responder's "id" and, normally, a "nodes" list.
        """
        sender = dict['_krpc_sender']
        rsp = dict['rsp']
        responder = self.caller.Node(rsp["id"], sender[0], sender[1])
        self.caller.insertNode(responder)
        if self.finished or rsp["id"] in self.answered:
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1
        self.answered[rsp["id"]] = 1
        # A reply without "nodes" is treated as an empty list.  Raising here
        # would skip schedule() after the reply was already counted, and the
        # search might then never complete.
        for packed in rsp.get("nodes", ()):
            n = self.caller.Node(packed)
            if n.id not in self.found:
                self.found[n.id] = n
        self.schedule()

    def schedule(self):
        """Send messages to new peers, if necessary.

        Looks at the K known nodes closest to the target.  If one of them is
        the target itself the callback is invoked immediately with ``[node]``.
        Otherwise unqueried nodes are asked until CONCURRENT_REQS requests
        are in flight.  When nothing is outstanding the search is finished
        and the callback is scheduled with the closest nodes found.
        """
        if self.finished:
            return
        # Same ordering as the cmp-style self.sort (ascending XOR distance),
        # but a key function works on both Python 2 and Python 3.
        closest = sorted(self.found.values(), key=lambda n: self.num ^ n.num)
        closest = closest[:self.config['K']]
        for node in closest:
            if node.id == self.target:
                self.finished = 1
                return self.callback([node])
            if node.id not in self.queried and node.id != self.caller.node.id:
                # TODO: no per-request timeout is applied (FIND_NODE_TIMEOUT is unused)
                df = node.findNode(self.target, self.caller.node.id)
                # Update the bookkeeping before attaching callbacks so that a
                # deferred which has already fired sees consistent counters.
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
            if self.outstanding >= self.config['CONCURRENT_REQS']:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done!!
            self.finished = 1
            reactor.callLater(0, self.callback, closest)

    def makeMsgFailed(self, node):
        """Return an errback, bound to `node`, for a failed request to it."""
        def defaultGotNodes(err, self=self, node=node):
            log.msg("find failed (%s) %s/%s" % (self.config['PORT'], node.host, node.port))
            log.err(err)
            self.caller.table.nodeFailed(node)
            # The request is no longer in flight; try other nodes.
            self.outstanding = self.outstanding - 1
            self.schedule()
        return defaultGotNodes

    def goWithNodes(self, nodes):
        """Start the process, seeding the search with `nodes`.

        Our own node (``self.caller.node``) is skipped; every other node is
        recorded in ``self.found`` before the first round is scheduled.
        """
        for node in nodes:
            if node.id != self.caller.node.id:
                self.found[node.id] = node

        self.schedule()
106     
107
# Presumably the timeout, in seconds, for a single get-value request -- TODO confirm.
# NOTE(review): no code visible in this file reads this value.
get_value_timeout = 15
class GetValue(FindNode):
    """Get value task.

    Works like FindNode, but calls the node method named by `find` and
    reports newly seen values through the callback as
    ``callback(target, values)``.  A final ``callback(target, [])`` is
    scheduled once no requests remain outstanding.
    """

    def __init__(self, caller, target, callback, config, find="findValue"):
        FindNode.__init__(self, caller, target, callback, config)
        # Name of the node method used to issue the lookup request.
        self.findValue = find
        # Values already reported; goWithNodes() resets this, but creating it
        # here means handleGotNodes() never hits a missing attribute.
        self.results = {}

    def handleGotNodes(self, dict):
        """Handle one successful reply, which carries either "nodes" or "values".

        ``dict['_krpc_sender']`` is indexed with [0] and [1] (presumably host
        and port -- confirm against the KRPC layer); ``dict['rsp']`` holds the
        responder's "id" plus the payload.
        """
        sender = dict['_krpc_sender']
        rsp = dict['rsp']
        responder = self.caller.Node(rsp["id"], sender[0], sender[1])
        self.caller.insertNode(responder)
        if self.finished or rsp["id"] in self.answered:
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1
        self.answered[rsp["id"]] = 1
        # go through nodes
        # if we have any closer than what we already got, query them
        if 'nodes' in rsp:
            for packed in rsp['nodes']:
                n = self.caller.Node(packed)
                if n.id not in self.found:
                    self.found[n.id] = n
        elif 'values' in rsp:
            fresh = []
            for value in rsp['values']:
                if value not in self.results:
                    self.results[value] = 1
                    # Falsy values are remembered but never reported, as with
                    # the filter(None, ...) this loop replaces.
                    if value:
                        fresh.append(value)
            if fresh:
                reactor.callLater(0, self.callback, self.target, fresh)
        self.schedule()

    ## get value
    def schedule(self):
        """Query the closest unqueried nodes, up to CONCURRENT_REQS in flight.

        When no request is outstanding the search is marked finished and
        ``callback(target, [])`` is scheduled.
        """
        if self.finished:
            return
        # Same ordering as the cmp-style self.sort (ascending XOR distance),
        # but a key function works on both Python 2 and Python 3.
        closest = sorted(self.found.values(), key=lambda n: self.num ^ n.num)

        for node in closest[:self.config['K']]:
            if node.id not in self.queried and node.id != self.caller.node.id:
                # TODO: no per-request timeout is applied here
                try:
                    f = getattr(node, self.findValue)
                except AttributeError:
                    log.msg("findValue %s doesn't have a %s method!" % (node, self.findValue))
                else:
                    df = f(self.target, self.caller.node.id)
                    # Update the bookkeeping before attaching callbacks so that
                    # a deferred which has already fired sees consistent state.
                    self.outstanding = self.outstanding + 1
                    self.queried[node.id] = 1
                    df.addCallback(self.handleGotNodes)
                    df.addErrback(self.makeMsgFailed(node))
            if self.outstanding >= self.config['CONCURRENT_REQS']:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done, didn't find it!!
            self.finished = 1
            reactor.callLater(0, self.callback, self.target, [])

    ## get value
    def goWithNodes(self, nodes, found=None):
        """Start the lookup from `nodes`.

        `found`, if given, is an iterable of values that are already known;
        they are recorded so they will not be reported again.
        """
        self.results = {}
        if found:
            for value in found:
                self.results[value] = 1
        for node in nodes:
            if node.id != self.caller.node.id:
                self.found[node.id] = node

        self.schedule()
186
187
class StoreValue(ActionBase):
    """Store a value on several nodes.

    The node method named by `store` is called on nodes taken from the list
    given to goWithNodes() until STORE_REDUNDANCY stores have succeeded or
    the list is exhausted.  ``callback(target, value, stored)`` is then
    invoked once, where `stored` is the list of successful responses.
    """

    def __init__(self, caller, target, value, callback, config, store="storeValue"):
        ActionBase.__init__(self, caller, target, callback, config)
        self.value = value
        # Responses from the nodes that accepted the value.
        self.stored = []
        # Name of the node method used to issue the store request.
        self.store = store

    def storedValue(self, t, node):
        """Callback for a successful store on `node`; `t` is its response."""
        self.outstanding -= 1
        self.caller.insertNode(node)
        if self.finished:
            return t
        self.stored.append(t)
        redundancy = self.config['STORE_REDUNDANCY']
        if len(self.stored) >= redundancy:
            self.finished = 1
            self.callback(self.target, self.value, self.stored)
        elif len(self.stored) + self.outstanding < redundancy:
            # Requests in flight cannot reach the goal on their own.
            self.schedule()
        return t

    def storeFailed(self, t, node):
        """Errback for a failed store on `node`; passes the failure `t` on."""
        log.msg("store failed %s/%s" % (node.host, node.port))
        # NOTE(review): FindNode reports failures via self.caller.table.nodeFailed;
        # confirm that the caller really provides nodeFailed directly.
        self.caller.nodeFailed(node)
        self.outstanding -= 1
        if self.finished:
            return t
        self.schedule()
        return t

    def schedule(self):
        """Send store requests to more nodes, if necessary.

        At most min(CONCURRENT_REQS - outstanding, STORE_REDUNDANCY) new
        requests are started per call.  Our own node and nodes lacking the
        store method are skipped without using up a request slot.
        """
        if self.finished:
            return
        num = min(self.config['CONCURRENT_REQS'] - self.outstanding,
                  self.config['STORE_REDUNDANCY'])
        sent = 0
        while sent < num:
            try:
                node = self.nodes.pop()
            except IndexError:
                # Out of candidates.  Report only once, and only when nothing
                # is still in flight.  Stop here so the callback cannot fire
                # again on a later iteration.
                if self.outstanding == 0:
                    self.finished = 1
                    self.callback(self.target, self.value, self.stored)
                return
            if node.id == self.caller.node.id:
                continue
            try:
                f = getattr(node, self.store)
            except AttributeError:
                # No request is sent, so `outstanding` must not be incremented:
                # nothing would ever decrement it again.
                log.msg("%s doesn't have a %s method!" % (node, self.store))
                continue
            df = f(self.target, self.value, self.caller.node.id)
            self.outstanding += 1
            sent += 1
            df.addCallback(self.storedValue, node=node)
            df.addErrback(self.storeFailed, node=node)

    def goWithNodes(self, nodes):
        """Start storing the value on `nodes`.

        A sorted copy is kept, so the caller's list is left untouched.
        """
        # Ascending XOR distance from the target: the same ordering as the
        # cmp-style self.sort, expressed as a key function so it works on
        # both Python 2 and Python 3.
        # NOTE(review): schedule() pops from the end of this list, i.e. the
        # farthest of the given nodes is tried first -- confirm this is intended.
        self.nodes = sorted(nodes, key=lambda n: self.num ^ n.num)
        self.schedule()
247
248
class KeyExpirer:
    """Periodically ask the store to expire old values.

    The first run is scheduled KEINITIAL_DELAY after construction and every
    later run KE_DELAY after the previous one; each run calls
    ``store.expireValues(config['KE_AGE'])``.
    """

    def __init__(self, store, config):
        self.store = store
        self.config = config
        # Handle of the pending delayed call, kept so shutdown() can cancel it.
        self.next_expire = reactor.callLater(self.config['KEINITIAL_DELAY'], self.doExpire)

    def doExpire(self):
        """Expire old values and schedule the next run."""
        try:
            self.store.expireValues(self.config['KE_AGE'])
        finally:
            # Reschedule even if this run failed, so that a single error does
            # not stop expiry for good; the exception still propagates.
            self.next_expire = reactor.callLater(self.config['KE_DELAY'], self.doExpire)

    def shutdown(self):
        """Cancel the pending expiry run, ignoring errors from cancelling."""
        try:
            self.next_expire.cancel()
        except Exception:
            # Best effort: the call may already have run or been cancelled.
            # KeyboardInterrupt and SystemExit are deliberately not swallowed.
            pass