From: Roland Häder <roland@mxchange.org>
Date: Sat, 9 Apr 2011 00:19:09 +0000 (+0000)
Subject: Added encapsulation:
X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=8af7e757a89e4f8e62e2825357d6e83c8ceb3386;p=hub.git

Added encapsulation:
- Encapsulated adding of entries to the producer's out-going/incoming queue
- Encapsulated checking if the out-going/incoming queue's size has reached a
  configurable limit
- Encapsulated initialization (and forced re-init) of both queues
---

diff --git a/application/hub/config.php b/application/hub/config.php
index 47311bf19..450c79167 100644
--- a/application/hub/config.php
+++ b/application/hub/config.php
@@ -304,7 +304,7 @@ $cfg->setConfigEntry('stacker_out_queue_max_size', 10000);
 $cfg->setConfigEntry('stacker_object_registry_max_size', 100);
 
 // CFG: STACKER-OUTGOING-QUEUE-MAX-SIZE
-$cfg->setConfigEntry('stacker_outgoing_queue_max_size', 1000);
+$cfg->setConfigEntry('stacker_outgoing_queue_max_size', 100000);
 
 // CFG: STACKER-INCOMING-QUEUE-MAX-SIZE
 $cfg->setConfigEntry('stacker_incoming_queue_max_size', 100000);
@@ -609,5 +609,8 @@ $cfg->setConfigEntry('cruncher_init_state_class', 'CruncherInitState');
 // CFG: CRUNCHER-VIRGIN-STATE-CLASS
 $cfg->setConfigEntry('cruncher_virgin_state_class', 'CruncherVirginState');
 
+// CFG: CRUNCHER-PER-UNIT-KEY-LIMIT
+$cfg->setConfigEntry('cruncher_per_unit_key_limit', 10000);
+
 // [EOF]
 ?>
diff --git a/application/hub/main/producer/class_BaseProducer.php b/application/hub/main/producer/class_BaseProducer.php
index af8cc7318..5ef91a4df 100644
--- a/application/hub/main/producer/class_BaseProducer.php
+++ b/application/hub/main/producer/class_BaseProducer.php
@@ -105,17 +105,75 @@ abstract class BaseProducer extends BaseFrameworkSystem {
 		$this->setOutgoingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_outgoing_queue'));
 
 		// Init the queue
-		$this->getOutgoingQueueInstance()->initStacker('outgoing_queue');
+		$this->initOutgoingQueue();
 
 		// Get an instance and set it in this producer
-		$this->setOutgoingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_incoming_queue'));
+		$this->setIncomingQueueInstance(ObjectFactory::createObjectByConfiguredName('producer_incoming_queue'));
 
 		// Init the queue
-		$this->getOutgoingQueueInstance()->initStacker('incoming_queue');
+		$this->initIncomingQueue();
 
 		// Debug message
 		$this->debugOutput('PRODUCER: All queues have been initialized.');
 	}
+
+	/**
+	 * Inits the out-going queue stack
+	 *
+	 * @return	void
+	 */
+	protected function initOutgoingQueue () {
+		$this->getOutgoingQueueInstance()->initStacker('outgoing_queue', true);
+	}
+
+	/**
+	 * Adds an entry to the out-going work queue
+	 *
+	 * @param	$value	The value to be added to the out-going work queue
+	 * @return	void
+	 */
+	protected function addValueToOutgoingQueue ($value) {
+		$this->getOutgoingQueueInstance()->pushNamed('outgoing_queue', $value);
+	}
+
+	/**
+	 * Checks wether a configurable out-going queue limit has been reached
+	 *
+	 * @param	$configEntry	Configuration entry where the limit is stored
+	 * @return	$isReached		Wether the limit is reached
+	 */
+	protected function isOutgoingQueueLimitReached($configEntry) {
+		return ($this->getConfigInstance()->getConfigEntry($configEntry) <= $this->getOutgoingQueueInstance()->getStackCount('outgoing_queue'));
+	}
+
+	/**
+	 * Inits the incoming queue stack
+	 *
+	 * @return	void
+	 */
+	protected function initIncomingQueue () {
+		$this->getIncomingQueueInstance()->initStacker('incoming_queue', true);
+	}
+
+	/**
+	 * Adds an entry to the incoming work queue
+	 *
+	 * @param	$value	The value to be added to the incoming work queue
+	 * @return	void
+	 */
+	protected function addValueToIncomingQueue ($value) {
+		$this->getIncomingQueueInstance()->pushNamed('incoming_queue', $value);
+	}
+
+	/**
+	 * Checks wether a configurable incoming queue limit has been reached
+	 *
+	 * @param	$configEntry	Configuration entry where the limit is stored
+	 * @return	$isReached		Wether the limit is reached
+	 */
+	protected function isIncomingQueueLimitReached($configEntry) {
+		return ($this->getConfigInstance()->getConfigEntry($configEntry) <= $this->getIncomingQueueInstance()->getStackCount('incoming_queue'));
+	}
 }
 
 // [EOF]
diff --git a/application/hub/main/producer/cruncher/class_BaseKeyProducer.php b/application/hub/main/producer/cruncher/class_BaseKeyProducer.php
index bd3fcc2a6..c89fa5778 100644
--- a/application/hub/main/producer/cruncher/class_BaseKeyProducer.php
+++ b/application/hub/main/producer/cruncher/class_BaseKeyProducer.php
@@ -31,6 +31,8 @@ abstract class BaseKeyProducer extends BaseProducer {
 	protected function __construct ($className) {
 		// Call parent constructor
 		parent::__construct($className);
+
+		// Init key producer
 	}
 }
 
diff --git a/application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php b/application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php
index 3b14f8e44..9e4f51787 100644
--- a/application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php
+++ b/application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php
@@ -79,7 +79,7 @@ class CruncherKeyProducer extends BaseKeyProducer implements KeyProducer, Regist
 			return;
 		} elseif (!$this->getIteratorInstance()->valid()) {
 			// This iterator has finished his assignment
-			$this->debugOutput('ITERATOR: Finished creating keys.');
+			$this->debugOutput('ITERATOR: Finished creating keys. iteratorinstance=' . $this->getIteratorInstance()->__toString() . '');
 			return;
 		}
 
@@ -92,12 +92,23 @@ class CruncherKeyProducer extends BaseKeyProducer implements KeyProducer, Regist
 		 * and stored to database for later re-usage.
 		 */
 
-		// Get current key (which is not the key of the iterator)
-		// This is always an ASCII string.
+		/*
+		 * Get current key (which is not the key of the iterator)
+		 * This is always an ASCII string.
+		 */
 		$currentKey = $this->getIteratorInstance()->current();
 
-		// @TODO Do something with it
-		$this->debugOutput('currentKey(b64)="' . base64_encode($currentKey) . '" needs to be processed.');
+		// Add it to the out-going work queue
+		$this->addValueToOutgoingQueue($currentKey);
+
+		// Is the per-work unit limit reached?
+		if ($this->isOutgoingQueueLimitReached('cruncher_per_unit_key_limit')) {
+			// @TODO Do something with it
+			$this->debugOutput('currentKey(b64)="' . base64_encode($currentKey) . '" needs to be processed.');
+
+			// At last re-init the stack
+			$this->initOutgoingQueue();
+		} // END - if
 
 		// Continue with next one
 		$this->getIteratorInstance()->next();