]> git.mxchange.org Git - quix0rs-apt-p2p.git/blob - actions.py
call back from GetValues each time we get some new values
[quix0rs-apt-p2p.git] / actions.py
1 from const import reactor
2
3 from hash import intify
4 from knode import KNode as Node
5 from ktable import KTable, K
# Maximum number of concurrent FIND_NODE/FIND_VALUE requests per action:
# the schedule() methods below stop sending new requests once this many
# are outstanding.
N = 3
8
class ActionBase:
    """ shared state for long running asynchronous processes such as finding nodes or values """
    def __init__(self, table, target, callback):
        """
            table is the routing table the action works against, target is the
            ID being looked up and callback is what the subclasses call with
            their results
        """
        self.table = table
        self.target = target
        # integer form of the target, used for the XOR comparisons in sort()
        self.int = intify(target)
        # three independent bookkeeping dicts, filled in by the subclasses
        self.found, self.queried, self.answered = {}, {}, {}
        self.callback = callback
        # number of requests sent that have not been accounted for yet
        self.outstanding = 0
        # set to 1 once the action is over
        self.finished = 0

        def sort(a, b, int=self.int):
            """ cmp-style comparison: orders nodes by the XOR of their ID with the ID we are looking for """
            left = int ^ a.int
            right = int ^ b.int
            # 1 when a is further away, -1 when it is closer, 0 on a tie
            return (left > right) - (left < right)
        self.sort = sort

    def goWithNodes(self, t):
        """ start the action; does nothing here, the subclasses in this file override it """
        return None
34         
35         
36
# Only mentioned in commented-out per-request timeout code in this file;
# presumably a number of seconds -- TODO confirm.
FIND_NODE_TIMEOUT = 15
38
class FindNode(ActionBase):
    """ find node action merits its own class as it is a long running stateful process """
    def handleGotNodes(self, args):
        """
            callback for a successful findNode request

            args is a (nodes, sender) pair: nodes is a list of dicts with
            'id', 'host' and 'port' keys, sender is a dict with an 'id' key
        """
        l, sender = args
        if self.finished or sender['id'] in self.answered:
            # a day late and a dollar short
            return
        self.outstanding -= 1
        self.answered[sender['id']] = 1
        for node in l:
            if node['id'] not in self.found:
                n = Node(node['id'], node['host'], node['port'])
                self.found[n.id] = n
                self.table.insertNode(n)
        self.schedule()

    def schedule(self):
        """
            send messages to new peers, if necessary
        """
        if self.finished:
            return
        # copy into a real list so the in-place sort works whether values()
        # returns a list or a view
        l = list(self.found.values())
        # closest to the target first (XOR of the IDs), the same ordering the
        # cmp-style self.sort produces
        l.sort(key=lambda node: self.int ^ node.int)
        closest = l[:K]

        for node in closest:
            if node.id == self.target:
                # found the exact node we were looking for
                self.finished = 1
                return self.callback([node])
            if node.id not in self.queried and node.id != self.table.node.id:
                #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
                df = node.findNode(self.target, self.table.node.senderDict())
                df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
                self.outstanding += 1
                self.queried[node.id] = 1
            if self.outstanding >= N:
                # enough requests in flight for now
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done!!
            self.finished = 1
            reactor.callFromThread(self.callback, closest)

    def defaultGotNodes(self, t):
        """
            errback for a failed findNode request: the request no longer
            counts as outstanding, so try to schedule more
        """
        if self.finished:
            return
        self.outstanding -= 1
        self.schedule()

    def goWithNodes(self, nodes):
        """
            this starts the process, our argument is the list of nodes to query first

            every node except our own is sent a findNode request; if no
            request could be sent the callback is called right away with nodes
        """
        for node in nodes:
            if node.id == self.table.node.id:
                continue
            self.found[node.id] = node
            #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
            df = node.findNode(self.target, self.table.node.senderDict())
            df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
            self.outstanding += 1
            self.queried[node.id] = 1
        if self.outstanding == 0:
            self.callback(nodes)
105
106
# Only mentioned in commented-out per-request timeout code in this file;
# presumably a number of seconds -- TODO confirm.
GET_VALUE_TIMEOUT = 15
class GetValue(FindNode):
    """
        get value task

        the callback is called with a list of new values each time some
        arrive, and with an empty list once there is nothing left to query
    """
    def handleGotNodes(self, args):
        """
            callback for a successful findValue request

            args is a (reply, sender) pair: reply is a dict holding either a
            'nodes' list (dicts with 'id', 'host' and 'port' keys) or a
            'values' list, sender is a dict with an 'id' key
        """
        l, sender = args
        if self.finished or sender['id'] in self.answered:
            # a day late and a dollar short
            return
        self.outstanding -= 1
        self.answered[sender['id']] = 1
        # go through nodes
        # if we have any closer than what we already got, query them
        if 'nodes' in l:
            for node in l['nodes']:
                if node['id'] not in self.found:
                    n = Node(node['id'], node['host'], node['port'])
                    self.found[n.id] = n
                    self.table.insertNode(n)
        elif 'values' in l:
            # only pass on values we have not reported before
            v = []
            for value in l['values']:
                if value not in self.results:
                    self.results[value] = 1
                    # falsy values are remembered but never passed on, as with
                    # the filter(None, ...) this loop replaces
                    if value:
                        v.append(value)
            if v:
                reactor.callFromThread(self.callback, v)
        self.schedule()

    ## get value
    def schedule(self):
        """
            send findValue requests to new peers, if necessary
        """
        if self.finished:
            return
        # copy into a real list so the in-place sort works whether values()
        # returns a list or a view
        l = list(self.found.values())
        # closest to the target first (XOR of the IDs), the same ordering the
        # cmp-style self.sort produces
        l.sort(key=lambda node: self.int ^ node.int)

        for node in l[:K]:
            if node.id not in self.queried and node.id != self.table.node.id:
                #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
                df = node.findValue(self.target, self.table.node.senderDict())
                # NOTE(review): the other requests in this file use
                # addCallbacks(); chained like this the errback presumably
                # also sees failures raised by handleGotNodes -- confirm
                # which one is intended
                df.addCallback(self.handleGotNodes)
                df.addErrback(self.defaultGotNodes)
                self.outstanding += 1
                self.queried[node.id] = 1
            if self.outstanding >= N:
                # enough requests in flight for now
                break
        assert self.outstanding >= 0
        if self.outstanding == 0:
            ## all done, didn't find it!!
            self.finished = 1
            reactor.callFromThread(self.callback, [])

    ## get value
    def goWithNodes(self, nodes):
        """
            this starts the process, our argument is the list of nodes to query first

            every node except our own is sent a findValue request; if no
            request could be sent the callback is called with an empty list
        """
        # values already passed to the callback, used to report each one once
        self.results = {}
        for node in nodes:
            if node.id == self.table.node.id:
                continue
            self.found[node.id] = node
            #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
            df = node.findValue(self.target, self.table.node.senderDict())
            df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
            self.outstanding += 1
            self.queried[node.id] = 1
        if self.outstanding == 0:
            reactor.callFromThread(self.callback, [])
172