]> git.mxchange.org Git - hub.git/commitdiff
Added encapsulation:
authorRoland Häder <roland@mxchange.org>
Sat, 9 Apr 2011 00:19:09 +0000 (00:19 +0000)
committerRoland Häder <roland@mxchange.org>
Sat, 9 Apr 2011 00:19:09 +0000 (00:19 +0000)
- Encapsulated adding of entries to the producer's out-going/incoming queue
- Encapsulated checking if the out-going/incoming queue's size has reached a
  configurable limit
- Encapsulated initialization (and forced re-init) of both queues

application/hub/config.php
application/hub/main/producer/class_BaseProducer.php
application/hub/main/producer/cruncher/class_BaseKeyProducer.php
application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php

index 47311bf1909271589ece9b33afc95006efd0b40c..450c7916708d61a4e2d1dc7abb13ec895261ea2a 100644 (file)
@@ -304,7 +304,7 @@ $cfg->setConfigEntry('stacker_out_queue_max_size', 10000);
 $cfg->setConfigEntry('stacker_object_registry_max_size', 100);
 
 // CFG: STACKER-OUTGOING-QUEUE-MAX-SIZE
-$cfg->setConfigEntry('stacker_outgoing_queue_max_size', 1000);
+$cfg->setConfigEntry('stacker_outgoing_queue_max_size', 100000);
 
 // CFG: STACKER-INCOMING-QUEUE-MAX-SIZE
 $cfg->setConfigEntry('stacker_incoming_queue_max_size', 100000);
@@ -609,5 +609,8 @@ $cfg->setConfigEntry('cruncher_init_state_class', 'CruncherInitState');
 // CFG: CRUNCHER-VIRGIN-STATE-CLASS
 $cfg->setConfigEntry('cruncher_virgin_state_class', 'CruncherVirginState');
 
+// CFG: CRUNCHER-PER-UNIT-KEY-LIMIT
+$cfg->setConfigEntry('cruncher_per_unit_key_limit', 10000);
+
 // [EOF]
 ?>
index af8cc7318a3116f8cd9cf68bbdf165f1fe7423ba..5ef91a4df71a724cf8cebbddbbef2ffac0deacb5 100644 (file)
@@ -105,17 +105,75 @@ abstract class BaseProducer extends BaseFrameworkSystem {
                $this->setOutgoingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_outgoing_queue'));
 
                // Init the queue
-               $this->getOutgoingQueueInstance()->initStacker('outgoing_queue');
+               $this->initOutgoingQueue();
 
                // Get an instance and set it in this producer
-               $this->setOutgoingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_incoming_queue'));
+               $this->setIncomingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_incoming_queue'));
 
                // Init the queue
-               $this->getOutgoingQueueInstance()->initStacker('incoming_queue');
+               $this->initIncomingQueue();
 
                // Debug message
                $this->debugOutput('PRODUCER: All queues have been initialized.');
        }
+
+       /**
+        * Inits the out-going queue stack
+        *
+        * @return      void
+        */
+       protected function initOutgoingQueue () {
+               $this->getOutgoingQueueInstance()->initStacker('outgoing_queue', true);
+       }
+
+       /**
+        * Adds an entry to the out-going work queue
+        *
+        * @param       $value  The value to be added to the out-going work queue
+        * @return      void
+        */
+       protected function addValueToOutgoingQueue ($value) {
+               $this->getOutgoingQueueInstance()->pushNamed('outgoing_queue', $value);
+       }
+
+       /**
+        * Checks wether a configurable out-going queue limit has been reached
+        *
+        * @param       $configEntry    Configuration entry where the limit is stored
+        * @return      $isReached              Wether the limit is reached
+        */
+       protected function isOutgoingQueueLimitReached($configEntry) {
+               return ($this->getConfigInstance()->getConfigEntry($configEntry) <= $this->getOutgoingQueueInstance()->getStackCount('outgoing_queue'));
+       }
+
+       /**
+        * Inits the incoming queue stack
+        *
+        * @return      void
+        */
+       protected function initIncomingQueue () {
+               $this->getIncomingQueueInstance()->initStacker('incoming_queue', true);
+       }
+
+       /**
+        * Adds an entry to the incoming work queue
+        *
+        * @param       $value  The value to be added to the incoming work queue
+        * @return      void
+        */
+       protected function addValueToIncomingQueue ($value) {
+               $this->getIncomingQueueInstance()->pushNamed('incoming_queue', $value);
+       }
+
+       /**
+        * Checks wether a configurable incoming queue limit has been reached
+        *
+        * @param       $configEntry    Configuration entry where the limit is stored
+        * @return      $isReached              Wether the limit is reached
+        */
+       protected function isIncomingQueueLimitReached($configEntry) {
+               return ($this->getConfigInstance()->getConfigEntry($configEntry) <= $this->getIncomingQueueInstance()->getStackCount('incoming_queue'));
+       }
 }
 
 // [EOF]
index bd3fcc2a6cdbcc86e0c03f0e9e6079e5e8bf781d..c89fa57789d736418eabd2714a91b96f2ad8386d 100644 (file)
@@ -31,6 +31,8 @@ abstract class BaseKeyProducer extends BaseProducer {
        protected function __construct ($className) {
                // Call parent constructor
                parent::__construct($className);
+
+               // Init key producer
        }
 }
 
index 3b14f8e440c79c2b65749355a98aa158616e7f8c..9e4f517870cb0fb135e727ada73685a37fd7fffb 100644 (file)
@@ -79,7 +79,7 @@ class CruncherKeyProducer extends BaseKeyProducer implements KeyProducer, Regist
                        return;
                } elseif (!$this->getIteratorInstance()->valid()) {
                        // This iterator has finished his assignment
-                       $this->debugOutput('ITERATOR: Finished creating keys.');
+                       $this->debugOutput('ITERATOR: Finished creating keys. iteratorinstance=' . $this->getIteratorInstance()->__toString() . '');
                        return;
                }
 
@@ -92,12 +92,23 @@ class CruncherKeyProducer extends BaseKeyProducer implements KeyProducer, Regist
                 * and stored to database for later re-usage.
                 */
 
-               // Get current key (which is not the key of the iterator)
-               // This is always an ASCII string.
+               /*
+                * Get current key (which is not the key of the iterator)
+                * This is always an ASCII string.
+                */
                $currentKey = $this->getIteratorInstance()->current();
 
-               // @TODO Do something with it
-               $this->debugOutput('currentKey(b64)="' . base64_encode($currentKey) . '" needs to be processed.');
+               // Add it to the out-going work queue
+               $this->addValueToOutgoingQueue($currentKey);
+
+               // Is the per-work unit limit reached?
+               if ($this->isOutgoingQueueLimitReached('cruncher_per_unit_key_limit')) {
+                       // @TODO Do something with it
+                       $this->debugOutput('currentKey(b64)="' . base64_encode($currentKey) . '" needs to be processed.');
+
+                       // At last re-init the stack
+                       $this->initOutgoingQueue();
+               } // END - if
 
                // Continue with next one
                $this->getIteratorInstance()->next();