5fc11b34ff9d57f3bc0bc02cb1b09f6a70d2a691
[quix0rs-apt-p2p.git] / actions.py
1 from time import time
2
3 from const import reactor
4 import const
5
6 from hash import intify
7 from knode import KNode as Node
8 from ktable import KTable, K
9
class ActionBase:
    """
    Shared state for long-running asynchronous lookups such as finding
    nodes or values.

    Instance attributes created by the constructor:
      table       -- the object handed in by the caller; subclasses in this
                     file use table.table and table.node
      target      -- the ID being searched for
      num         -- intify(target); XOR-ed against node.num when ordering
      found       -- dict of nodes learned so far, keyed by node id
      queried     -- dict whose keys are ids of nodes already sent a request
      answered    -- dict whose keys are ids of nodes whose reply was handled
      callback    -- callable that is handed the results
      outstanding -- number of requests still waiting for a reply
      finished    -- 0 while the action is running, 1 once it is done
      sort        -- cmp-style comparator, see __init__
    """

    def __init__(self, table, target, callback):
        """ remember the lookup parameters and start with empty bookkeeping """
        self.table = table
        self.target = target
        self.callback = callback
        self.num = intify(target)

        # per-lookup bookkeeping, all keyed by node id
        self.found = {}
        self.queried = {}
        self.answered = {}

        self.outstanding = 0
        self.finished = 0

        def sort(lhs, rhs, num=self.num):
            """
            cmp-style comparator: order two nodes by the XOR of their num
            with the number we are looking for (smaller XOR sorts first)
            """
            x = num ^ lhs.num
            y = num ^ rhs.num
            # 1 when x > y, -1 when x < y, 0 when equal
            return (x > y) - (x < y)

        self.sort = sort

    def goWithNodes(self, t):
        """ entry point for starting the action; a no-op in the base class """
        pass
35         
36         
37
# NOTE(review): no live code in this section reads this value; it is only
# mentioned by a disabled timeout assignment in FindNode.schedule.
# Presumably seconds -- confirm before wiring it up.
FIND_NODE_TIMEOUT = 15


class FindNode(ActionBase):
    """
    Stateful find-node lookup: repeatedly asks the closest known nodes for
    nodes closer to the target until the target is found or no request is
    left outstanding.
    """

    def handleGotNodes(self, args):
        """
        Success callback for a findNode request.

        args is a (nodes, sender) pair: nodes is an iterable of node dicts
        and sender is the dict describing the node that replied.
        """
        node_dicts, sender = args
        sender = Node().initWithDict(sender)
        # the replying node is recorded in the routing table even when the
        # reply itself turns out to be late or duplicated
        self.table.table.insertNode(sender)
        if self.finished or sender.id in self.answered:
            # reply arrived after we finished, or this node already answered
            return
        self.outstanding -= 1
        self.answered[sender.id] = 1
        for info in node_dicts:
            candidate = Node().initWithDict(info)
            # keep the first object seen for any given id
            self.found.setdefault(candidate.id, candidate)
        self.schedule()

    def schedule(self):
        """
        Send requests to not-yet-queried nodes among the K closest known,
        and complete the action when appropriate.

        Finishes immediately (calling the callback directly with [node])
        when one of the K closest nodes has the target id.  Otherwise, when
        no request is outstanding, finishes by handing the K closest nodes
        to the callback through reactor.callFromThread.
        """
        if self.finished:
            return
        ranked = self.found.values()
        ranked.sort(self.sort)
        closest = ranked[:K]

        for node in closest:
            if node.id == self.target:
                self.finished = 1
                return self.callback([node])
            skip = node.id in self.queried or node.id == self.table.node.id
            if not skip:
                # NOTE: no per-request timeout is applied here
                # (see FIND_NODE_TIMEOUT)
                df = node.findNode(self.target, self.table.node.senderDict())
                df.addCallbacks(self.handleGotNodes, self.makeMsgFailed(node))
                self.outstanding += 1
                self.queried[node.id] = 1
            if self.outstanding >= const.CONCURRENT_REQS:
                # enough requests in flight for now
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            # nothing left in flight: all done
            self.finished = 1
            reactor.callFromThread(self.callback, closest)

    def makeMsgFailed(self, node):
        """
        Build the errback for a request sent to node.

        The returned function reports the node as failed to the routing
        table, releases its outstanding slot and reschedules.
        """
        def defaultGotNodes(err):
            self.table.table.nodeFailed(node)
            self.outstanding -= 1
            self.schedule()
        return defaultGotNodes

    def goWithNodes(self, nodes):
        """
        Start the lookup from an initial iterable of node objects.

        Every node except our own (matched by id against self.table.node)
        is added to the found set, then the first round is scheduled.
        """
        for node in nodes:
            if node.id != self.table.node.id:
                self.found[node.id] = node

        self.schedule()
103         
104
# NOTE(review): no live code in this section reads this value; it is only
# mentioned by a disabled timeout assignment in GetValue.schedule.
# Presumably seconds -- confirm before wiring it up.
GET_VALUE_TIMEOUT = 15


class GetValue(FindNode):
    """
    Get-value lookup: walks toward the target like FindNode, but asks each
    node with findValue and reports any values that come back.
    """

    def handleGotNodes(self, args):
        """
        Success callback for a findValue request.

        args is a (reply, sender) pair.  reply is a mapping that is checked
        for a 'nodes' key first and a 'values' key otherwise:
          'nodes'  -- node dicts; unseen ones are added to the found set
          'values' -- base64-encoded strings; each is decoded and recorded in
                      self.results, and the new non-empty ones are handed to
                      the callback through reactor.callFromThread
        """
        reply, sender = args
        sender = Node().initWithDict(sender)
        # the replying node is recorded in the routing table even when the
        # reply itself turns out to be late or duplicated
        self.table.table.insertNode(sender)
        if self.finished or sender.id in self.answered:
            # reply arrived after we finished, or this node already answered
            return
        self.outstanding -= 1
        self.answered[sender.id] = 1

        if 'nodes' in reply:
            # remember any nodes we had not heard of; schedule() decides
            # which of them are close enough to query
            for info in reply['nodes']:
                candidate = Node().initWithDict(info)
                self.found.setdefault(candidate.id, candidate)
        elif 'values' in reply:
            seen = self.results
            fresh = []
            for encoded in reply['values']:
                value = encoded.decode('base64')
                if value not in seen:
                    seen[value] = 1
                    # an empty decoded value is recorded but never reported
                    if value:
                        fresh.append(value)
            if fresh:
                reactor.callFromThread(self.callback, fresh)
        self.schedule()

    def schedule(self):
        """
        Send findValue requests to not-yet-queried nodes among the K closest
        known.  When no request is outstanding the action finishes and the
        callback is handed an empty list through reactor.callFromThread.
        """
        if self.finished:
            return
        ranked = self.found.values()
        ranked.sort(self.sort)

        for node in ranked[:K]:
            skip = node.id in self.queried or node.id == self.table.node.id
            if not skip:
                # NOTE: no per-request timeout is applied here
                # (see GET_VALUE_TIMEOUT)
                df = node.findValue(self.target, self.table.node.senderDict())
                # errback is chained after the callback, so it also sees
                # failures raised by handleGotNodes itself
                df.addCallback(self.handleGotNodes)
                df.addErrback(self.makeMsgFailed(node))
                self.outstanding += 1
                self.queried[node.id] = 1
            if self.outstanding >= const.CONCURRENT_REQS:
                # enough requests in flight for now
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            # nothing left in flight: signal completion with an empty list
            self.finished = 1
            reactor.callFromThread(self.callback, [])

    def goWithNodes(self, nodes, found=None):
        """
        Start the lookup from an initial iterable of node objects.

        found, when given and non-empty, is an iterable of values to
        pre-record in self.results so they are not reported again.  Every
        node except our own (matched by id against self.table.node) is
        added to the found set, then the first round is scheduled.
        """
        self.results = {}
        for value in found or ():
            self.results[value] = 1

        for node in nodes:
            if node.id != self.table.node.id:
                self.found[node.id] = node

        self.schedule()
173
174
175
class KeyExpirer:
    """
    Periodically deletes old rows from the kv table of a store.

    The first run is scheduled const.KEINITIAL_DELAY after construction and
    each run schedules the next one const.KE_DELAY later.
    """

    def __init__(self, store):
        """ store must provide cursor(); the first expiry is scheduled here """
        self.store = store
        reactor.callLater(const.KEINITIAL_DELAY, self.doExpire)

    def doExpire(self):
        """ compute the cut-off (now minus const.KE_AGE) and run one expiry """
        cutoff = time() - const.KE_AGE
        # kept as a string with six decimals; _expire embeds it in the SQL
        self.cut = "%0.6f" % cutoff
        self._expire()

    def _expire(self):
        """ delete rows whose time is below self.cut, then schedule the next run """
        self.store.cursor().execute(
            "delete from kv where time < '%s';" % self.cut)
        reactor.callLater(const.KE_DELAY, self.doExpire)
190