+class StoreValue(ActionBase):
+ def __init__(self, table, target, value, callback, store="storeValue"):
+ ActionBase.__init__(self, table, target, callback)
+ self.value = value
+ self.stored = []
+ self.store = store
+
+ def storedValue(self, t, node):
+ self.outstanding -= 1
+ self.table.insertNode(node)
+ if self.finished:
+ return
+ self.stored.append(t)
+ if len(self.stored) >= const.STORE_REDUNDANCY:
+ self.finished=1
+ self.callback(self.stored)
+ else:
+ if not len(self.stored) + self.outstanding >= const.STORE_REDUNDANCY:
+ self.schedule()
+ return t
+
+ def storeFailed(self, t, node):
+ print ">>> store failed %s/%s" % (node.host, node.port)
+ self.table.nodeFailed(node)
+ self.outstanding -= 1
+ if self.finished:
+ return t
+ self.schedule()
+ return t
+
+ def schedule(self):
+ if self.finished:
+ return
+ num = const.CONCURRENT_REQS - self.outstanding
+ if num > const.STORE_REDUNDANCY:
+ num = const.STORE_REDUNDANCY
+ for i in range(num):
+ try:
+ node = self.nodes.pop()
+ except IndexError:
+ if self.outstanding == 0:
+ self.finished = 1
+ self.callback(self.stored)
+ else:
+ if not node.id == self.table.node.id:
+ self.outstanding += 1
+ try:
+ f = getattr(node, self.store)
+ except AttributeError:
+ print ">>> %s doesn't have a %s method!" % (node, self.store)
+ else:
+ df = f(self.target, self.value, self.table.node.id)
+ df.addCallback(self.storedValue, node=node)
+ df.addErrback(self.storeFailed, node=node)
+
+ def goWithNodes(self, nodes):
+ self.nodes = nodes
+ self.nodes.sort(self.sort)
+ self.schedule()
+