fixed borked tab/space problems, damn ProjectBuilder doesn't come with reasonable...
[quix0rs-apt-p2p.git] / actions.py
1 from time import time
2
3 from const import reactor
4 import const
5
6 from hash import intify
7 from knode import KNode as Node
8 from ktable import KTable, K
9
class ActionBase:
    """Common state for long-running asynchronous lookups (finding nodes or values)."""

    def __init__(self, table, target, callback):
        """Store the lookup parameters and reset all bookkeeping.

        table    -- object giving access to the routing table and the local node
        target   -- the ID being looked up; also kept in integer form as self.num
        callback -- callable that the subclasses in this file invoke with results
        """
        self.table = table
        self.target = target
        self.callback = callback
        self.num = intify(target)

        # Bookkeeping; the subclasses in this file key these dicts by node id.
        self.found = {}
        self.queried = {}
        self.answered = {}
        self.outstanding = 0  # count of requests still awaiting a reply
        self.finished = 0     # set to 1 once the action has completed

        def sort(a, b, num=self.num):
            """cmp-style comparator: order nodes by XOR distance from the target ID"""
            dist_a = num ^ a.num
            dist_b = num ^ b.num
            # 1 if a is farther than b, -1 if it is closer, 0 on a tie
            return (dist_a > dist_b) - (dist_a < dist_b)
        self.sort = sort

    def goWithNodes(self, t):
        """Start the action; a no-op here, overridden by the subclasses in this file."""
35         
36         
37
# NOTE: in the code visible here this is only mentioned by the commented-out
# timeout line in FindNode.schedule(); nothing active reads it.
FIND_NODE_TIMEOUT = 15

class FindNode(ActionBase):
    """Find-node action; it merits its own class as it is a long-running stateful process."""

    def handleGotNodes(self, args):
        """Handle a successful findNode reply.

        args is a (nodes, sender) pair. Both the sender and each entry of
        nodes are turned into Node objects via Node().initWithDict().
        The sender is always inserted into the routing table; the rest is
        skipped for late or duplicate replies.
        """
        l, sender = args
        sender = Node().initWithDict(sender)
        self.table.table.insertNode(sender)
        if self.finished or sender.id in self.answered:
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1
        self.answered[sender.id] = 1
        for node in l:
            n = Node().initWithDict(node)
            if n.id not in self.found:
                self.found[n.id] = n
        self.schedule()

    def schedule(self):
        """
            send messages to new peers, if necessary

            Finishes by calling the callback with [node] as soon as a node
            whose id equals the target shows up among the K closest, or with
            the K closest known nodes once no requests remain outstanding.
        """
        if self.finished:
            return
        # Closest first by XOR distance to the target; same ordering as the
        # cmp-style comparator ActionBase stores in self.sort.
        l = sorted(self.found.values(), key=lambda n: self.num ^ n.num)

        for node in l[:K]:
            if node.id == self.target:
                self.finished = 1
                return self.callback([node])
            if node.id not in self.queried and node.id != self.table.node.id:
                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                df = node.findNode(self.target, self.table.node.senderDict())
                # Record the query BEFORE attaching callbacks: if df has
                # already fired (assuming Twisted Deferred semantics), the
                # callbacks run immediately and must see up-to-date counters.
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                if self.finished:
                    # a callback that ran re-entrantly already completed us
                    return
            if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done!!
            self.finished = 1
            reactor.callFromThread(self.callback, l[:K])

    def makeMsgFailed(self, node):
        """Return an errback for a request sent to node.

        The errback reports the node as failed to the routing table, gives
        back its request slot and reschedules.
        """
        def defaultGotNodes(err):
            self.table.table.nodeFailed(node)
            self.outstanding = self.outstanding - 1
            self.schedule()
        return defaultGotNodes

    def goWithNodes(self, nodes):
        """
            this starts the process; nodes is an iterable of node objects
            used to seed the search (the local node is skipped)
        """
        for node in nodes:
            if node.id != self.table.node.id:
                self.found[node.id] = node

        self.schedule()
103
104
# NOTE: in the code visible here this is only mentioned by the commented-out
# timeout line in GetValue.schedule(); nothing active reads it.
GET_VALUE_TIMEOUT = 15

class GetValue(FindNode):
    """ get value task """

    def handleGotNodes(self, args):
        """Handle a successful findValue reply.

        args is a (reply, sender) pair where reply is a dict holding either
        'nodes' (more nodes to consider) or 'values' (base64-encoded values).
        Values not seen before are passed to the callback in one list.
        """
        l, sender = args
        sender = Node().initWithDict(sender)
        self.table.table.insertNode(sender)
        if self.finished or sender.id in self.answered:
            # a day late and a dollar short
            return
        self.outstanding = self.outstanding - 1
        self.answered[sender.id] = 1
        # go through nodes
        # if we have any closer than what we already got, query them
        if 'nodes' in l:
            for node in l['nodes']:
                n = Node().initWithDict(node)
                if n.id not in self.found:
                    self.found[n.id] = n
        elif 'values' in l:
            fresh = []
            for encoded in l['values']:
                # NOTE: the 'base64' str codec exists only on Python 2
                value = encoded.decode('base64')
                if value not in self.results:
                    self.results[value] = 1
                    if value:
                        # empty values are remembered but never reported
                        fresh.append(value)
            if fresh:
                reactor.callFromThread(self.callback, fresh)
        self.schedule()

    ## get value
    def schedule(self):
        """Query the closest not-yet-queried nodes for the value.

        Once no requests remain outstanding the action is finished and the
        callback is invoked with an empty list to mark the end of the search.
        """
        if self.finished:
            return
        # Closest first by XOR distance to the target; same ordering as the
        # cmp-style comparator ActionBase stores in self.sort.
        l = sorted(self.found.values(), key=lambda n: self.num ^ n.num)

        for node in l[:K]:
            if node.id not in self.queried and node.id != self.table.node.id:
                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
                df = node.findValue(self.target, self.table.node.senderDict())
                # Record the query BEFORE attaching callbacks: if df has
                # already fired (assuming Twisted Deferred semantics), the
                # callbacks run immediately and must see up-to-date counters.
                self.outstanding = self.outstanding + 1
                self.queried[node.id] = 1
                df.addCallback(self.handleGotNodes)
                df.addErrback(self.makeMsgFailed(node))
                if self.finished:
                    # a callback that ran re-entrantly already completed us
                    return
            if self.outstanding >= const.CONCURRENT_REQS:
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done, didn't find it!!
            self.finished = 1
            reactor.callFromThread(self.callback, [])

    ## get value
    def goWithNodes(self, nodes, found=None):
        """Start the lookup.

        nodes -- iterable of node objects used to seed the search (the local
                 node is skipped)
        found -- optional iterable of values already known; they are marked
                 as seen so they are not reported to the callback again
        """
        self.results = {}
        if found:
            for n in found:
                self.results[n] = 1
        for node in nodes:
            if node.id != self.table.node.id:
                self.found[node.id] = node

        self.schedule()
173
174
175
class KeyExpirer:
    """Periodically deletes old rows from the kv table of a store."""

    def __init__(self, store):
        """Remember the store and schedule the first expiry run.

        store must provide cursor() -- presumably a DB-API style connection;
        confirm against the caller.
        """
        self.store = store
        reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)

    def doExpire(self):
        """Compute the cutoff timestamp (now minus const.KE_AGE) and run one expiry pass."""
        cutoff = time() - const.KE_AGE
        self.cut = "%0.6f" % cutoff
        self._expire()

    def _expire(self):
        """Delete every kv row whose time is below self.cut, then schedule the next run."""
        statement = "delete from kv where time < '%s';" % self.cut
        cursor = self.store.cursor()
        cursor.execute(statement)
        # next pass happens const.KE_DELAY after this one
        reactor.callLater(const.KE_DELAY, self.doExpire)
190