]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - actions.py
fix typo
[quix0rs-apt-p2p.git] / actions.py
1 from time import time
2 from pickle import loads, dumps
3
4 from bsddb3 import db
5
6 from const import reactor
7
8 from hash import intify
9 from knode import KNode as Node
10 from ktable import KTable, K
11
12 # concurrent FIND_NODE/VALUE requests!
13 N = 3
14
15 class ActionBase:
16     """ base class for some long running asynchronous proccesses like finding nodes or values """
17     def __init__(self, table, target, callback):
18         self.table = table
19         self.target = target
20         self.int = intify(target)
21         self.found = {}
22         self.queried = {}
23         self.answered = {}
24         self.callback = callback
25         self.outstanding = 0
26         self.finished = 0
27         
28         def sort(a, b, int=self.int):
29             """ this function is for sorting nodes relative to the ID we are looking for """
30             x, y = int ^ a.int, int ^ b.int
31             if x > y:
32                 return 1
33             elif x < y:
34                 return -1
35             return 0
36         self.sort = sort
37     
38     def goWithNodes(self, t):
39         pass
40         
41         
42
43 FIND_NODE_TIMEOUT = 15
44
45 class FindNode(ActionBase):
46     """ find node action merits it's own class as it is a long running stateful process """
47     def handleGotNodes(self, args):
48         l, sender = args
49         if self.finished or self.answered.has_key(sender['id']):
50             # a day late and a dollar short
51             return
52         self.outstanding = self.outstanding - 1
53         self.answered[sender['id']] = 1
54         for node in l:
55             if not self.found.has_key(node['id']):
56                 n = Node(node['id'], node['host'], node['port'])
57                 self.found[n.id] = n
58                 self.table.insertNode(n)
59         self.schedule()
60                 
61     def schedule(self):
62         """
63             send messages to new peers, if necessary
64         """
65         if self.finished:
66             return
67         l = self.found.values()
68         l.sort(self.sort)
69
70         for node in l[:K]:
71             if node.id == self.target:
72                 self.finished=1
73                 return self.callback([node])
74             if not self.queried.has_key(node.id) and node.id != self.table.node.id:
75                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
76                 df = node.findNode(self.target, self.table.node.senderDict())
77                 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
78                 self.outstanding = self.outstanding + 1
79                 self.queried[node.id] = 1
80             if self.outstanding >= N:
81                 break
82         assert(self.outstanding) >=0
83         if self.outstanding == 0:
84             ## all done!!
85             self.finished=1
86             reactor.callFromThread(self.callback, l[:K])
87         
88     def defaultGotNodes(self, t):
89         if self.finished:
90             return
91         self.outstanding = self.outstanding - 1
92         self.schedule()
93         
94         
95     def goWithNodes(self, nodes):
96         """
97             this starts the process, our argument is a transaction with t.extras being our list of nodes
98             it's a transaction since we got called from the dispatcher
99         """
100         for node in nodes:
101             if node.id == self.table.node.id:
102                 continue
103             self.found[node.id] = node
104             #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
105             df = node.findNode(self.target, self.table.node.senderDict())
106             df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
107             self.outstanding = self.outstanding + 1
108             self.queried[node.id] = 1
109         if self.outstanding == 0:
110             self.callback(nodes)
111
112
113 GET_VALUE_TIMEOUT = 15
114 class GetValue(FindNode):
115     """ get value task """
116     def handleGotNodes(self, args):
117         l, sender = args
118         if self.finished or self.answered.has_key(sender['id']):
119             # a day late and a dollar short
120             return
121         self.outstanding = self.outstanding - 1
122         self.answered[sender['id']] = 1
123         # go through nodes
124         # if we have any closer than what we already got, query them
125         if l.has_key('nodes'):
126             for node in l['nodes']:
127                 if not self.found.has_key(node['id']):
128                     n = Node(node['id'], node['host'], node['port'])
129                     self.found[n.id] = n
130                     self.table.insertNode(n)
131         elif l.has_key('values'):
132             def x(y, z=self.results):
133                 if not z.has_key(y):
134                     z[y] = 1
135                     return y
136             v = filter(None, map(x, l['values']))
137             if(len(v)):
138                 reactor.callFromThread(self.callback, v)
139         self.schedule()
140                 
141     ## get value
142     def schedule(self):
143         if self.finished:
144             return
145         l = self.found.values()
146         l.sort(self.sort)
147
148         for node in l[:K]:
149             if not self.queried.has_key(node.id) and node.id != self.table.node.id:
150                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
151                 df = node.findValue(self.target, self.table.node.senderDict())
152                 df.addCallback(self.handleGotNodes)
153                 df.addErrback(self.defaultGotNodes)
154                 self.outstanding = self.outstanding + 1
155                 self.queried[node.id] = 1
156             if self.outstanding >= N:
157                 break
158         assert(self.outstanding) >=0
159         if self.outstanding == 0:
160             ## all done, didn't find it!!
161             self.finished=1
162             reactor.callFromThread(self.callback,[])
163     
164     ## get value
165     def goWithNodes(self, nodes):
166         self.results = {}
167         for node in nodes:
168             if node.id == self.table.node.id:
169                 continue
170             self.found[node.id] = node
171             #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
172             df = node.findValue(self.target, self.table.node.senderDict())
173             df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
174             self.outstanding = self.outstanding + 1
175             self.queried[node.id] = 1
176         if self.outstanding == 0:
177             reactor.callFromThread(self.callback, [])
178
179
180 KEINITIAL_DELAY = 60 # 1 minute
181 KE_DELAY = 60 # 1 minute
182 KE_AGE = 60 * 5
183 class KeyExpirer:
184     def __init__(self, store, itime, kw):
185         self.store = store
186         self.itime = itime
187         self.kw = kw
188         reactor.callLater(KEINITIAL_DELAY, self.doExpire)
189         
190     def doExpire(self):
191         self.cut = `time() - KE_AGE`
192         self._expire()
193         
194     def _expire(self):
195         ic = self.itime.cursor()
196         sc = self.store.cursor()
197         kc = self.kw.cursor()
198         irec = None
199         try:
200             irec = ic.set_range(self.cut)
201         except db.DBNotFoundError:
202             # everything is expired
203             f = ic.prev
204             irec = f()
205         else:
206             f = ic.next
207         i = 0
208         while irec:
209             it, h = irec
210             try:
211                 k, v, lt = loads(self.store[h])
212             except KeyError:
213                 ic.delete()
214             else:
215                 if lt < self.cut:
216                     try:
217                         kc.set_both(k, h)
218                     except db.DBNotFoundError:
219                         print "Database inconsistency!  No key->value entry when a store entry was found!"
220                     else:
221                         kc.delete()
222                     self.store.delete(h)
223                     ic.delete()
224                     i = i + 1
225             irec = f()
226             
227         reactor.callLater(KE_DELAY, self.doExpire)
228         if(i > 0):
229             print ">>>KE: done expiring %d" % i
230