2 from pickle import loads, dumps
6 from const import reactor
8 from hash import intify
9 from knode import KNode as Node
10 from ktable import KTable, K
12 # concurrent FIND_NODE/VALUE requests!
16 """ base class for some long running asynchronous processes like finding nodes or values """
17 def __init__(self, table, target, callback):
20 self.int = intify(target)
24 self.callback = callback
28 def sort(a, b, int=self.int):
29 """ this function is for sorting nodes relative to the ID we are looking for """
30 x, y = int ^ a.int, int ^ b.int
38 def goWithNodes(self, t):
43 FIND_NODE_TIMEOUT = 15
45 class FindNode(ActionBase):
46 """ find node action merits its own class as it is a long running stateful process """
47 def handleGotNodes(self, args):
49 if self.finished or self.answered.has_key(sender['id']):
50 # a day late and a dollar short
52 self.outstanding = self.outstanding - 1
53 self.answered[sender['id']] = 1
55 if not self.found.has_key(node['id']):
56 n = Node(node['id'], node['host'], node['port'])
58 self.table.insertNode(n)
63 send messages to new peers, if necessary
67 l = self.found.values()
71 if node.id == self.target:
73 return self.callback([node])
74 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
75 #xxxx t.timeout = time.time() + FIND_NODE_TIMEOUT
76 df = node.findNode(self.target, self.table.node.senderDict())
77 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
78 self.outstanding = self.outstanding + 1
79 self.queried[node.id] = 1
80 if self.outstanding >= N:
82 assert(self.outstanding) >=0
83 if self.outstanding == 0:
86 reactor.callFromThread(self.callback, l[:K])
88 def defaultGotNodes(self, t):
91 self.outstanding = self.outstanding - 1
95 def goWithNodes(self, nodes):
97 this starts the process, our argument is a transaction with t.extras being our list of nodes
98 it's a transaction since we got called from the dispatcher
101 if node.id == self.table.node.id:
103 self.found[node.id] = node
104 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
105 df = node.findNode(self.target, self.table.node.senderDict())
106 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
107 self.outstanding = self.outstanding + 1
108 self.queried[node.id] = 1
109 if self.outstanding == 0:
113 GET_VALUE_TIMEOUT = 15
114 class GetValue(FindNode):
115 """ get value task """
116 def handleGotNodes(self, args):
118 if self.finished or self.answered.has_key(sender['id']):
119 # a day late and a dollar short
121 self.outstanding = self.outstanding - 1
122 self.answered[sender['id']] = 1
124 # if we have any closer than what we already got, query them
125 if l.has_key('nodes'):
126 for node in l['nodes']:
127 if not self.found.has_key(node['id']):
128 n = Node(node['id'], node['host'], node['port'])
130 self.table.insertNode(n)
131 elif l.has_key('values'):
132 def x(y, z=self.results):
136 v = filter(None, map(x, l['values']))
138 reactor.callFromThread(self.callback, v)
145 l = self.found.values()
149 if not self.queried.has_key(node.id) and node.id != self.table.node.id:
150 #xxx t.timeout = time.time() + GET_VALUE_TIMEOUT
151 df = node.findValue(self.target, self.table.node.senderDict())
152 df.addCallback(self.handleGotNodes)
153 df.addErrback(self.defaultGotNodes)
154 self.outstanding = self.outstanding + 1
155 self.queried[node.id] = 1
156 if self.outstanding >= N:
158 assert(self.outstanding) >=0
159 if self.outstanding == 0:
160 ## all done, didn't find it!!
162 reactor.callFromThread(self.callback,[])
165 def goWithNodes(self, nodes):
168 if node.id == self.table.node.id:
170 self.found[node.id] = node
171 #xxx t.timeout = time.time() + FIND_NODE_TIMEOUT
172 df = node.findValue(self.target, self.table.node.senderDict())
173 df.addCallbacks(self.handleGotNodes, self.defaultGotNodes)
174 self.outstanding = self.outstanding + 1
175 self.queried[node.id] = 1
176 if self.outstanding == 0:
177 reactor.callFromThread(self.callback, [])
180 KEINITIAL_DELAY = 60 # 1 minute
181 KE_DELAY = 60 # 1 minute
184 def __init__(self, store, itime, kw):
188 reactor.callLater(KEINITIAL_DELAY, self.doExpire)
191 self.cut = `time() - KE_AGE`
195 ic = self.itime.cursor()
196 sc = self.store.cursor()
197 kc = self.kw.cursor()
200 irec = ic.set_range(self.cut)
201 except db.DBNotFoundError:
202 # everything is expired
211 k, v, lt = loads(self.store[h])
218 except db.DBNotFoundError:
219 print "Database inconsistency! No key->value entry when a store entry was found!"
227 reactor.callLater(KE_DELAY, self.doExpire)
229 print ">>>KE: done expiring %d" % i