# git.mxchange.org Git - quix0rs-apt-p2p.git/blob - actions.py
# removed client side connection host determination
# [quix0rs-apt-p2p.git] / actions.py
1 from time import time
2 from pickle import loads, dumps
3
4 from bsddb3 import db
5
6 from const import reactor
7 import const
8
9 from hash import intify
10 from knode import KNode as Node
11 from ktable import KTable, K
12
class ActionBase:
    """Shared state for long-running asynchronous lookups (finding nodes or values).

    Instances remember the lookup target, its integer form, the callback to
    invoke with results, and the bookkeeping containers and counters that the
    concrete actions update while the lookup is in flight.
    """

    def __init__(self, table, target, callback):
        """Record the lookup parameters and reset all bookkeeping.

        table    -- routing table wrapper, stored as-is on the instance
        target   -- identifier being looked up
        callback -- callable stored for the concrete action to invoke
        """
        self.table = table
        self.target = target
        self.callback = callback
        self.int = intify(target)

        # Bookkeeping dictionaries; the actions in this file key them by node id.
        self.found = {}
        self.queried = {}
        self.answered = {}

        # Number of requests in flight, and a 0/1 "lookup is over" flag.
        self.outstanding = 0
        self.finished = 0

        def sort(a, b, pivot=self.int):
            """cmp-style comparator: order nodes by XOR distance to the target."""
            dist_a = pivot ^ a.int
            dist_b = pivot ^ b.int
            # Evaluates to 1, -1 or 0, exactly like an explicit if/elif chain.
            return (dist_a > dist_b) - (dist_a < dist_b)

        # Bound per instance so the target's integer form is captured once.
        self.sort = sort

    def goWithNodes(self, t):
        """Start the action; a no-op here, concrete actions override it."""
        return None
40
# Timeout value for find-node requests (presumably seconds -- TODO confirm).
# It is currently only mentioned in a disabled line of FindNode.schedule, so
# nothing in this file enforces it.
FIND_NODE_TIMEOUT = 15


class FindNode(ActionBase):
    """Find-node action; it merits its own class as it is a long-running stateful process.

    Candidate nodes are collected in ``self.found``. ``schedule`` repeatedly
    queries the K closest ones that have not been queried yet, and the
    callback receives the result once the target itself is seen or no
    request is outstanding any more.
    """

    def handleGotNodes(self, args):
        """Process a successful reply to a findNode request.

        args -- a ``(nodes, sender)`` pair; ``sender`` and every entry of
                ``nodes`` are passed to ``Node().initWithDict``.
        """
        l, sender = args
        sender = Node().initWithDict(sender)
        # The sender answered, so it is inserted into the routing table even
        # if this particular reply turns out to be too late to matter.
        self.table.table.insertNode(sender)
        if self.finished or sender.id in self.answered:
            # a day late and a dollar short
            return
        self.outstanding -= 1
        self.answered[sender.id] = 1
        for node in l:
            n = Node().initWithDict(node)
            if n.id not in self.found:
                self.found[n.id] = n
        self.schedule()

    def schedule(self):
        """Send messages to new peers, if necessary.

        Finishes the lookup (and fires the callback) when the target node is
        among the K closest known nodes, or when no request is outstanding.
        """
        if self.finished:
            return
        # Closest-first by XOR distance to the target. A key function gives
        # the same (stable) ordering as the cmp-style self.sort comparator.
        l = sorted(self.found.values(), key=lambda n: self.int ^ n.int)

        for node in l[:K]:
            if node.id == self.target:
                # Exact match: report just that node, called directly.
                self.finished = 1
                return self.callback([node])
            if node.id not in self.queried and node.id != self.table.node.id:
                # TODO: no per-request timeout is applied (FIND_NODE_TIMEOUT is unused).
                df = node.findNode(self.target, self.table.node.senderDict())
                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                self.outstanding += 1
                self.queried[node.id] = 1
            if self.outstanding >= const.CONCURRENT_REQS:
                # Enough requests in flight; replies will call schedule() again.
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done!!
            self.finished = 1
            reactor.callFromThread(self.callback, l[:K])

    def makeMsgFailed(self, node):
        """Build the error handler for a request that was sent to ``node``."""
        def defaultGotNodes(err, self=self, node=node):
            # Report the failure to the routing table, release the request
            # slot and try the next candidates.
            self.table.table.nodeFailed(node)
            self.outstanding -= 1
            self.schedule()
        return defaultGotNodes

    def goWithNodes(self, nodes):
        """Start the lookup from an initial collection of nodes.

        nodes -- iterable of node objects (each with an ``id``); our own node
                 is skipped, the rest seed ``self.found``.
        """
        for node in nodes:
            if node.id != self.table.node.id:
                self.found[node.id] = node

        self.schedule()
106
107
# Timeout value for get-value requests (presumably seconds -- TODO confirm).
# It is currently only mentioned in a disabled line of GetValue.schedule, so
# nothing in this file enforces it.
GET_VALUE_TIMEOUT = 15


class GetValue(FindNode):
    """Get-value task.

    Works like the node lookup it inherits from, but sends findValue
    requests. Every batch of previously unseen values is handed to the
    callback as it arrives, and an empty list is delivered once no request
    is outstanding any more.
    """

    def handleGotNodes(self, args):
        """Process a successful reply to a findValue request.

        args -- a ``(reply, sender)`` pair. ``reply`` is a dict that is
                checked for a 'nodes' key first and a 'values' key otherwise;
                each entry of 'values' must expose a ``data`` attribute.
        """
        l, sender = args
        sender = Node().initWithDict(sender)
        self.table.table.insertNode(sender)
        if self.finished or sender.id in self.answered:
            # a day late and a dollar short
            return
        self.outstanding -= 1
        self.answered[sender.id] = 1
        # go through nodes
        # if we have any closer than what we already got, query them
        if 'nodes' in l:
            for node in l['nodes']:
                n = Node().initWithDict(node)
                if n.id not in self.found:
                    self.found[n.id] = n
        elif 'values' in l:
            # Report each distinct value only once over the whole lookup.
            fresh = []
            for value in l['values']:
                data = value.data
                if data not in self.results:
                    self.results[data] = 1
                    # Falsy data is remembered but never reported, matching
                    # the filter(None, ...) of the previous implementation.
                    if data:
                        fresh.append(data)
            if fresh:
                reactor.callFromThread(self.callback, fresh)
        self.schedule()

    ## get value
    def schedule(self):
        """Send findValue requests to the closest nodes not queried yet.

        When no request is outstanding the lookup is marked finished and the
        callback is invoked with an empty list.
        """
        if self.finished:
            return
        # Closest-first by XOR distance to the target. A key function gives
        # the same (stable) ordering as the cmp-style self.sort comparator.
        l = sorted(self.found.values(), key=lambda n: self.int ^ n.int)

        for node in l[:K]:
            if node.id not in self.queried and node.id != self.table.node.id:
                # TODO: no per-request timeout is applied (GET_VALUE_TIMEOUT is unused).
                df = node.findValue(self.target, self.table.node.senderDict())
                df.addCallback(self.handleGotNodes)
                df.addErrback(self.makeMsgFailed(node))
                self.outstanding += 1
                self.queried[node.id] = 1
            if self.outstanding >= const.CONCURRENT_REQS:
                # Enough requests in flight; replies will call schedule() again.
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done: nothing left to ask, the empty list ends the lookup
            self.finished = 1
            reactor.callFromThread(self.callback, [])

    ## get value
    def goWithNodes(self, nodes, found=None):
        """Start the lookup from an initial collection of nodes.

        nodes -- iterable of node objects (each with an ``id``); our own node
                 is skipped, the rest seed ``self.found``.
        found -- optional iterable of values that are already known; they are
                 recorded up front so they are not reported again.
        """
        self.results = {}
        if found:
            for n in found:
                self.results[n] = 1
        for node in nodes:
            if node.id != self.table.node.id:
                self.found[node.id] = node

        self.schedule()
176
177
178
class KeyExpirer:
    """Periodically removes expired entries from the value store.

    store -- database mapping a hash ``h`` to a pickled ``(k, v, lt)`` tuple
    itime -- database whose cursor yields ``(it, h)`` records
    kw    -- database holding ``(k, h)`` pairs

    The first pass is scheduled from the constructor; every pass schedules
    the next one when it completes.
    """

    def __init__(self, store, itime, kw):
        self.store = store
        self.itime = itime
        self.kw = kw
        reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)

    def doExpire(self):
        """Compute the cut-off and run one expiry pass."""
        # The cut-off is kept as the repr() string of the timestamp; it is
        # used both as a cursor key and in a comparison against ``lt``.
        self.cut = repr(time() - const.KE_AGE)
        self._expire()

    def _expire(self):
        """Run one pass over the databases, then schedule the next pass.

        The cursors are closed when the pass ends, including when it ends
        with an exception.
        """
        ic = self.itime.cursor()
        try:
            kc = self.kw.cursor()
            try:
                i = self._sweep(ic, kc)
            finally:
                kc.close()
        finally:
            ic.close()

        reactor.callLater(const.KE_DELAY, self.doExpire)
        if i > 0:
            print(">>>KE: done expiring %d" % i)

    def _sweep(self, ic, kc):
        """Delete expired records using the given cursors; return how many were expired."""
        try:
            irec = ic.set_range(self.cut)
        except db.DBNotFoundError:
            # everything is expired
            f = ic.prev
            irec = f()
        else:
            # NOTE(review): from the set_range position the scan moves forward
            # (ic.next), so records located before that position are not
            # visited in this branch -- confirm this is the intended direction.
            f = ic.next
        i = 0
        while irec:
            it, h = irec
            try:
                # NOTE(review): pickle.loads is only safe if every value in
                # the store was pickled locally -- confirm against the writer.
                k, v, lt = loads(self.store[h])
            except KeyError:
                # Index entry without a stored value: drop the stale index record.
                ic.delete()
            else:
                if lt < self.cut:
                    try:
                        kc.set_both(k, h)
                    except db.DBNotFoundError:
                        print("Database inconsistency!  No key->value entry when a store entry was found!")
                    else:
                        kc.delete()
                    self.store.delete(h)
                    ic.delete()
                    i = i + 1
                else:
                    # First record that is not expired ends the pass.
                    break
            irec = f()
        return i
228