]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - actions.py
changed from bsddb3 to pysqlite
[quix0rs-apt-p2p.git] / actions.py
1 from time import time
2 from bencode import bdecode as loads
3 from bencode import bencode as dumps
4
5 from bsddb3 import db
6
7 from const import reactor
8 import const
9
10 from hash import intify
11 from knode import KNode as Node
12 from ktable import KTable, K
13
14 class ActionBase:
15     """ base class for some long running asynchronous proccesses like finding nodes or values """
16     def __init__(self, table, target, callback):
17         self.table = table
18         self.target = target
19         self.int = intify(target)
20         self.found = {}
21         self.queried = {}
22         self.answered = {}
23         self.callback = callback
24         self.outstanding = 0
25         self.finished = 0
26         
27         def sort(a, b, int=self.int):
28             """ this function is for sorting nodes relative to the ID we are looking for """
29             x, y = int ^ a.int, int ^ b.int
30             if x > y:
31                 return 1
32             elif x < y:
33                 return -1
34             return 0
35         self.sort = sort
36     
37     def goWithNodes(self, t):
38         pass
39         
40         
41
42 FIND_NODE_TIMEOUT = 15
43
44 class FindNode(ActionBase):
45     """ find node action merits it's own class as it is a long running stateful process """
46     def handleGotNodes(self, args):
47         l, sender = args
48         sender = Node().initWithDict(sender)
49         self.table.table.insertNode(sender)
50         if self.finished or self.answered.has_key(sender.id):
51             # a day late and a dollar short
52             return
53         self.outstanding = self.outstanding - 1
54         self.answered[sender.id] = 1
55         for node in l:
56             n = Node().initWithDict(node)
57             if not self.found.has_key(n.id):
58                 self.found[n.id] = n
59         self.schedule()
60                 
61     def schedule(self):
62         """
63             send messages to new peers, if necessary
64         """
65         if self.finished:
66             return
67         l = self.found.values()
68         l.sort(self.sort)
69
70         for node in l[:K]:
71             if node.id == self.target:
72                 self.finished=1
73                 return self.callback([node])
74             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
75                 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
76                 df = node.findNode(self.target, self.table.node.senderDict())
77                 df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
78                 self.outstanding = self.outstanding + 1
79                 self.queried[node.id] = 1
80             if self.outstanding >= const.CONCURRENT_REQS:
81                 break
82         assert(self.outstanding) >=0
83         if self.outstanding == 0:
84             ## all done!!
85             self.finished=1
86             reactor.callFromThread(self.callback, l[:K])
87         
88     def makeMsgFailed(self, node):
89         def defaultGotNodes(err, self=self, node=node):
90             self.table.table.nodeFailed(node)
91             self.outstanding = self.outstanding - 1
92             self.schedule()
93         return defaultGotNodes
94         
95     def goWithNodes(self, nodes):
96         """
97             this starts the process, our argument is a transaction with t.extras being our list of nodes
98             it's a transaction since we got called from the dispatcher
99         """
100         for node in nodes:
101             if node.id == self.table.node.id:
102                 continue
103             else:
104                 self.found[node.id] = node
105             
106         self.schedule()
107
108
109 GET_VALUE_TIMEOUT = 15
110 class GetValue(FindNode):
111     """ get value task """
112     def handleGotNodes(self, args):
113         l, sender = args
114         sender = Node().initWithDict(sender)
115         self.table.table.insertNode(sender)
116         if self.finished or self.answered.has_key(sender.id):
117             # a day late and a dollar short
118             return
119         self.outstanding = self.outstanding - 1
120         self.answered[sender.id] = 1
121         # go through nodes
122         # if we have any closer than what we already got, query them
123         if l.has_key('nodes'):
124             for node in l['nodes']:
125                 n = Node().initWithDict(node)
126                 if not self.found.has_key(n.id):
127                     self.found[n.id] = n
128         elif l.has_key('values'):
129             def x(y, z=self.results):
130                 y = y.decode('base64')
131                 if not z.has_key(y):
132                     z[y] = 1
133                     return y
134                 else:
135                     return None
136             v = filter(None, map(x, l['values']))
137             if(len(v)):
138                 reactor.callFromThread(self.callback, v)
139         self.schedule()
140                 
141     ## get value
142     def schedule(self):
143         if self.finished:
144             return
145         l = self.found.values()
146         l.sort(self.sort)
147
148         for node in l[:K]:
149             if (not self.queried.has_key(node.id)) and node.id != self.table.node.id:
150                 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
151                 df = node.findValue(self.target, self.table.node.senderDict())
152                 df.addCallback(self.handleGotNodes)
153                 df.addErrback(self.makeMsgFailed(node))
154                 self.outstanding = self.outstanding + 1
155                 self.queried[node.id] = 1
156             if self.outstanding >= const.CONCURRENT_REQS:
157                 break
158         assert(self.outstanding) >=0
159         if self.outstanding == 0:
160             ## all done, didn't find it!!
161             self.finished=1
162             reactor.callFromThread(self.callback,[])
163     
164     ## get value
165     def goWithNodes(self, nodes, found=None):
166         self.results = {}
167         if found:
168             for n in found:
169                 self.results[n] = 1
170         for node in nodes:
171             if node.id == self.table.node.id:
172                 continue
173             else:
174                 self.found[node.id] = node
175             
176         self.schedule()
177
178
179
180 class KeyExpirer:
181     def __init__(self, store):
182         self.store = store
183         reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)
184         
185     def doExpire(self):
186         self.cut = `time() - const.KE_AGE`
187         self._expire()
188         
189     def _expire(self):
190         c = self.store.cursor()
191         s = "delete from kv where time < '%s';" % self.cut
192         c.execute(s)
193         self.store.commit()
194         reactor.callLater(const.KE_DELAY, self.doExpire)
195