$cfg->setConfigEntry('stacker_object_registry_max_size', 100);
// CFG: STACKER-OUTGOING-QUEUE-MAX-SIZE
-$cfg->setConfigEntry('stacker_outgoing_queue_max_size', 1000);
+$cfg->setConfigEntry('stacker_outgoing_queue_max_size', 100000);
// CFG: STACKER-INCOMING-QUEUE-MAX-SIZE
$cfg->setConfigEntry('stacker_incoming_queue_max_size', 100000);
// CFG: CRUNCHER-VIRGIN-STATE-CLASS
$cfg->setConfigEntry('cruncher_virgin_state_class', 'CruncherVirginState');
+// CFG: CRUNCHER-PER-UNIT-KEY-LIMIT
+$cfg->setConfigEntry('cruncher_per_unit_key_limit', 10000);
+
// [EOF]
?>
$this->setOutgoingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_outgoing_queue'));
// Init the queue
- $this->getOutgoingQueueInstance()->initStacker('outgoing_queue');
+ $this->initOutgoingQueue();
// Get an instance and set it in this producer
- $this->setOutgoingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_incoming_queue'));
+ $this->setIncomingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_incoming_queue'));
// Init the queue
- $this->getOutgoingQueueInstance()->initStacker('incoming_queue');
+ $this->initIncomingQueue();
// Debug message
$this->debugOutput('PRODUCER: All queues have been initialized.');
}
+
+ /**
+ * Inits the out-going queue stack
+ *
+ * @return void
+ */
+ protected function initOutgoingQueue () {
+ $this->getOutgoingQueueInstance()->initStacker('outgoing_queue', true);
+ }
+
+ /**
+ * Adds an entry to the out-going work queue
+ *
+ * @param $value The value to be added to the out-going work queue
+ * @return void
+ */
+ protected function addValueToOutgoingQueue ($value) {
+ $this->getOutgoingQueueInstance()->pushNamed('outgoing_queue', $value);
+ }
+
+ /**
+ * Checks wether a configurable out-going queue limit has been reached
+ *
+ * @param $configEntry Configuration entry where the limit is stored
+ * @return $isReached Wether the limit is reached
+ */
+ protected function isOutgoingQueueLimitReached($configEntry) {
+ return ($this->getConfigInstance()->getConfigEntry($configEntry) <= $this->getOutgoingQueueInstance()->getStackCount('outgoing_queue'));
+ }
+
+ /**
+ * Inits the incoming queue stack
+ *
+ * @return void
+ */
+ protected function initIncomingQueue () {
+ $this->getIncomingQueueInstance()->initStacker('incoming_queue', true);
+ }
+
+ /**
+ * Adds an entry to the incoming work queue
+ *
+ * @param $value The value to be added to the incoming work queue
+ * @return void
+ */
+ protected function addValueToIncomingQueue ($value) {
+ $this->getIncomingQueueInstance()->pushNamed('incoming_queue', $value);
+ }
+
+ /**
+ * Checks wether a configurable incoming queue limit has been reached
+ *
+ * @param $configEntry Configuration entry where the limit is stored
+ * @return $isReached Wether the limit is reached
+ */
+ protected function isIncomingQueueLimitReached($configEntry) {
+ return ($this->getConfigInstance()->getConfigEntry($configEntry) <= $this->getIncomingQueueInstance()->getStackCount('incoming_queue'));
+ }
}
// [EOF]
return;
} elseif (!$this->getIteratorInstance()->valid()) {
// This iterator has finished his assignment
- $this->debugOutput('ITERATOR: Finished creating keys.');
+ $this->debugOutput('ITERATOR: Finished creating keys. iteratorinstance=' . $this->getIteratorInstance()->__toString() . '');
return;
}
* and stored to database for later re-usage.
*/
- // Get current key (which is not the key of the iterator)
- // This is always an ASCII string.
+ /*
+ * Get current key (which is not the key of the iterator)
+ * This is always an ASCII string.
+ */
$currentKey = $this->getIteratorInstance()->current();
- // @TODO Do something with it
- $this->debugOutput('currentKey(b64)="' . base64_encode($currentKey) . '" needs to be processed.');
+ // Add it to the out-going work queue
+ $this->addValueToOutgoingQueue($currentKey);
+
+ // Is the per-work unit limit reached?
+ if ($this->isOutgoingQueueLimitReached('cruncher_per_unit_key_limit')) {
+ // @TODO Do something with it
+ $this->debugOutput('currentKey(b64)="' . base64_encode($currentKey) . '" needs to be processed.');
+
+ // At last re-init the stack
+ $this->initOutgoingQueue();
+ } // END - if
// Continue with next one
$this->getIteratorInstance()->next();