]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - actions.py
77160d7669b3e5e1a2b0bd97ce26a29c47dbb38b
[quix0rs-apt-p2p.git] / actions.py
1 from time import time
2 from pickle import loads, dumps
3
4 from bsddb3 import db
5
6 from const import reactor
7
8 from hash import intify
9 from knode import KNode as Node
10 from ktable import KTable, K
11
12 # concurrent FIND_NODE/VALUE requests!
13 N = 3
14
15 class ActionBase:
16     """ base class for some long running asynchronous proccesses like finding nodes or values """
17     def __init__(self, table, target, callback):
18         self.table = table
19         self.target = target
20         self.int = intify(target)
21         self.found = {}
22         self.queried = {}
23         self.answered = {}
24         self.callback = callback
25         self.outstanding = 0
26         self.finished = 0
27         
28         def sort(a, b, int=self.int):
29             """ this function is for sorting nodes relative to the ID we are looking for """
30             x, y = int ^ a.int, int ^ b.int
31             if x > y:
32                 return 1
33             elif x < y:
34                 return -1
35             return 0
36         self.sort = sort
37     
38     def goWithNodes(self, t):
39         pass
40         
41         
42
43 FIND_NODE_TIMEOUT = 15
44
45 class FindNode(ActionBase):
46     """ find node action merits it's own class as it is a long running stateful process """
47     def handleGotNodes(self, args):
48         l, sender = args
49         sender = Node().initWithDict(sender)
50         if self.finished or self.answered.has_key(sender.id):
51             # a day late and a dollar short
52             return
53         self.outstanding = self.outstanding - 1
54         self.answered[sender.id] = 1
55         for node in l:
56             n = Node().initWithDict(node)
57             if not self.found.has_key(n.id):
58                 self.found[n.id] = n
59                 self.table.insertNode(n)
60         self.schedule()
61                 
62     def schedule(self):
63         """
64             send messages to new peers, if necessary
65         """
66         if self.finished:
67             return
68         l = self.found.values()
69         l.sort(self.sort)
70
71         for node in l[:K]:
72             if node.id == self.target:
73                 self.finished=1
74                 return self.callback([node])
75             if not self.queried.has_key(node.id) and node.id != self.table.node.id:
76                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
77                 df = node.findNode(self.target, self.table.node.senderDict())
78                 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
79                 self.outstanding = self.outstanding + 1
80                 self.queried[node.id] = 1
81             if self.outstanding >= N:
82                 break
83         assert(self.outstanding) >=0
84         if self.outstanding == 0:
85             ## all done!!
86             self.finished=1
87             reactor.callFromThread(self.callback, l[:K])
88         
89     def defaultGotNodes(self, t):
90         if self.finished:
91             return
92         self.outstanding = self.outstanding - 1
93         self.schedule()
94         
95         
96     def goWithNodes(self, nodes):
97         """
98             this starts the process, our argument is a transaction with t.extras being our list of nodes
99             it's a transaction since we got called from the dispatcher
100         """
101         for node in nodes:
102             if node.id == self.table.node.id:
103                 continue
104             self.found[node.id] = node
105             #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
106             df = node.findNode(self.target, self.table.node.senderDict())
107             df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
108             self.outstanding = self.outstanding + 1
109             self.queried[node.id] = 1
110         if self.outstanding == 0:
111             self.callback(nodes)
112
113
114 GET_VALUE_TIMEOUT = 15
115 class GetValue(FindNode):
116     """ get value task """
117     def handleGotNodes(self, args):
118         l, sender = args
119         sender = Node().initWithDict(sender)
120         if self.finished or self.answered.has_key(sender.id):
121             # a day late and a dollar short
122             return
123         self.outstanding = self.outstanding - 1
124         self.answered[sender.id] = 1
125         # go through nodes
126         # if we have any closer than what we already got, query them
127         if l.has_key('nodes'):
128             for node in l['nodes']:
129                 n = Node().initWithDict(node)
130                 if not self.found.has_key(n.id):
131                     self.found[n.id] = n
132                     self.table.insertNode(n)
133         elif l.has_key('values'):
134             def x(y, z=self.results):
135                 y = y.data
136                 if not z.has_key(y):
137                     z[y] = 1
138                     return y
139             v = filter(None, map(x, l['values']))
140             if(len(v)):
141                 reactor.callFromThread(self.callback, v)
142         self.schedule()
143                 
144     ## get value
145     def schedule(self):
146         if self.finished:
147             return
148         l = self.found.values()
149         l.sort(self.sort)
150
151         for node in l[:K]:
152             if not self.queried.has_key(node.id) and node.id != self.table.node.id:
153                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
154                 df = node.findValue(self.target, self.table.node.senderDict())
155                 df.addCallback(self.handleGotNodes)
156                 df.addErrback(self.defaultGotNodes)
157                 self.outstanding = self.outstanding + 1
158                 self.queried[node.id] = 1
159             if self.outstanding >= N:
160                 break
161         assert(self.outstanding) >=0
162         if self.outstanding == 0:
163             ## all done, didn't find it!!
164             self.finished=1
165             reactor.callFromThread(self.callback,[])
166     
167     ## get value
168     def goWithNodes(self, nodes):
169         self.results = {}
170         for node in nodes:
171             if node.id == self.table.node.id:
172                 continue
173             self.found[node.id] = node
174             #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
175             df = node.findValue(self.target, self.table.node.senderDict())
176             df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
177             self.outstanding = self.outstanding + 1
178             self.queried[node.id] = 1
179         if self.outstanding == 0:
180             reactor.callFromThread(self.callback, [])
181
182
183 KEINITIAL_DELAY = 60 * 60 * 24 # 24 hours
184 KE_DELAY = 60 * 60 # 1 hour
185 KE_AGE = KEINITIAL_DELAY
186
187 class KeyExpirer:
188     def __init__(self, store, itime, kw):
189         self.store = store
190         self.itime = itime
191         self.kw = kw
192         reactor.callLater(KEINITIAL_DELAY, self.doExpire)
193         
194     def doExpire(self):
195         self.cut = `time() - KE_AGE`
196         self._expire()
197         
198     def _expire(self):
199         ic = self.itime.cursor()
200         sc = self.store.cursor()
201         kc = self.kw.cursor()
202         irec = None
203         try:
204             irec = ic.set_range(self.cut)
205         except db.DBNotFoundError:
206             # everything is expired
207             f = ic.prev
208             irec = f()
209         else:
210             f = ic.next
211         i = 0
212         while irec:
213             it, h = irec
214             try:
215                 k, v, lt = loads(self.store[h])
216             except KeyError:
217                 ic.delete()
218             else:
219                 if lt < self.cut:
220                     try:
221                         kc.set_both(k, h)
222                     except db.DBNotFoundError:
223                         print "Database inconsistency!  No key->value entry when a store entry was found!"
224                     else:
225                         kc.delete()
226                     self.store.delete(h)
227                     ic.delete()
228                     i = i + 1
229             irec = f()
230             
231         reactor.callLater(KE_DELAY, self.doExpire)
232         if(i > 0):
233             print ">>>KE: done expiring %d" % i
234