Strumenti Utente

Strumenti Sito


magistraleinformaticanetworking:spm:farmposix

Questa è una vecchia versione del documento!


Task farm (POSIX TCP/IP)

Version 1

This version uses two servers (emitter and collector). Emitter accepts requests from workers and delivers tasks to be executed. Collector accepts results from workers and print them out on the screen. Workers open a separate connection with both emitter and collector for each task. The two connections are closed after processing the task and delivering the result. Tasks and results are (positive) integers. Emitter and worker termination is handled correctly. Collector does not terminate (suggestion: each worker should propagate EOS to collector. Collector should count EOS and terminate after receiving EOS from all the workers).

Emitter code

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>
#include <arpa/inet.h>

//  on demand scheduling of tasks to workers
//  receive request from worker i -> send task to compute to worker i
//  when no more tasks are available, send an EOS termination 
// 
//  usage is: 
//     a.out portno 
//           number of the port used to get worker task requests

#define MAXHOSTNAME 80

int main(int argc, char * argv[]) {
        int s,si, retcode, i;
        unsigned int salen; 
        struct sockaddr_in sa,sai;
        char hostname[MAXHOSTNAME];

        int task = 0;           // tasks are positive integeres, in this case
        int tasklimit = 100;    // we'll send tasklimit tasks before stopping
        int eos = -1;           // special task to denote End Of Stream

        // code needed to set up the communication infrastructure
        printf("Declaring socket\n");
        si = socket(AF_INET,SOCK_STREAM,0);     // socket for inputs
        if(si == -1) {perror("opening socket for input"); return -1;}
        sai.sin_family = AF_INET;
        sai.sin_port = htons(atoi(argv[1]));
        gethostname(hostname,MAXHOSTNAME);
        memcpy(&sai.sin_addr, (gethostbyname(hostname)->h_addr), 
                sizeof(sa.sin_addr));
        printf("Binding to %s\n",inet_ntoa(sai.sin_addr));
        retcode = bind(si,(struct sockaddr *) & sai, sizeof(sai));
        if(retcode == -1) {
                perror("while calling bind"); return -1; }
        printf("Listening socket\n");
        retcode = listen(si,1); 
        if(retcode == -1) { 
                perror("while calling listen"); return -1; }

        while(1==1) {
          salen = sizeof(sa);
          printf("Accepting connections .... \n");
          s = accept(si,(struct sockaddr *)&sa,&salen);  // accept a connection
          if(s == 1) {
                  perror("while calling an accept"); return -1; }
          retcode = read(s,&i,sizeof(int));  // read request from worker 
          if(retcode == -1) {
                  perror("while reading task from worker"); return -1; }
          printf("%d ",i); fflush(stdout);
          if(task < tasklimit) {  // send a task to the requesting worker
                  write(s,&task,sizeof(task));
                  printf("sent task %d to worker %d\n",task,i);
                  task++;       // next task to be sent 
          } else { // if no more tasks, then send an EOS
                  write(s,&eos,sizeof(task));
                  printf("Send EOS to worker %d\n",i);
          }
          close(s);
        }
        printf("Closing operations\n");
        return 0; 
}

Worker code

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>
#include <unistd.h>

/* 
 *   function to be computed on the single task
 *   this is actually the only "business logic" code
 */

int f(int task) {
        sleep(1);  // just to simulate some work ... 
        return (task*task);
}

/* 
 *   receives integer tasks and delivers integer results, up to the reception
 *   of an EOS (a -1) 
 *   this version opens/closes the sockets at each task, which is not 
 *   definitely efficient ... 
 * 
 *   usage: 
 *      a.out emitter_address emitter_port collector_address collector_port
 */

int main(int argc, char * argv[]) {
        int s, retcode;
        struct sockaddr_in sa;
        int pid = 0; 
        
        
        pid = getpid();    // get my POSIX process id 
        printf("This is worker %d\n",pid);
        while(1==1) {
                int task, result; 
                
                s = socket(AF_INET,SOCK_STREAM,0);   // socket to access emitter
                if(s == -1) { 
                        perror("while creating the socket to emitter"); 
                        return (-1); }
                sa.sin_family = AF_INET;
                sa.sin_port = htons(atoi(argv[2]));
                memcpy(&sa.sin_addr, (gethostbyname(argv[1])->h_addr), 
                        sizeof(sa.sin_addr));
                retcode = connect(s,(struct sockaddr *)&sa,sizeof(sa));
                if(retcode == -1) {
                        perror("while connecting to emitter"); return(-1);}
                
                write(s,&pid,sizeof(pid)); // sends a request to the emitter 
                retcode = read(s,&task,sizeof(task)); // get a task
                if(retcode != sizeof(task)) {
                        perror("while receiving a task"); return -1; }
                
                // check for EOS
                if(task < 0)  {
                        printf("Received EOS\n");
                        break;  // if EOS terminate loop iteratons
                } else {        
                        printf("Received task %d\n",task);
                        // otherwise process the incoming task
                        result = f(task); 
                        printf("Computed %d -> %d\n",task,result);
                
                        // send result to the collector
                        s = socket(AF_INET,SOCK_STREAM,0); // create socket to collector
                        sa.sin_family = AF_INET;
                        sa.sin_port = htons(atoi(argv[4]));
                        memcpy(&sa.sin_addr, (gethostbyname(argv[3])->h_addr), 
                                sizeof(sa.sin_addr));
                        retcode = connect(s,(struct sockaddr *)&sa,sizeof(sa));
                        if(retcode == -1) {
                                perror("connecting to the collector"); 
                                return(-1);}
                        
                        // send the result and close connection
                        write(s,&result,sizeof(result));
                        close(s);
                
                        // then cycle again
                }
        }
        close(s);
        return 0; 
}

Collector code

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>

// receives results from the workers and displays them on the console 
// usage: 
//    a.out portno
//          number of the port for the result socket 

#define MAXHOSTNAME 80  

int main(int argc, char * argv[]) {
        int s,si, retcode, i;
        unsigned int salen; 
        struct sockaddr_in sa,sai;
        char hostname[MAXHOSTNAME];

        si = socket(AF_INET,SOCK_STREAM,0);     // socket to receive the results
        if(si == -1) {
                perror("while opening socket"); return -1;}
        sai.sin_family = AF_INET;
        sai.sin_port = htons(atoi(argv[1]));
        gethostname(hostname,MAXHOSTNAME);
        memcpy(&sai.sin_addr, (gethostbyname(hostname)->h_addr), 
                sizeof(sa.sin_addr));
        retcode = bind(si,(struct sockaddr *) & sai, sizeof(sai));
        if(retcode == -1) { 
                perror("while binding socket to addr"); return -1; }
        retcode = listen(si,1); 
        if(retcode == -1) { 
                perror("while calling listen"); return -1; }

        while(1==1) {
          salen = sizeof(sa);
          printf("Accepting connections\n");
          s = accept(si,(struct sockaddr *)&sa,&salen); // accept a connection
          if(s == 1) {
                  perror("while accepting a connection"); return -1;
          }
          retcode = read(s,&i,sizeof(int));// read a res from one of the Worker
          if(retcode == -1) {
                perror("while reading a result"); return -1; }
          if(i==(-1)) { 
                printf("EOS -> terminating\n");
                break;
          }
          printf("Read result: %d ",i); fflush(stdout); // and print it on console
          close(s);
        }
        close(si);
        return 0; 
}
magistraleinformaticanetworking/spm/farmposix.1300815059.txt.gz · Ultima modifica: 22/03/2011 alle 17:30 (14 anni fa) da Marco Danelutto

Donate Powered by PHP Valid HTML5 Valid CSS Driven by DokuWiki