]> git.mxchange.org Git - hub.git/blob - application/hub/main/package/class_NetworkPackage.php
Hub project continued:
[hub.git] / application / hub / main / package / class_NetworkPackage.php
1 <?php
2 /**
3  * A NetworkPackage class. This class implements Deliverable and Receivable
4  * because all network packages should be deliverable to other nodes and
5  * receivable from other nodes. It further provides methods for reading raw
6  * content from template engines and feeding it to the stacker for undeclared
7  * packages.
8  *
9  * The factory method requires you to provide a compressor class (which must
10  * implement the Compressor interface). If you don't want any compression (not
11  * adviceable due to increased network load), please use the NullCompressor
12  * class and encode it with BASE64 for a more error-free transfer over the
13  * Internet.
14  *
15  * For performance reasons, this class should only be instanciated once and then
16  * used as a "pipe-through" class.
17  *
18  * @author              Roland Haeder <webmaster@ship-simu.org>
19  * @version             0.0.0
20  * @copyright   Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2011 Hub Developer Team
21  * @license             GNU GPL 3.0 or any newer version
22  * @link                http://www.ship-simu.org
23  * @todo                Needs to add functionality for handling the object's type
24  *
25  * This program is free software: you can redistribute it and/or modify
26  * it under the terms of the GNU General Public License as published by
27  * the Free Software Foundation, either version 3 of the License, or
28  * (at your option) any later version.
29  *
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
33  * GNU General Public License for more details.
34  *
35  * You should have received a copy of the GNU General Public License
36  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
37  */
38 class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Receivable, Registerable {
39         /**
40          * Package mask for compressing package data:
41          * 0: Compressor extension
42          * 1: Raw package data
43          * 2: Tags, seperated by semicolons, no semicolon is required if only one tag is needed
44          * 3: Checksum
45          *                     0  1  2  3
46          */
47         const PACKAGE_MASK = '%s:%s:%s:%s';
48
49         /**
50          * Seperator for the above mask
51          */
52         const PACKAGE_MASK_SEPERATOR = ':';
53
54         /**
55          * Seperator for checksum
56          */
57         const PACKAGE_CHECKSUM_SEPERATOR = ':';
58
59         /**
60          * Array indexes for above mask, start with zero
61          */
62         const INDEX_COMPRESSOR_EXTENSION = 0;
63         const INDEX_PACKAGE_DATA         = 1;
64         const INDEX_TAGS                 = 2;
65         const INDEX_CHECKSUM             = 3;
66
67         /**
68          * Array indexes for raw package array
69          */
70         const INDEX_PACKAGE_SENDER    = 0;
71         const INDEX_PACKAGE_RECIPIENT = 1;
72         const INDEX_PACKAGE_CONTENT   = 2;
73
74         /**
75          * Named array elements for package data
76          */
77         const PACKAGE_DATA_SENDER    = 'sender';
78         const PACKAGE_DATA_RECIPIENT = 'recipient';
79         const PACKAGE_DATA_CONTENT   = 'content';
80
81         /**
82          * Tags seperator
83          */
84         const PACKAGE_TAGS_SEPERATOR = ';';
85
86         /**
87          * Raw package data seperator
88          */
89         const PACKAGE_DATA_SEPERATOR = '#';
90
91         /**
92          * Stacker name for "undeclared" packages
93          */
94         const STACKER_NAME_UNDECLARED = 'package_undeclared';
95
96         /**
97          * Stacker name for "declared" packages (which are ready to send out)
98          */
99         const STACKER_NAME_DECLARED = 'package_declared';
100
101         /**
102          * Stacker name for "out-going" packages
103          */
104         const STACKER_NAME_OUTGOING = 'package_outgoing';
105
106         /**
107          * Stacker name for "incoming" decoded raw data
108          */
109         const STACKER_NAME_DECODED_INCOMING = 'package_decoded_data';
110
111         /**
112          * Stacker name for handled decoded raw data
113          */
114         const STACKER_NAME_DECODED_HANDLED = 'package_handled_decoded';
115
116         /**
117          * Stacker name for "back-buffered" packages
118          */
119         const STACKER_NAME_BACK_BUFFER = 'package_backbuffer';
120
121         /**
122          * Network target (alias): 'upper hubs'
123          */
124         const NETWORK_TARGET_UPPER_HUBS = 'upper';
125
126         /**
127          * Network target (alias): 'self'
128          */
129         const NETWORK_TARGET_SELF = 'self';
130
131         /**
132          * TCP package size in bytes
133          */
134         const TCP_PACKAGE_SIZE = 512;
135
136         /**
137          * Protected constructor
138          *
139          * @return      void
140          */
141         protected function __construct () {
142                 // Call parent constructor
143                 parent::__construct(__CLASS__);
144         }
145
146         /**
147          * Creates an instance of this class
148          *
149          * @param       $compressorInstance             A Compressor instance for compressing the content
150          * @return      $packageInstance                An instance of a Deliverable class
151          */
152         public static final function createNetworkPackage (Compressor $compressorInstance) {
153                 // Get new instance
154                 $packageInstance = new NetworkPackage();
155
156                 // Now set the compressor instance
157                 $packageInstance->setCompressorInstance($compressorInstance);
158
159                 /*
160                  * We need to initialize a stack here for our packages even for those
161                  * which have no recipient address and stamp... ;-) This stacker will
162                  * also be used for incoming raw data to handle it.
163                  */
164                 $stackerInstance = ObjectFactory::createObjectByConfiguredName('network_package_stacker_class');
165
166                 // At last, set it in this class
167                 $packageInstance->setStackerInstance($stackerInstance);
168
169                 // Init all stacker
170                 $packageInstance->initStackers();
171
172                 // Get a visitor instance for speeding up things
173                 $visitorInstance = ObjectFactory::createObjectByConfiguredName('node_raw_data_monitor_visitor_class', array($packageInstance));
174
175                 // Set it in this package
176                 $packageInstance->setVisitorInstance($visitorInstance);
177
178                 // Return the prepared instance
179                 return $packageInstance;
180         }
181
182         /**
183          * Initialize all stackers
184          *
185          * @return      void
186          */
187         protected function initStackers () {
188                 // Initialize all
189                 foreach (
190                         array(
191                                 self::STACKER_NAME_UNDECLARED,
192                                 self::STACKER_NAME_DECLARED,
193                                 self::STACKER_NAME_OUTGOING,
194                                 self::STACKER_NAME_DECODED_INCOMING,
195                                 self::STACKER_NAME_DECODED_HANDLED,
196                                 self::STACKER_NAME_BACK_BUFFER
197                         ) as $stackerName) {
198                         // Init this stacker
199                         $this->getStackerInstance()->initStacker($stackerName);
200                 } // END - foreach
201         }
202
203         /**
204          * "Getter" for hash from given content and helper instance
205          *
206          * @param       $content        Raw package content
207          * @param       $helperInstance         An instance of a HelpableHub class
208          * @param       $nodeInstance           An instance of a NodeHelper class
209          * @return      $hash   Hash for given package content
210          * @todo        $helperInstance is unused
211          */
212         private function getHashFromContent ($content, HelpableHub $helperInstance, NodeHelper $nodeInstance) {
213                 // Create the hash
214                 // @TODO crc32() is not very strong, but it needs to be fast
215                 $hash = crc32(
216                         $content .
217                         self::PACKAGE_CHECKSUM_SEPERATOR .
218                         $nodeInstance->getSessionId() .
219                         self::PACKAGE_CHECKSUM_SEPERATOR .
220                         $this->getCompressorInstance()->getCompressorExtension()
221                 );
222
223                 // And return it
224                 return $hash;
225         }
226
227         ///////////////////////////////////////////////////////////////////////////
228         //                   Delivering packages / raw data
229         ///////////////////////////////////////////////////////////////////////////
230
231         /**
232          * Delivers the given raw package data.
233          *
234          * @param       $packageData    Raw package data in an array
235          * @return      void
236          */
237         private function declareRawPackageData (array $packageData) {
238                 /*
239                  * We need to disover every recipient, just in case we have a
240                  * multi-recipient entry like 'upper' is. 'all' may be a not so good
241                  * target because it causes an overload on the network and may be
242                  * abused for attacking the network with large packages.
243                  */
244                 $discoveryInstance = PackageDiscoveryFactory::createPackageDiscoveryInstance();
245
246                 // Discover all recipients, this may throw an exception
247                 $discoveryInstance->discoverRecipients($packageData);
248
249                 // Now get an iterator
250                 $iteratorInstance = $discoveryInstance->getIterator();
251
252                 // ... and begin iteration
253                 while ($iteratorInstance->valid()) {
254                         // Get current entry
255                         $currentRecipient = $iteratorInstance->current();
256
257                         // Debug message
258                         $this->debugOutput('PACKAGE: Package declared for recipient ' . $currentRecipient);
259
260                         // Set the recipient
261                         $packageData[self::PACKAGE_DATA_RECIPIENT] = $currentRecipient;
262
263                         // And enqueue it to the writer class
264                         $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECLARED, $packageData);
265
266                         // Skip to next entry
267                         $iteratorInstance->next();
268                 } // END - while
269
270                 // Clean-up the list
271                 $discoveryInstance->clearRecipients();
272         }
273
274         /**
275          * Delivers raw package data. In short, this will discover the raw socket
276          * resource through a discovery class (which will analyse the receipient of
277          * the package), register the socket with the connection (handler/helper?)
278          * instance and finally push the raw data on our outgoing queue.
279          *
280          * @param       $packageData    Raw package data in an array
281          * @return      void
282          */
283         private function deliverRawPackageData (array $packageData) {
284                 /*
285                  * This package may become big, depending on the shared object size or
286                  * delivered message size which shouldn't be so long (to save
287                  * bandwidth). Because of the nature of the used protocol (TCP) we need
288                  * to split it up into smaller pieces to fit it into a TCP frame.
289                  *
290                  * So first we need (again) a discovery class but now a protocol
291                  * discovery to choose the right socket resource. The discovery class
292                  * should take a look at the raw package data itself and then decide
293                  * which (configurable!) protocol should be used for that type of
294                  * package.
295                  */
296                 $discoveryInstance = SocketDiscoveryFactory::createSocketDiscoveryInstance();
297
298                 // Now discover the right protocol
299                 $socketResource = $discoveryInstance->discoverSocket($packageData);
300
301                 // Get the listener from registry
302                 $helperInstance = Registry::getRegistry()->getInstance('connection');
303
304                 // We have to put this socket in our registry, so get an instance
305                 $registryInstance = SocketRegistry::createSocketRegistry();
306
307                 // Is it not there?
308                 if ((is_resource($socketResource)) && (!$registryInstance->isSocketRegistered($helperInstance, $socketResource))) {
309                         // Then register it
310                         $registryInstance->registerSocket($helperInstance, $socketResource, $packageData);
311                 } // END - if
312
313                 // Make sure the connection is up
314                 $helperInstance->getStateInstance()->validatePeerStateConnected();
315
316                 // We enqueue it again, but now in the out-going queue
317                 $this->getStackerInstance()->pushNamed(self::STACKER_NAME_OUTGOING, $packageData);
318         }
319
320         /**
321          * Sends waiting packages
322          *
323          * @param       $packageData    Raw package data
324          * @return      void
325          */
326         private function sendOutgoingRawPackageData (array $packageData) {
327                 // Init sent bytes
328                 $sentBytes = 0;
329
330                 // Get the right connection instance
331                 $helperInstance = SocketRegistry::createSocketRegistry()->getHandlerInstanceFromPackageData($packageData);
332
333                 // Is this connection still alive?
334                 if ($helperInstance->isShuttedDown()) {
335                         // This connection is shutting down
336                         // @TODO We may want to do somthing more here?
337                         return;
338                 } // END - if
339
340                 // Sent out package data
341                 $sentBytes = $helperInstance->sendRawPackageData($packageData);
342
343                 // Remember unsent raw bytes in back-buffer, if any
344                 $this->storeUnsentBytesInBackBuffer($packageData, $sentBytes);
345         }
346
347         /**
348          * "Enqueues" raw content into this delivery class by reading the raw content
349          * from given template instance and pushing it on the 'undeclared' stack.
350          *
351          * @param       $helperInstance         An instance of a HelpableHub class
352          * @param       $nodeInstance           An instance of a NodeHelper class
353          * @return      void
354          */
355         public function enqueueRawDataFromTemplate (HelpableHub $helperInstance, NodeHelper $nodeInstance) {
356                 // Get the raw content ...
357                 $content = $helperInstance->getTemplateInstance()->getRawTemplateData();
358
359                 // ... and compress it
360                 $content = $this->getCompressorInstance()->compressStream($content);
361
362                 // Add magic in front of it and hash behind it, including BASE64 encoding
363                 $content = sprintf(self::PACKAGE_MASK,
364                         // 1.) Compressor's extension
365                         $this->getCompressorInstance()->getCompressorExtension(),
366                         // 2.) Raw package content, encoded with BASE64
367                         base64_encode($content),
368                         // 3.) Tags
369                         implode(self::PACKAGE_TAGS_SEPERATOR, $helperInstance->getPackageTags()),
370                         // 4.) Checksum
371                         $this->getHashFromContent($content, $helperInstance, $nodeInstance)
372                 );
373
374                 // Now prepare the temporary array and push it on the 'undeclared' stack
375                 $this->getStackerInstance()->pushNamed(self::STACKER_NAME_UNDECLARED, array(
376                         self::PACKAGE_DATA_SENDER    => $nodeInstance->getSessionId(),
377                         self::PACKAGE_DATA_RECIPIENT => $helperInstance->getRecipientType(),
378                         self::PACKAGE_DATA_CONTENT   => $content,
379                 ));
380         }
381
382         /**
383          * Checks wether a package has been enqueued for delivery.
384          *
385          * @return      $isEnqueued             Wether a package is enqueued
386          */
387         public function isPackageEnqueued () {
388                 // Check wether the stacker is not empty
389                 $isEnqueued = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_UNDECLARED)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_UNDECLARED)));
390
391                 // Return the result
392                 return $isEnqueued;
393         }
394
395         /**
396          * Checks wether a package has been declared
397          *
398          * @return      $isDeclared             Wether a package is declared
399          */
400         public function isPackageDeclared () {
401                 // Check wether the stacker is not empty
402                 $isDeclared = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_DECLARED)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECLARED)));
403
404                 // Return the result
405                 return $isDeclared;
406         }
407
408         /**
409          * Checks wether a package should be sent out
410          *
411          * @return      $isWaitingDelivery      Wether a package is waiting for delivery
412          */
413         public function isPackageWaitingForDelivery () {
414                 // Check wether the stacker is not empty
415                 $isWaitingDelivery = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_OUTGOING)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_OUTGOING)));
416
417                 // Return the result
418                 return $isWaitingDelivery;
419         }
420
421         /**
422          * Delivers an enqueued package to the stated destination. If a non-session
423          * id is provided, recipient resolver is being asked (and instanced once).
424          * This allows that a single package is being delivered to multiple targets
425          * without enqueueing it for every target. If no target is provided or it
426          * can't be determined a NoTargetException is being thrown.
427          *
428          * @return      void
429          * @throws      NoTargetException       If no target can't be determined
430          */
431         public function declareEnqueuedPackage () {
432                 // Make sure this method isn't working if there is no package enqueued
433                 if (!$this->isPackageEnqueued()) {
434                         // This is not fatal but should be avoided
435                         // @TODO Add some logging here
436                         return;
437                 } // END - if
438
439                 // Now we know for sure there are packages to deliver, we can start
440                 // with the first one.
441                 $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_UNDECLARED);
442
443                 // Declare the raw package data for delivery
444                 $this->declareRawPackageData($packageData);
445
446                 // And remove it finally
447                 $this->getStackerInstance()->popNamed(self::STACKER_NAME_UNDECLARED);
448         }
449
450         /**
451          * Delivers the next declared package. Only one package per time will be sent
452          * because this may take time and slows down the whole delivery
453          * infrastructure.
454          *
455          * @return      void
456          */
457         public function deliverDeclaredPackage () {
458                 // Sanity check if we have packages declared
459                 if (!$this->isPackageDeclared()) {
460                         // This is not fatal but should be avoided
461                         // @TODO Add some logging here
462                         return;
463                 } // END - if
464
465                 // Get the package again
466                 $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_DECLARED);
467
468                 try {
469                         // And try to send it
470                         $this->deliverRawPackageData($packageData);
471
472                         // And remove it finally
473                         $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECLARED);
474                 } catch (InvalidStateException $e) {
475                         // The state is not excepected (shall be 'connected')
476                         $this->debugOutput('PACKAGE: Unexpected state detected. message=' . $e->getMessage());
477                 }
478         }
479
480         /**
481          * Sends waiting packages out for delivery
482          *
483          * @return      void
484          */
485         public function sendWaitingPackage () {
486                 // Send any waiting bytes in the back-buffer before sending a new package
487                 $this->sendBackBufferBytes();
488
489                 // Sanity check if we have packages waiting for delivery
490                 if (!$this->isPackageWaitingForDelivery()) {
491                         // This is not fatal but should be avoided
492                         $this->debugOutput('PACKAGE: No package is waiting for delivery, but ' . __METHOD__ . ' was called.');
493                         return;
494                 } // END - if
495
496                 // Get the package again
497                 $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_OUTGOING);
498
499                 try {
500                         // Now try to send it
501                         $this->sendOutgoingRawPackageData($packageData);
502
503                         // And remove it finally
504                         $this->getStackerInstance()->popNamed(self::STACKER_NAME_OUTGOING);
505                 } catch (InvalidSocketException $e) {
506                         // Output exception message
507                         $this->debugOutput('PACKAGE: Package was not delivered: ' . $e->getMessage());
508                 }
509         }
510
511         ///////////////////////////////////////////////////////////////////////////
512         //                   Receiving packages / raw data
513         ///////////////////////////////////////////////////////////////////////////
514
515         /**
516          * Checks wether decoded raw data is pending
517          *
518          * @return      $isPending      Wether decoded raw data is pending
519          */
520         private function isDecodedDataPending () {
521                 // Just return wether the stack is not empty
522                 $isPending = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_INCOMING));
523
524                 // Return the status
525                 return $isPending;
526         }
527
528         /**
529          * Checks wether new raw package data has arrived at a socket
530          *
531          * @param       $poolInstance   An instance of a PoolableListener class
532          * @return      $hasArrived             Wether new raw package data has arrived for processing
533          */
534         public function isNewRawDataPending (PoolableListener $poolInstance) {
535                 // Visit the pool. This monitors the pool for incoming raw data.
536                 $poolInstance->accept($this->getVisitorInstance());
537
538                 // Check for new data arrival
539                 $hasArrived = $this->isDecodedDataPending();
540
541                 // Return the status
542                 return $hasArrived;
543         }
544
545         /**
546          * Handles the incoming decoded raw data. This method does not "convert" the
547          * decoded data back into a package array, it just "handles" it and pushs it
548          * on the next stack.
549          *
550          * @return      void
551          */
552         public function handleIncomingDecodedData () {
553                 /*
554                  * This method should only be called if decoded raw data is pending,
555                  * so check it again.
556                  */
557                 if (!$this->isDecodedDataPending()) {
558                         // This is not fatal but should be avoided
559                         // @TODO Add some logging here
560                         return;
561                 } // END - if
562
563                 // Very noisy debug message:
564                 /* NOISY-DEBUG: */ $this->debugOutput('PACKAGE: Stacker size is ' . $this->getStackerInstance()->getStackCount(self::STACKER_NAME_DECODED_INCOMING) . ' entries.');
565
566                 // "Pop" the next entry (the same array again) from the stack
567                 $decodedData = $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECODED_INCOMING);
568
569                 // Make sure both array elements are there
570                 assert((is_array($decodedData)) && (isset($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA])) && (isset($decodedData[BaseRawDataHandler::PACKAGE_ERROR_CODE])));
571
572                 /*
573                  * Also make sure the error code is SOCKET_ERROR_UNHANDLED because we
574                  * only want to handle unhandled packages here.
575                  */
576                 assert($decodedData[BaseRawDataHandler::PACKAGE_ERROR_CODE] == BaseRawDataHandler::SOCKET_ERROR_UNHANDLED);
577
578                 // Remove the last chunk seperator (because it is being added and we don't need it)
579                 if (substr($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA], -1, 1) == PackageFragmenter::CHUNK_SEPERATOR) {
580                         // It is there and should be removed
581                         $decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA] = substr($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA], 0, -1);
582                 } // END - if
583
584                 // This package is "handled" and can be pushed on the next stack
585                 $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_HANDLED, $decodedData);
586         }
587
588         /**
589          * Adds raw decoded data from the given handler instance to this receiver
590          *
591          * @param       $handlerInstance        An instance of a Networkable class
592          * @return      void
593          */
594         public function addDecodedDataToIncomingStack (Networkable $handlerInstance) {
595                 /*
596                  * Get the decoded data from the handler, this is an array with
597                  * 'decoded_data' and 'error_code' as elements.
598                  */
599                 $decodedData = $handlerInstance->getNextDecodedData();
600
601                 // Very noisy debug message:
602                 //* NOISY-DEBUG: */ $this->debugOutput('PACKAGE: decodedData[' . gettype($decodedData) . ']=' . print_r($decodedData, true));
603
604                 // And push it on our stack
605                 $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_INCOMING, $decodedData);
606         }
607
608         /**
609          * Checks wether incoming decoded data is handled.
610          *
611          * @return      $isHandled      Wether incoming decoded data is handled
612          */
613         public function isIncomingDecodedDataHandled () {
614                 // Determine if the stack is not empty
615                 $isHandled = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_HANDLED));
616
617                 // Return it
618                 return $isHandled;
619         }
620
621         /**
622          * Assembles incoming decoded data so it will become an abstract network
623          * package again.
624          *
625          * @return      void
626          */
627         public function assembleDecodedDataToPackage () {
628                 $this->partialStub('Please implement this method.');
629         }
630
631         /**
632          * Checks wether a new package has arrived
633          *
634          * @return      $hasArrived             Wether a new package has arrived for processing
635          */
636         public function isNewPackageArrived () {
637                 // @TODO Add some content here
638         }
639 }
640
641 // [EOF]
642 ?>