From: Roland Häder Date: Sat, 9 Apr 2011 00:19:09 +0000 (+0000) Subject: Added encapsulation: X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=8af7e757a89e4f8e62e2825357d6e83c8ceb3386;p=hub.git Added encapsulation: - Encapsulated adding of entries to the producer's out-going/incoming queue - Encapsulated checking if the out-going/incoming queue's size has reached a configurable limit - Encapsulated initialization (and forced re-init) of both queues --- diff --git a/application/hub/config.php b/application/hub/config.php index 47311bf19..450c79167 100644 --- a/application/hub/config.php +++ b/application/hub/config.php @@ -304,7 +304,7 @@ $cfg->setConfigEntry('stacker_out_queue_max_size', 10000); $cfg->setConfigEntry('stacker_object_registry_max_size', 100); // CFG: STACKER-OUTGOING-QUEUE-MAX-SIZE -$cfg->setConfigEntry('stacker_outgoing_queue_max_size', 1000); +$cfg->setConfigEntry('stacker_outgoing_queue_max_size', 100000); // CFG: STACKER-INCOMING-QUEUE-MAX-SIZE $cfg->setConfigEntry('stacker_incoming_queue_max_size', 100000); @@ -609,5 +609,8 @@ $cfg->setConfigEntry('cruncher_init_state_class', 'CruncherInitState'); // CFG: CRUNCHER-VIRGIN-STATE-CLASS $cfg->setConfigEntry('cruncher_virgin_state_class', 'CruncherVirginState'); +// CFG: CRUNCHER-PER-UNIT-KEY-LIMIT +$cfg->setConfigEntry('cruncher_per_unit_key_limit', 10000); + // [EOF] ?> diff --git a/application/hub/main/producer/class_BaseProducer.php b/application/hub/main/producer/class_BaseProducer.php index af8cc7318..5ef91a4df 100644 --- a/application/hub/main/producer/class_BaseProducer.php +++ b/application/hub/main/producer/class_BaseProducer.php @@ -105,17 +105,75 @@ abstract class BaseProducer extends BaseFrameworkSystem { $this->setOutgoingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_outgoing_queue')); // Init the queue - $this->getOutgoingQueueInstance()->initStacker('outgoing_queue'); + $this->initOutgoingQueue(); // Get an instance and set it in this producer - $this->setOutgoingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_incoming_queue')); + $this->setIncomingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_incoming_queue')); // Init the queue - $this->getOutgoingQueueInstance()->initStacker('incoming_queue'); + $this->initIncomingQueue(); // Debug message $this->debugOutput('PRODUCER: All queues have been initialized.'); } + + /** + * Inits the out-going queue stack + * + * @return void + */ + protected function initOutgoingQueue () { + $this->getOutgoingQueueInstance()->initStacker('outgoing_queue', true); + } + + /** + * Adds an entry to the out-going work queue + * + * @param $value The value to be added to the out-going work queue + * @return void + */ + protected function addValueToOutgoingQueue ($value) { + $this->getOutgoingQueueInstance()->pushNamed('outgoing_queue', $value); + } + + /** + * Checks wether a configurable out-going queue limit has been reached + * + * @param $configEntry Configuration entry where the limit is stored + * @return $isReached Wether the limit is reached + */ + protected function isOutgoingQueueLimitReached($configEntry) { + return ($this->getConfigInstance()->getConfigEntry($configEntry) <= $this->getOutgoingQueueInstance()->getStackCount('outgoing_queue')); + } + + /** + * Inits the incoming queue stack + * + * @return void + */ + protected function initIncomingQueue () { + $this->getIncomingQueueInstance()->initStacker('incoming_queue', true); + } + + /** + * Adds an entry to the incoming work queue + * + * @param $value The value to be added to the incoming work queue + * @return void + */ + protected function addValueToIncomingQueue ($value) { + $this->getIncomingQueueInstance()->pushNamed('incoming_queue', $value); + } + + /** + * Checks wether a configurable incoming queue limit has been reached + * + * @param $configEntry Configuration entry where the limit is stored + * @return $isReached Wether the limit is reached + */ + protected function isIncomingQueueLimitReached($configEntry) { + return ($this->getConfigInstance()->getConfigEntry($configEntry) <= $this->getIncomingQueueInstance()->getStackCount('incoming_queue')); + } } // [EOF] diff --git a/application/hub/main/producer/cruncher/class_BaseKeyProducer.php b/application/hub/main/producer/cruncher/class_BaseKeyProducer.php index bd3fcc2a6..c89fa5778 100644 --- a/application/hub/main/producer/cruncher/class_BaseKeyProducer.php +++ b/application/hub/main/producer/cruncher/class_BaseKeyProducer.php @@ -31,6 +31,8 @@ abstract class BaseKeyProducer extends BaseProducer { protected function __construct ($className) { // Call parent constructor parent::__construct($className); + + // Init key producer } } diff --git a/application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php b/application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php index 3b14f8e44..9e4f51787 100644 --- a/application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php +++ b/application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php @@ -79,7 +79,7 @@ class CruncherKeyProducer extends BaseKeyProducer implements KeyProducer, Regist return; } elseif (!$this->getIteratorInstance()->valid()) { // This iterator has finished his assignment - $this->debugOutput('ITERATOR: Finished creating keys.'); + $this->debugOutput('ITERATOR: Finished creating keys. iteratorinstance=' . $this->getIteratorInstance()->__toString() . ''); return; } @@ -92,12 +92,23 @@ class CruncherKeyProducer extends BaseKeyProducer implements KeyProducer, Regist * and stored to database for later re-usage. */ - // Get current key (which is not the key of the iterator) - // This is always an ASCII string. + /* + * Get current key (which is not the key of the iterator) + * This is always an ASCII string. + */ $currentKey = $this->getIteratorInstance()->current(); - // @TODO Do something with it - $this->debugOutput('currentKey(b64)="' . base64_encode($currentKey) . '" needs to be processed.'); + // Add it to the out-going work queue + $this->addValueToOutgoingQueue($currentKey); + + // Is the per-work unit limit reached? + if ($this->isOutgoingQueueLimitReached('cruncher_per_unit_key_limit')) { + // @TODO Do something with it + $this->debugOutput('currentKey(b64)="' . base64_encode($currentKey) . '" needs to be processed.'); + + // At last re-init the stack + $this->initOutgoingQueue(); + } // END - if // Continue with next one $this->getIteratorInstance()->next();