]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - actions.py
return K nodes and not K-1
[quix0rs-apt-p2p.git] / actions.py
1 from time import time
2 from pickle import loads, dumps
3
4 from bsddb3 import db
5
6 from const import reactor
7 import const
8
9 from hash import intify
10 from knode import KNode as Node
11 from ktable import KTable, K
12
13 class ActionBase:
14     """ base class for some long running asynchronous proccesses like finding nodes or values """
15     def __init__(self, table, target, callback):
16         self.table = table
17         self.target = target
18         self.int = intify(target)
19         self.found = {}
20         self.queried = {}
21         self.answered = {}
22         self.callback = callback
23         self.outstanding = 0
24         self.finished = 0
25         
26         def sort(a, b, int=self.int):
27             """ this function is for sorting nodes relative to the ID we are looking for """
28             x, y = int ^ a.int, int ^ b.int
29             if x > y:
30                 return 1
31             elif x < y:
32                 return -1
33             return 0
34         self.sort = sort
35     
36     def goWithNodes(self, t):
37         pass
38         
39         
40
41 FIND_NODE_TIMEOUT = 15
42
43 class FindNode(ActionBase):
44     """ find node action merits it's own class as it is a long running stateful process """
45     def handleGotNodes(self, args):
46         args, conn = args
47         l, sender = args
48         sender['host'] = conn['host']
49         sender = Node().initWithDict(sender)
50         self.table.table.insertNode(sender)
51         if self.finished or self.answered.has_key(sender.id):
52             # a day late and a dollar short
53             return
54         self.outstanding = self.outstanding - 1
55         self.answered[sender.id] = 1
56         for node in l:
57             n = Node().initWithDict(node)
58             if not self.found.has_key(n.id):
59                 self.found[n.id] = n
60         self.schedule()
61                 
62     def schedule(self):
63         """
64             send messages to new peers, if necessary
65         """
66         if self.finished:
67             return
68         l = self.found.values()
69         l.sort(self.sort)
70
71         for node in l[:K]:
72             if node.id == self.target:
73                 self.finished=1
74                 return self.callback([node])
75             if not self.queried.has_key(node.id) and node.id != self.table.node.id:
76                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
77                 df = node.findNode(self.target, self.table.node.senderDict())
78                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
79                 self.outstanding = self.outstanding + 1
80                 self.queried[node.id] = 1
81             if self.outstanding >= const.CONCURRENT_REQS:
82                 break
83         assert(self.outstanding) >=0
84         if self.outstanding == 0:
85             ## all done!!
86             self.finished=1
87             reactor.callFromThread(self.callback, l[:K])
88         
89     def makeMsgFailed(self, node):
90         def defaultGotNodes(err, self=self, node=node):
91             self.table.table.nodeFailed(node)
92             if self.finished:
93                 return
94             self.outstanding = self.outstanding - 1
95             self.schedule()
96         return defaultGotNodes
97         
98     def goWithNodes(self, nodes):
99         """
100             this starts the process, our argument is a transaction with t.extras being our list of nodes
101             it's a transaction since we got called from the dispatcher
102         """
103         for node in nodes:
104             if node.id == self.table.node.id:
105                 continue
106             self.found[node.id] = node
107             #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
108             df = node.findNode(self.target, self.table.node.senderDict())
109             df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
110             self.outstanding = self.outstanding + 1
111             self.queried[node.id] = 1
112             if self.outstanding >= const.CONCURRENT_REQS:
113                 break
114
115         if self.outstanding == 0:
116             self.callback(nodes)
117
118
119 GET_VALUE_TIMEOUT = 15
120 class GetValue(FindNode):
121     """ get value task """
122     def handleGotNodes(self, args):
123         args, conn = args
124         l, sender = args
125         sender['host'] = conn['host']
126         sender = Node().initWithDict(sender)
127         self.table.table.insertNode(sender)
128         if self.finished or self.answered.has_key(sender.id):
129             # a day late and a dollar short
130             return
131         self.outstanding = self.outstanding - 1
132         self.answered[sender.id] = 1
133         # go through nodes
134         # if we have any closer than what we already got, query them
135         if l.has_key('nodes'):
136             for node in l['nodes']:
137                 n = Node().initWithDict(node)
138                 if not self.found.has_key(n.id):
139                     self.found[n.id] = n
140         elif l.has_key('values'):
141             def x(y, z=self.results):
142                 y = y.data
143                 if not z.has_key(y):
144                     z[y] = 1
145                     return y
146                 else:
147                     return None
148             v = filter(None, map(x, l['values']))
149             if(len(v)):
150                 reactor.callFromThread(self.callback, v)
151         self.schedule()
152                 
153     ## get value
154     def schedule(self):
155         if self.finished:
156             return
157         l = self.found.values()
158         l.sort(self.sort)
159
160         for node in l[:K]:
161             if not self.queried.has_key(node.id) and node.id != self.table.node.id:
162                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
163                 df = node.findValue(self.target, self.table.node.senderDict())
164                 df.addCallback(self.handleGotNodes)
165                 df.addErrback(self.makeMsgFailed(node))
166                 self.outstanding = self.outstanding + 1
167                 self.queried[node.id] = 1
168             if self.outstanding >= const.CONCURRENT_REQS:
169                 break
170         assert(self.outstanding) >=0
171         if self.outstanding == 0:
172             ## all done, didn't find it!!
173             self.finished=1
174             reactor.callFromThread(self.callback,[])
175     
176     ## get value
177     def goWithNodes(self, nodes, found=None):
178         self.results = {}
179         if found:
180             for n in found:
181                 self.results[n] = 1
182         for node in nodes:
183             if node.id == self.table.node.id:
184                 continue
185             self.found[node.id] = node
186             #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
187             df = node.findValue(self.target, self.table.node.senderDict())
188             df.addCallback(self.handleGotNodes)
189             df.addErrback(self.makeMsgFailed(node))
190             self.outstanding = self.outstanding + 1
191             self.queried[node.id] = 1
192             if self.outstanding >= const.CONCURRENT_REQS:
193                 break
194
195         if self.outstanding == 0:
196             reactor.callFromThread(self.callback, [])
197
198
199
200 class KeyExpirer:
201     def __init__(self, store, itime, kw):
202         self.store = store
203         self.itime = itime
204         self.kw = kw
205         reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
206         
207     def doExpire(self):
208         self.cut = `time() - const.KE_AGE`
209         self._expire()
210         
211     def _expire(self):
212         ic = self.itime.cursor()
213         sc = self.store.cursor()
214         kc = self.kw.cursor()
215         irec = None
216         try:
217             irec = ic.set_range(self.cut)
218         except db.DBNotFoundError:
219             # everything is expired
220             f = ic.prev
221             irec = f()
222         else:
223             f = ic.next
224         i = 0
225         while irec:
226             it, h = irec
227             try:
228                 k, v, lt = loads(self.store[h])
229             except KeyError:
230                 ic.delete()
231             else:
232                 if lt < self.cut:
233                     try:
234                         kc.set_both(k, h)
235                     except db.DBNotFoundError:
236                         print "Database inconsistency!  No key->value entry when a store entry was found!"
237                     else:
238                         kc.delete()
239                     self.store.delete(h)
240                     ic.delete()
241                     i = i + 1
242                 else:
243                     break
244             irec = f()
245             
246         reactor.callLater(const.KE_DELAY, self.doExpire)
247         if(i > 0):
248             print ">>>KE: done expiring %d" % i
249