From e2cd1af1eb84e8634964780ffc06af684a372b2f Mon Sep 17 00:00:00 2001 From: curt Date: Mon, 17 May 1999 23:32:23 +0000 Subject: [PATCH] More development ... --- Tools/Construct/Parallel/client.cxx | 40 +++++++++++++++++------------ Tools/Construct/Parallel/server.cxx | 28 +++++++++++++++----- 2 files changed, 46 insertions(+), 22 deletions(-) diff --git a/Tools/Construct/Parallel/client.cxx b/Tools/Construct/Parallel/client.cxx index da429673d..97e320d14 100644 --- a/Tools/Construct/Parallel/client.cxx +++ b/Tools/Construct/Parallel/client.cxx @@ -53,7 +53,7 @@ int make_socket (char *host, unsigned short int port) { // connect to the server and get the next task -long int get_next_task( const string& host, int port ) { +long int get_next_task( const string& host, int port, long int last_tile ) { long int tile; int sock, len; fd_set ready; @@ -65,29 +65,29 @@ long int get_next_task( const string& host, int port ) { } // build a command string from the argv[]'s - strcpy(message, "hello world!\n"); + sprintf(message, "%ld", last_tile); // send command and arguments to remote server - // if ( write(sock, message, sizeof(message)) < 0 ) { - // perror("Cannot write to stream socket"); - // } + if ( write(sock, message, sizeof(message)) < 0 ) { + perror("Cannot write to stream socket"); + } // loop until remote program finishes - cout << "looping ..." << endl; + cout << "querying server for next task ..." << endl; FD_ZERO(&ready); FD_SET(sock, &ready); // block until input from sock select(32, &ready, 0, 0, NULL); - cout << "unblocking ... " << endl; + cout << " received reply" << endl; if ( FD_ISSET(sock, &ready) ) { /* input coming from socket */ if ( (len = read(sock, message, 1024)) > 0 ) { message[len] = '\0'; tile = atoi(message); - cout << "tile to construct = " << tile << endl; + cout << " tile to construct = " << tile << endl; close(sock); return tile; } else { @@ -101,13 +101,16 @@ long int get_next_task( const string& host, int port ) { } -// build the specified tile -void run_task( long int tile ) { +// build the specified tile, return true if contruction completed +// successfully +bool construct_tile( long int tile ) { + return true; } main(int argc, char *argv[]) { - long int tile; + long int tile, last_tile; + bool result; // Check usage if ( argc < 3 ) { @@ -118,10 +121,15 @@ main(int argc, char *argv[]) { string host = argv[1]; int port = atoi( argv[2] ); - while ( - (tile = get_next_task( host, port )) >= 0 - ) - { - run_task( tile ); + last_tile = 0; + + while ( (tile = get_next_task( host, port, last_tile )) >= 0 ) { + result = construct_tile( tile ); + + if ( result ) { + last_tile = tile; + } else { + last_tile = -tile; + } } } diff --git a/Tools/Construct/Parallel/server.cxx b/Tools/Construct/Parallel/server.cxx index 05f74ee51..f8dac55c3 100644 --- a/Tools/Construct/Parallel/server.cxx +++ b/Tools/Construct/Parallel/server.cxx @@ -59,7 +59,6 @@ int make_socket (unsigned short int* port) { } -#if 0 // better move this to client // return true if file exists static bool file_exists( const string& file ) { struct stat buf; @@ -87,7 +86,7 @@ static bool has_data( const string& path, const FGBucket& b ) { return false; } -#endif + // initialize the tile counting system void init_tile_count() { @@ -131,7 +130,8 @@ long int get_next_tile( const string& work_base, const string& output_base ) } // reset lat - lat = -89.0 + (shift_up*dy) - (dy*0.5); + // lat = -89.0 + (shift_up*dy) - (dy*0.5); + lat = 0.0 + (shift_up*dy) + (dy*0.5); // reset lon FGBucket tmp( 0.0, lat ); @@ -216,7 +216,11 @@ int main( int argc, char **argv ) { // printf("%d %d Incomming message --> ", getpid(), pid); // get the next tile to work on - next_tile = get_next_tile(work_base, output_base); + next_tile = get_next_tile( work_base, output_base ); + while ( ! has_data( work_base, FGBucket(next_tile) ) ) { + next_tile = get_next_tile( work_base, output_base ); + } + // cout << "next tile = " << next_tile << endl;; msgsock = accept(sock, 0, 0); @@ -236,7 +240,8 @@ int main( int argc, char **argv ) { // clean up all of our zombie children int status; while ( (pid = waitpid( WAIT_ANY, &status, WNOHANG )) > 0 ) { - // cout << "waitpid() returned " << pid << endl; + cout << "waitpid(): pid = " << pid + << " status = " << status << endl; } } else { // This is the child @@ -244,6 +249,17 @@ int main( int argc, char **argv ) { // cout << "new process started to handle new connection for " // << next_tile << endl; + // Read client's command + char buf[MAXBUF]; + if ( (length = read(msgsock, buf, MAXBUF)) < 0) { + perror("Cannot read command"); + exit(-1); + } + + buf[length] = '\0'; + long int returned_tile = atoi(buf); + cout << "client replied with " << returned_tile << endl; + // reply to the client char message[MAXBUF]; sprintf(message, "%ld", next_tile); @@ -254,7 +270,7 @@ int main( int argc, char **argv ) { close(msgsock); // cout << "process for " << next_tile << " ended" << endl; - exit(0); + exit(returned_tile); } } } -- 2.39.5