]> git.mxchange.org Git - hub.git/blob - application/hub/main/package/class_NetworkPackage.php
Handling of decoded raw data continued:
[hub.git] / application / hub / main / package / class_NetworkPackage.php
1 <?php
2 /**
3  * A NetworkPackage class. This class implements Deliverable and Receivable
4  * because all network packages should be deliverable to other nodes and
5  * receivable from other nodes. It further provides methods for reading raw
6  * content from template engines and feeding it to the stacker for undeclared
7  * packages.
8  *
9  * The factory method requires you to provide a compressor class (which must
10  * implement the Compressor interface). If you don't want any compression (not
11  * adviceable due to increased network load), please use the NullCompressor
12  * class and encode it with BASE64 for a more error-free transfer over the
13  * Internet.
14  *
15  * For performance reasons, this class should only be instanciated once and then
16  * used as a "pipe-through" class.
17  *
18  * @author              Roland Haeder <webmaster@ship-simu.org>
19  * @version             0.0.0
20  * @copyright   Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2011 Hub Developer Team
21  * @license             GNU GPL 3.0 or any newer version
22  * @link                http://www.ship-simu.org
23  * @todo                Needs to add functionality for handling the object's type
24  *
25  * This program is free software: you can redistribute it and/or modify
26  * it under the terms of the GNU General Public License as published by
27  * the Free Software Foundation, either version 3 of the License, or
28  * (at your option) any later version.
29  *
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
33  * GNU General Public License for more details.
34  *
35  * You should have received a copy of the GNU General Public License
36  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
37  */
38 class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Receivable, Registerable {
39         /**
40          * Package mask for compressing package data:
41          * 0: Compressor extension
42          * 1: Raw package data
43          * 2: Tags, seperated by semicolons, no semicolon is required if only one tag is needed
44          * 3: Checksum
45          *                     0  1  2  3
46          */
47         const PACKAGE_MASK = '%s:%s:%s:%s';
48
49         /**
50          * Seperator for the above mask
51          */
52         const PACKAGE_MASK_SEPERATOR = ':';
53
54         /**
55          * Seperator for checksum
56          */
57         const PACKAGE_CHECKSUM_SEPERATOR = ':';
58
59         /**
60          * Array indexes for above mask, start with zero
61          */
62         const INDEX_COMPRESSOR_EXTENSION = 0;
63         const INDEX_PACKAGE_DATA         = 1;
64         const INDEX_TAGS                 = 2;
65         const INDEX_CHECKSUM             = 3;
66
67         /**
68          * Array indexes for raw package array
69          */
70         const INDEX_PACKAGE_SENDER    = 0;
71         const INDEX_PACKAGE_RECIPIENT = 1;
72         const INDEX_PACKAGE_CONTENT   = 2;
73
74         /**
75          * Named array elements for package data
76          */
77         const PACKAGE_DATA_RECIPIENT = 'recipient';
78         const PACKAGE_DATA_SENDER    = 'sender';
79         const PACKAGE_DATA_CONTENT   = 'content';
80
81         /**
82          * Tags seperator
83          */
84         const PACKAGE_TAGS_SEPERATOR = ';';
85
86         /**
87          * Raw package data seperator
88          */
89         const PACKAGE_DATA_SEPERATOR = '#';
90
91         /**
92          * Stacker name for "undeclared" packages
93          */
94         const STACKER_NAME_UNDECLARED = 'package_undeclared';
95
96         /**
97          * Stacker name for "declared" packages (which are ready to send out)
98          */
99         const STACKER_NAME_DECLARED = 'package_declared';
100
101         /**
102          * Stacker name for "out-going" packages
103          */
104         const STACKER_NAME_OUTGOING = 'package_outgoing';
105
106         /**
107          * Stacker name for "incoming" decoded raw data
108          */
109         const STACKER_NAME_DECODED_INCOMING = 'package_decoded_data';
110
111         /**
112          * Stacker name for handled decoded raw data
113          */
114         const STACKER_NAME_DECODED_HANDLED = 'package_handled_decoded';
115
116         /**
117          * Stacker name for "back-buffered" packages
118          */
119         const STACKER_NAME_BACK_BUFFER = 'package_backbuffer';
120
121         /**
122          * Network target (alias): 'upper hubs'
123          */
124         const NETWORK_TARGET_UPPER_HUBS = 'upper';
125
126         /**
127          * Network target (alias): 'self'
128          */
129         const NETWORK_TARGET_SELF = 'self';
130
131         /**
132          * TCP package size in bytes
133          */
134         const TCP_PACKAGE_SIZE = 512;
135
136         /**
137          * Protected constructor
138          *
139          * @return      void
140          */
141         protected function __construct () {
142                 // Call parent constructor
143                 parent::__construct(__CLASS__);
144         }
145
146         /**
147          * Creates an instance of this class
148          *
149          * @param       $compressorInstance             A Compressor instance for compressing the content
150          * @return      $packageInstance                An instance of a Deliverable class
151          */
152         public static final function createNetworkPackage (Compressor $compressorInstance) {
153                 // Get new instance
154                 $packageInstance = new NetworkPackage();
155
156                 // Now set the compressor instance
157                 $packageInstance->setCompressorInstance($compressorInstance);
158
159                 /*
160                  * We need to initialize a stack here for our packages even for those
161                  * which have no recipient address and stamp... ;-) This stacker will
162                  * also be used for incoming raw data to handle it.
163                  */
164                 $stackerInstance = ObjectFactory::createObjectByConfiguredName('network_package_stacker_class');
165
166                 // At last, set it in this class
167                 $packageInstance->setStackerInstance($stackerInstance);
168
169                 // Init all stacker
170                 $packageInstance->initStackers();
171
172                 // Get a visitor instance for speeding up things
173                 $visitorInstance = ObjectFactory::createObjectByConfiguredName('node_raw_data_monitor_visitor_class', array($packageInstance));
174
175                 // Set it in this package
176                 $packageInstance->setVisitorInstance($visitorInstance);
177
178                 // Return the prepared instance
179                 return $packageInstance;
180         }
181
182         /**
183          * Initialize all stackers
184          *
185          * @return      void
186          */
187         protected function initStackers () {
188                 // Initialize all
189                 foreach (
190                         array(
191                                 self::STACKER_NAME_UNDECLARED,
192                                 self::STACKER_NAME_DECLARED,
193                                 self::STACKER_NAME_OUTGOING,
194                                 self::STACKER_NAME_DECODED_INCOMING,
195                                 self::STACKER_NAME_DECODED_HANDLED,
196                                 self::STACKER_NAME_BACK_BUFFER
197                         ) as $stackerName) {
198                         // Init this stacker
199                         $this->getStackerInstance()->initStacker($stackerName);
200                 } // END - foreach
201         }
202
203         /**
204          * "Getter" for hash from given content and helper instance
205          *
206          * @param       $content        Raw package content
207          * @param       $helperInstance         An instance of a BaseHubHelper class
208          * @param       $nodeInstance           An instance of a NodeHelper class
209          * @return      $hash   Hash for given package content
210          * @todo        $helperInstance is unused
211          */
212         private function getHashFromContent ($content, BaseHubHelper $helperInstance, NodeHelper $nodeInstance) {
213                 // Create the hash
214                 // @TODO crc32 is not very strong, but it needs to be fast
215                 $hash = crc32(
216                         $content .
217                         self::PACKAGE_CHECKSUM_SEPERATOR .
218                         $nodeInstance->getSessionId() .
219                         self::PACKAGE_CHECKSUM_SEPERATOR .
220                         $this->getCompressorInstance()->getCompressorExtension()
221                 );
222
223                 // And return it
224                 return $hash;
225         }
226
227         ///////////////////////////////////////////////////////////////////////////
228         //                   Delivering packages / raw data
229         ///////////////////////////////////////////////////////////////////////////
230
231         /**
232          * Delivers the given raw package data.
233          *
234          * @param       $packageData    Raw package data in an array
235          * @return      void
236          */
237         private function declareRawPackageData (array $packageData) {
238                 /*
239                  * We need to disover every recipient, just in case we have a
240                  * multi-recipient entry like 'upper' is. 'all' may be a not so good
241                  * target because it causes an overload on the network and may be
242                  * abused for attacking the network with large packages.
243                  */
244                 $discoveryInstance = PackageDiscoveryFactory::createPackageDiscoveryInstance();
245
246                 // Discover all recipients, this may throw an exception
247                 $discoveryInstance->discoverRecipients($packageData);
248
249                 // Now get an iterator
250                 $iteratorInstance = $discoveryInstance->getIterator();
251
252                 // ... and begin iteration
253                 while ($iteratorInstance->valid()) {
254                         // Get current entry
255                         $currentRecipient = $iteratorInstance->current();
256
257                         // Debug message
258                         $this->debugOutput('PACKAGE: Package declared for recipient ' . $currentRecipient);
259
260                         // Set the recipient
261                         $packageData[self::PACKAGE_DATA_RECIPIENT] = $currentRecipient;
262
263                         // And enqueue it to the writer class
264                         $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECLARED, $packageData);
265
266                         // Skip to next entry
267                         $iteratorInstance->next();
268                 } // END - while
269
270                 // Clean-up the list
271                 $discoveryInstance->clearRecipients();
272         }
273
274         /**
275          * Delivers raw package data. In short, this will discover the raw socket
276          * resource through a discovery class (which will analyse the receipient of
277          * the package), register the socket with the connection (handler/helper?)
278          * instance and finally push the raw data on our outgoing queue.
279          *
280          * @param       $packageData    Raw package data in an array
281          * @return      void
282          */
283         private function deliverRawPackageData (array $packageData) {
284                 /*
285                  * This package may become big, depending on the shared object size or
286                  * delivered message size which shouldn't be so long (to save
287                  * bandwidth). Because of the nature of the used protocol (TCP) we need
288                  * to split it up into smaller pieces to fit it into a TCP frame.
289                  *
290                  * So first we need (again) a discovery class but now a protocol
291                  * discovery to choose the right socket resource. The discovery class
292                  * should take a look at the raw package data itself and then decide
293                  * which (configurable!) protocol should be used for that type of
294                  * package.
295                  */
296                 $discoveryInstance = SocketDiscoveryFactory::createSocketDiscoveryInstance();
297
298                 // Now discover the right protocol
299                 $socketResource = $discoveryInstance->discoverSocket($packageData);
300
301                 // We have to put this socket in our registry, so get an instance
302                 $registryInstance = SocketRegistry::createSocketRegistry();
303
304                 // Get the listener from registry
305                 $connectionInstance = Registry::getRegistry()->getInstance('connection');
306
307                 // Is it not there?
308                 if (!$registryInstance->isSocketRegistered($connectionInstance, $socketResource)) {
309                         // Then register it
310                         $registryInstance->registerSocket($connectionInstance, $socketResource, $packageData);
311                 } // END - if
312
313                 // We enqueue it again, but now in the out-going queue
314                 $this->getStackerInstance()->pushNamed(self::STACKER_NAME_OUTGOING, $packageData);
315         }
316
317         /**
318          * Sends waiting packages
319          *
320          * @param       $packageData    Raw package data
321          * @return      void
322          */
323         private function sendOutgoingRawPackageData (array $packageData) {
324                 // Get the right connection instance
325                 $connectionInstance = SocketRegistry::createSocketRegistry()->getHandlerInstanceFromPackageData($packageData);
326
327                 // Is this connection still alive?
328                 if ($connectionInstance->isShuttedDown()) {
329                         // This connection is shutting down
330                         // @TODO We may want to do somthing more here?
331                         return;
332                 } // END - if
333
334                 // Sent it away (we catch exceptions one method above)
335                 $sentBytes = $connectionInstance->sendRawPackageData($packageData);
336
337                 // Remember unsent raw bytes in back-buffer, if any
338                 $this->storeUnsentBytesInBackBuffer($packageData, $sentBytes);
339         }
340
341         /**
342          * "Enqueues" raw content into this delivery class by reading the raw content
343          * from given template instance and pushing it on the 'undeclared' stack.
344          *
345          * @param       $helperInstance         An instance of a  BaseHubHelper class
346          * @param       $nodeInstance           An instance of a NodeHelper class
347          * @return      void
348          */
349         public function enqueueRawDataFromTemplate (BaseHubHelper $helperInstance, NodeHelper $nodeInstance) {
350                 // Get the raw content ...
351                 $content = $helperInstance->getTemplateInstance()->getRawTemplateData();
352
353                 // ... and compress it
354                 $content = $this->getCompressorInstance()->compressStream($content);
355
356                 // Add magic in front of it and hash behind it, including BASE64 encoding
357                 $content = sprintf(self::PACKAGE_MASK,
358                         // 1.) Compressor's extension
359                         $this->getCompressorInstance()->getCompressorExtension(),
360                         // 2.) Raw package content, encoded with BASE64
361                         base64_encode($content),
362                         // 3.) Tags
363                         implode(self::PACKAGE_TAGS_SEPERATOR, $helperInstance->getPackageTags()),
364                         // 4.) Checksum
365                         $this->getHashFromContent($content, $helperInstance, $nodeInstance)
366                 );
367
368                 // Now prepare the temporary array and push it on the 'undeclared' stack
369                 $this->getStackerInstance()->pushNamed(self::STACKER_NAME_UNDECLARED, array(
370                         self::PACKAGE_DATA_SENDER    => $nodeInstance->getSessionId(),
371                         self::PACKAGE_DATA_RECIPIENT => $helperInstance->getRecipientType(),
372                         self::PACKAGE_DATA_CONTENT   => $content,
373                 ));
374         }
375
376         /**
377          * Checks wether a package has been enqueued for delivery.
378          *
379          * @return      $isEnqueued             Wether a package is enqueued
380          */
381         public function isPackageEnqueued () {
382                 // Check wether the stacker is not empty
383                 $isEnqueued = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_UNDECLARED)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_UNDECLARED)));
384
385                 // Return the result
386                 return $isEnqueued;
387         }
388
389         /**
390          * Checks wether a package has been declared
391          *
392          * @return      $isDeclared             Wether a package is declared
393          */
394         public function isPackageDeclared () {
395                 // Check wether the stacker is not empty
396                 $isDeclared = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_DECLARED)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECLARED)));
397
398                 // Return the result
399                 return $isDeclared;
400         }
401
402         /**
403          * Checks wether a package should be sent out
404          *
405          * @return      $isWaitingDelivery      Wether a package is waiting for delivery
406          */
407         public function isPackageWaitingForDelivery () {
408                 // Check wether the stacker is not empty
409                 $isWaitingDelivery = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_OUTGOING)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_OUTGOING)));
410
411                 // Return the result
412                 return $isWaitingDelivery;
413         }
414
415         /**
416          * Delivers an enqueued package to the stated destination. If a non-session
417          * id is provided, recipient resolver is being asked (and instanced once).
418          * This allows that a single package is being delivered to multiple targets
419          * without enqueueing it for every target. If no target is provided or it
420          * can't be determined a NoTargetException is being thrown.
421          *
422          * @return      void
423          * @throws      NoTargetException       If no target can't be determined
424          */
425         public function declareEnqueuedPackage () {
426                 // Make sure this method isn't working if there is no package enqueued
427                 if (!$this->isPackageEnqueued()) {
428                         // This is not fatal but should be avoided
429                         // @TODO Add some logging here
430                         return;
431                 } // END - if
432
433                 // Now we know for sure there are packages to deliver, we can start
434                 // with the first one.
435                 $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_UNDECLARED);
436
437                 // Declare the raw package data for delivery
438                 $this->declareRawPackageData($packageData);
439
440                 // And remove it finally
441                 $this->getStackerInstance()->popNamed(self::STACKER_NAME_UNDECLARED);
442         }
443
444         /**
445          * Delivers the next declared package. Only one package per time will be sent
446          * because this may take time and slows down the whole delivery
447          * infrastructure.
448          *
449          * @return      void
450          */
451         public function deliverDeclaredPackage () {
452                 // Sanity check if we have packages declared
453                 if (!$this->isPackageDeclared()) {
454                         // This is not fatal but should be avoided
455                         // @TODO Add some logging here
456                         return;
457                 } // END - if
458
459                 // Get the package again
460                 $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_DECLARED);
461
462                 // And send it
463                 $this->deliverRawPackageData($packageData);
464
465                 // And remove it finally
466                 $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECLARED);
467         }
468
469         /**
470          * Sends waiting packages out for delivery
471          *
472          * @return      void
473          */
474         public function sendWaitingPackage () {
475                 // Send any waiting bytes in the back-buffer before sending a new package
476                 $this->sendBackBufferBytes();
477
478                 // Sanity check if we have packages waiting for delivery
479                 if (!$this->isPackageWaitingForDelivery()) {
480                         // This is not fatal but should be avoided
481                         $this->debugOutput('PACKAGE: No package is waiting for delivery, but ' . __METHOD__ . ' was called.');
482                         return;
483                 } // END - if
484
485                 // Get the package again
486                 $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_OUTGOING);
487
488                 try {
489                         // Now try to send it
490                         $this->sendOutgoingRawPackageData($packageData);
491
492                         // And remove it finally
493                         $this->getStackerInstance()->popNamed(self::STACKER_NAME_OUTGOING);
494                 } catch (InvalidSocketException $e) {
495                         // Output exception message
496                         $this->debugOutput('PACKAGE: Package was not delivered: ' . $e->getMessage());
497                 }
498         }
499
500         ///////////////////////////////////////////////////////////////////////////
501         //                   Receiving packages / raw data
502         ///////////////////////////////////////////////////////////////////////////
503
504         /**
505          * Checks wether decoded raw data is pending
506          *
507          * @return      $isPending      Wether decoded raw data is pending
508          */
509         private function isDecodedDataPending () {
510                 // Just return wether the stack is not empty
511                 $isPending = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_INCOMING));
512
513                 // Return the status
514                 return $isPending;
515         }
516
517         /**
518          * Checks wether new raw package data has arrived at a socket
519          *
520          * @param       $poolInstance   An instance of a PoolableListener class
521          * @return      $hasArrived             Wether new raw package data has arrived for processing
522          */
523         public function isNewRawDataPending (PoolableListener $poolInstance) {
524                 // Visit the pool. This monitors the pool for incoming raw data.
525                 $poolInstance->accept($this->getVisitorInstance());
526
527                 // Check for new data arrival
528                 $hasArrived = $this->isDecodedDataPending();
529
530                 // Return the status
531                 return $hasArrived;
532         }
533
534         /**
535          * Handles the incoming decoded raw data. This method does not "convert" the
536          * decoded data back into a package array, it just "handles" it and pushs it
537          * on the next stack.
538          *
539          * @return      void
540          */
541         public function handleIncomingDecodedData () {
542                 /*
543                  * This method should only be called if decoded raw data is pending,
544                  * so check it again.
545                  */
546                 if (!$this->isDecodedDataPending()) {
547                         // This is not fatal but should be avoided
548                         // @TODO Add some logging here
549                         return;
550                 } // END - if
551
552                 // Very noisy debug message:
553                 /* NOISY-DEBUG: */ $this->debugOutput('PACKAGE: Stacker size is ' . $this->getStackerInstance()->getStackCount(self::STACKER_NAME_DECODED_INCOMING) . ' entries.');
554
555                 // "Pop" the next entry (the same array again) from the stack
556                 $decodedData = $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECODED_INCOMING);
557
558                 // Make sure both array elements are there
559                 assert((isset($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA])) && (isset($decodedData[BaseRawDataHandler::PACKAGE_ERROR_CODE])));
560
561                 /*
562                  * Also make sure the error code is SOCKET_ERROR_UNHANDLED because we
563                  * only want to handle unhandled packages here.
564                  */
565                 assert($decodedData[BaseRawDataHandler::PACKAGE_ERROR_CODE] == BaseRawDataHandler::SOCKET_ERROR_UNHANDLED);
566
567                 // Remove the last chunk seperator (because it is being added and we don't need it)
568                 if (substr($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA], -1, 1) == PackageFragmenter::CHUNK_SEPERATOR) {
569                         // It is there and should be removed
570                         $decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA] = substr($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA], 0, -1);
571                 } // END - if
572
573                 // This package is "handled" and can be pushed on the next stack
574                 $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_HANDLED, $decodedData);
575         }
576
577         /**
578          * Adds raw decoded data from the given handler instance to this receiver
579          *
580          * @param       $handlerInstance        An instance of a Networkable class
581          * @return      void
582          */
583         public function addDecodedDataToIncomingStack (Networkable $handlerInstance) {
584                 /*
585                  * Get the decoded data from the handler, this is an array with
586                  * 'decoded_data' and 'error_code' as elements.
587                  */
588                 $decodedData = $handlerInstance->getNextDecodedData();
589
590                 // Very noisy debug message:
591                 //* NOISY-DEBUG: */ $this->debugOutput('PACKAGE: decodedData[' . gettype($decodedData) . ']=' . print_r($decodedData, true));
592
593                 // And push it on our stack
594                 $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_INCOMING, $decodedData);
595         }
596
597         /**
598          * Checks wether incoming decoded data is handled.
599          *
600          * @return      $isHandled      Wether incoming decoded data is handled
601          */
602         public function isIncomingDecodedDataHandled () {
603                 // Determine if the stack is not empty
604                 $isHandled = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_HANDLED));
605
606                 // Return it
607                 return $isHandled;
608         }
609
610         /**
611          * Assembles incoming decoded data so it will become an abstract network
612          * package again.
613          *
614          * @return      void
615          */
616         public function assembleDecodedDataToPackage () {
617                 $this->partialStub('Please implement this method.');
618         }
619
620         /**
621          * Checks wether a new package has arrived
622          *
623          * @return      $hasArrived             Wether a new package has arrived for processing
624          */
625         public function isNewPackageArrived () {
626                 // @TODO Add some content here
627         }
628 }
629
630 // [EOF]
631 ?>