]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - actions.py
add some default bogus values in __init__
[quix0rs-apt-p2p.git] / actions.py
1 from time import time
2 from pickle import loads, dumps
3
4 from bsddb3 import db
5
6 from const import reactor
7 import const
8
9 from hash import intify
10 from knode import KNode as Node
11 from ktable import KTable, K
12
# Cap on concurrent FIND_NODE/VALUE requests: the schedule() methods below
# stop issuing new queries once this many are outstanding.
N = 3
15
class ActionBase:
    """ common state for long running asynchronous lookups (finding nodes or values) """
    def __init__(self, table, target, callback):
        """
            table    -- object the subclasses use to reach the routing table and our own node
            target   -- ID being looked for
            callback -- callable that receives the outcome of the lookup
        """
        self.table = table
        self.target = target
        self.int = intify(target)
        # per-lookup bookkeeping, all keyed by node id
        self.found = {}
        self.queried = {}
        self.answered = {}
        self.callback = callback
        # number of requests in flight, and a flag set once the lookup is over
        self.outstanding = 0
        self.finished = 0

        goal = self.int

        def sort(a, b):
            """ cmp-style comparator: orders nodes by XOR distance from the ID being looked for """
            dist_a = goal ^ a.int
            dist_b = goal ^ b.int
            # evaluates to 1, -1 or 0, exactly like a three-way comparison
            return (dist_a > dist_b) - (dist_a < dist_b)
        self.sort = sort

    def goWithNodes(self, t):
        """ entry point that starts the action; the base class does nothing """
41         
42         
43
# Timeout for FIND_NODE requests (presumably seconds -- TODO confirm).
# NOTE(review): in the code visible here it is only mentioned in
# commented-out lines, so it currently has no effect.
FIND_NODE_TIMEOUT = 15
45
class FindNode(ActionBase):
    """ node lookup: a long running, stateful process, hence its own class """
    def handleGotNodes(self, args):
        """
            callback for a findNode reply; args is a (node dicts, sender dict) pair
        """
        nodes, responder = args
        responder = Node().initWithDict(responder)
        self.table.table.insertNode(responder)
        if self.finished or responder.id in self.answered:
            # reply arrived after we finished, or this sender already answered
            return
        self.outstanding -= 1
        self.answered[responder.id] = 1
        for packed in nodes:
            candidate = Node().initWithDict(packed)
            if candidate.id in self.found:
                continue
            self.found[candidate.id] = candidate
            self.table.insertNode(candidate)
        self.schedule()

    def _sendFindNode(self, node):
        """ issue one findNode request to node and account for it """
        df = node.findNode(self.target, self.table.node.senderDict())
        df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
        # counters are updated only after the deferred has been wired up
        self.outstanding += 1
        self.queried[node.id] = 1

    def schedule(self):
        """
            query the closest known nodes that have not been asked yet,
            keeping at most N requests in flight; report back when done
        """
        if self.finished:
            return
        ranked = sorted(self.found.values(), self.sort)
        nearest = ranked[:K]

        for node in nearest:
            if node.id == self.target:
                # exact match: hand it straight to the callback
                self.finished = 1
                return self.callback([node])
            unasked = node.id not in self.queried
            if unasked and node.id != self.table.node.id:
                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                self._sendFindNode(node)
            if self.outstanding >= N:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## nothing left in flight, report the closest nodes we know
            self.finished = 1
            reactor.callFromThread(self.callback, nearest)

    def makeMsgFailed(self, node):
        """ build the errback used for a request sent to node """
        def defaultGotNodes(err):
            self.table.table.nodeFailed(node)
            if not self.finished:
                self.outstanding -= 1
                self.schedule()
        return defaultGotNodes

    def goWithNodes(self, nodes):
        """
            start the lookup by querying every node in nodes except ourselves;
            if nothing could be sent, the callback gets nodes back right away
        """
        for node in nodes:
            if node.id != self.table.node.id:
                self.found[node.id] = node
                #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                self._sendFindNode(node)
        if self.outstanding == 0:
            self.callback(nodes)
116
117
# Timeout for value lookups (presumably seconds -- TODO confirm).
# NOTE(review): in the code visible here it is only mentioned in a
# commented-out line, so it currently has no effect.
GET_VALUE_TIMEOUT = 15
class GetValue(FindNode):
    """ value lookup: replies carry either more nodes to ask or stored values """
    def handleGotNodes(self, args):
        """
            callback for a findValue reply; args is a (reply dict, sender dict) pair
        """
        reply, responder = args
        responder = Node().initWithDict(responder)
        self.table.table.insertNode(responder)
        if self.finished or responder.id in self.answered:
            # reply arrived after we finished, or this sender already answered
            return
        self.outstanding -= 1
        self.answered[responder.id] = 1
        if 'nodes' in reply:
            # remember every node we had not heard of, so schedule() can ask it
            for packed in reply['nodes']:
                candidate = Node().initWithDict(packed)
                if candidate.id not in self.found:
                    self.found[candidate.id] = candidate
                    self.table.insertNode(candidate)
        elif 'values' in reply:
            seen = self.results
            fresh = []
            for item in reply['values']:
                data = item.data
                if data in seen:
                    continue
                seen[data] = 1
                # falsy values are recorded as seen but never reported
                if data:
                    fresh.append(data)
            if len(fresh):
                reactor.callFromThread(self.callback, fresh)
        self.schedule()

    def _sendFindValue(self, node):
        """ issue one findValue request to node and account for it """
        df = node.findValue(self.target, self.table.node.senderDict())
        df.addCallback(self.handleGotNodes)
        df.addErrback(self.makeMsgFailed(node))
        # counters are updated only after the deferred has been wired up
        self.outstanding += 1
        self.queried[node.id] = 1

    ## get value
    def schedule(self):
        """
            query the closest known nodes that have not been asked yet,
            keeping at most N requests in flight; when nothing is left in
            flight the callback gets an empty list
        """
        if self.finished:
            return
        ranked = sorted(self.found.values(), self.sort)

        for node in ranked[:K]:
            unasked = node.id not in self.queried
            if unasked and node.id != self.table.node.id:
                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
                self._sendFindValue(node)
            if self.outstanding >= N:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done, didn't find it!!
            self.finished = 1
            reactor.callFromThread(self.callback, [])

    ## get value
    def goWithNodes(self, nodes, found=None):
        """
            start the lookup by querying every node in nodes except ourselves;
            values listed in found are treated as already reported
        """
        self.results = {}
        for known in (found or ()):
            self.results[known] = 1
        for node in nodes:
            if node.id != self.table.node.id:
                self.found[node.id] = node
                #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                self._sendFindValue(node)
        if self.outstanding == 0:
            reactor.callFromThread(self.callback, [])
192
193
194
class KeyExpirer:
    """
        Periodic task that deletes stored values older than const.KE_AGE.

        store -- database mapping a hash h to a pickled (k, v, lt) tuple
        itime -- database whose cursor yields (it, h) records and is
                 positioned by cut-off string
        kw    -- database holding (k, h) pairs
    """
    def __init__(self, store, itime, kw):
        self.store = store
        self.itime = itime
        self.kw = kw
        # first sweep runs after an initial delay; later ones re-arm themselves
        reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)

    def doExpire(self):
        """ compute the cut-off and run one expiry sweep """
        # repr() is the function spelling of the legacy backtick operator
        self.cut = repr(time() - const.KE_AGE)
        self._expire()

    def _expire(self):
        """
            run one sweep, then schedule the next one after const.KE_DELAY

            Both cursors are closed even when the sweep raises, so a failed
            sweep does not leave Berkeley DB cursors open.  No cursor is
            opened on the store because it is only accessed by key.
        """
        ic = self.itime.cursor()
        try:
            kc = self.kw.cursor()
            try:
                i = self._sweep(ic, kc)
            finally:
                kc.close()
        finally:
            ic.close()

        reactor.callLater(const.KE_DELAY, self.doExpire)
        if i > 0:
            print(">>>KE: done expiring %d" % i)

    def _sweep(self, ic, kc):
        """
            delete expired records using the time-index cursor ic and the
            key cursor kc; returns the number of store entries removed
        """
        try:
            irec = ic.set_range(self.cut)
        except db.DBNotFoundError:
            # no index key at or after the cut-off: everything is expired,
            # so walk the index with prev() instead
            f = ic.prev
            irec = f()
        else:
            f = ic.next
        i = 0
        while irec:
            it, h = irec
            try:
                k, v, lt = loads(self.store[h])
            except KeyError:
                # index entry without a store entry: drop the index entry
                ic.delete()
            else:
                # NOTE(review): self.cut is a string, so lt is presumably
                # stored in the same repr() form -- confirm against the writer
                if lt < self.cut:
                    try:
                        kc.set_both(k, h)
                    except db.DBNotFoundError:
                        print("Database inconsistency!  No key->value entry when a store entry was found!")
                    else:
                        kc.delete()
                    self.store.delete(h)
                    ic.delete()
                    i = i + 1
                else:
                    # first record that is still fresh ends the sweep
                    break
            irec = f()
        return i
244