Strumenti Utente

Strumenti Sito


magistraleinformaticanetworking:spm:farmposix

Task farm (POSIX TCP/IP)

(The code presented here does not claim to be efficient, nor completely correct. It has been designed to illustrate several of the problems that arise when implementing even fairly simple parallel applications.)

Version 1

This version uses two servers (emitter and collector). The emitter accepts requests from workers and delivers the tasks to be executed. The collector accepts results from workers and prints them on the screen. Workers open a separate connection to both the emitter and the collector for each task; the two connections are closed after processing the task and delivering the result. Tasks and results are (non-negative) integers. EOS delivery to the workers and worker termination are handled correctly (the emitter process itself, however, keeps serving requests and never exits). The collector does not terminate (suggestion: each worker should propagate EOS to the collector; the collector should count EOS messages and terminate after receiving one from every worker).

Emitter code

emitter.c
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>
#include <arpa/inet.h>
 
//  on demand scheduling of tasks to workers
//  receive request from worker i -> send task to compute to worker i
//  when no more tasks are available, send an EOS termination 
// 
//  usage is: 
//     a.out portno 
//           number of the port used to get worker task requests
 
#define MAXHOSTNAME 80

/*
 * Emitter (version 1): serves tasks on demand.
 *
 * Each worker opens a fresh connection and sends one request (an int,
 * the worker's pid). It receives either the next task (0 .. tasklimit-1)
 * or the EOS marker (-1), and then the connection is closed.
 *
 * argv[1]: TCP port on which worker requests are accepted.
 * Returns -1 on any setup, accept or read error. Otherwise the serving
 * loop never exits.
 */
int main(int argc, char * argv[]) {
        int s, si, retcode, i;
        socklen_t salen;
        struct sockaddr_in sa, sai;
        char hostname[MAXHOSTNAME];
        struct hostent * he;

        int task = 0;           // tasks are non-negative integers, in this case
        int tasklimit = 100;    // we'll send tasklimit tasks before stopping
        int eos = -1;           // special task to denote End Of Stream

        if(argc < 2) {
                fprintf(stderr, "usage: %s portno\n", argv[0]);
                return -1;
        }

        // code needed to set up the communication infrastructure
        printf("Declaring socket\n");
        si = socket(AF_INET,SOCK_STREAM,0);     // socket for inputs
        if(si == -1) {perror("opening socket for input"); return -1;}
        memset(&sai, 0, sizeof(sai));           // no garbage left in sin_zero
        sai.sin_family = AF_INET;
        sai.sin_port = htons(atoi(argv[1]));
        if(gethostname(hostname,MAXHOSTNAME) == -1) {
                perror("while calling gethostname"); return -1; }
        hostname[MAXHOSTNAME-1] = '\0';         // a truncated name may lack the NUL
        he = gethostbyname(hostname);
        if(he == NULL) {
                fprintf(stderr, "cannot resolve local host name %s\n", hostname);
                return -1; }
        memcpy(&sai.sin_addr, he->h_addr, sizeof(sai.sin_addr));
        printf("Binding to %s\n",inet_ntoa(sai.sin_addr));
        retcode = bind(si,(struct sockaddr *) & sai, sizeof(sai));
        if(retcode == -1) {
                perror("while calling bind"); return -1; }
        printf("Listening socket\n");
        retcode = listen(si,1);
        if(retcode == -1) {
                perror("while calling listen"); return -1; }

        while(1==1) {
                salen = sizeof(sa);
                printf("Accepting connections .... \n");
                s = accept(si,(struct sockaddr *)&sa,&salen);  // accept a connection
                if(s == -1) {   // accept() reports failure with -1, not 1
                        perror("while calling an accept"); return -1; }
                retcode = read(s,&i,sizeof(int));  // read request from worker
                if(retcode == -1) {
                        perror("while reading task from worker"); return -1; }
                if(retcode != sizeof(int)) {
                        // peer closed without a full request: i is not valid,
                        // so do not consume a task for it
                        fprintf(stderr, "incomplete request from worker, ignored\n");
                        close(s);
                        continue;
                }
                printf("%d ",i); fflush(stdout);
                if(task < tasklimit) {  // send a task to the requesting worker
                        if(write(s,&task,sizeof(task)) == sizeof(task)) {
                                printf("sent task %d to worker %d\n",task,i);
                                task++;       // advance only once the task is delivered
                        } else {
                                perror("while sending a task");
                        }
                } else { // if no more tasks, then send an EOS
                        if(write(s,&eos,sizeof(eos)) != sizeof(eos))
                                perror("while sending EOS");
                        printf("Send EOS to worker %d\n",i);
                }
                close(s);
        }
        printf("Closing operations\n");
        return 0;
}

Worker code

worker.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>
#include <unistd.h>
 
/* 
 *   function to be computed on the single task
 *   this is actually the only "business logic" code
 */
 
int f(int task) {
        /* business logic: the square of the task; the pause stands in for real work */
        const int squared = task * task;
        sleep(1);
        return squared;
}
 
/* 
 *   receives integer tasks and delivers integer results, up to the reception
 *   of an EOS (a -1) 
 *   this version opens/closes the sockets at each task, which is not 
 *   definitely efficient ... 
 * 
 *   usage: 
 *      a.out emitter_address emitter_port collector_address collector_port
 */
 
/*
 * Worker (version 1): repeatedly asks the emitter for a task, computes
 * f(task) and sends the result to the collector. It uses a fresh connection
 * to each server for every task and stops on EOS (a negative task).
 *
 * argv: emitter_address emitter_port collector_address collector_port
 * Returns 0 after EOS, -1 on any communication error.
 */
int main(int argc, char * argv[]) {
        int s, cs, retcode;     // emitter socket, collector socket, error ret
        struct sockaddr_in sa;
        struct hostent * he;
        int pid = 0;

        if(argc < 5) {
                fprintf(stderr, "usage: %s emitter_address emitter_port collector_address collector_port\n", argv[0]);
                return -1;
        }

        pid = getpid();    // get my POSIX process id
        printf("This is worker %d\n",pid);
        while(1==1) {
                int task, result;

                s = socket(AF_INET,SOCK_STREAM,0);   // socket to access emitter
                if(s == -1) {
                        perror("while creating the socket to emitter");
                        return (-1); }
                memset(&sa, 0, sizeof(sa));
                sa.sin_family = AF_INET;
                sa.sin_port = htons(atoi(argv[2]));
                he = gethostbyname(argv[1]);
                if(he == NULL) {
                        fprintf(stderr, "cannot resolve emitter host %s\n", argv[1]);
                        return -1; }
                memcpy(&sa.sin_addr, he->h_addr, sizeof(sa.sin_addr));
                retcode = connect(s,(struct sockaddr *)&sa,sizeof(sa));
                if(retcode == -1) {
                        perror("while connecting to emitter"); return(-1);}

                // sends a request to the emitter
                if(write(s,&pid,sizeof(pid)) != sizeof(pid)) {
                        perror("while sending a request"); return -1; }
                retcode = read(s,&task,sizeof(task)); // get a task
                if(retcode != sizeof(task)) {
                        perror("while receiving a task"); return -1; }
                // one emitter connection per task: release it now, otherwise
                // the descriptor leaks once s is reused below
                close(s);

                // check for EOS
                if(task < 0)  {
                        printf("Received EOS\n");
                        break;  // if EOS terminate loop iterations
                }

                printf("Received task %d\n",task);
                // otherwise process the incoming task
                result = f(task);
                printf("Computed %d -> %d\n",task,result);

                // send result to the collector
                cs = socket(AF_INET,SOCK_STREAM,0); // create socket to collector
                if(cs == -1) {
                        perror("while creating the socket to collector");
                        return -1; }
                memset(&sa, 0, sizeof(sa));
                sa.sin_family = AF_INET;
                sa.sin_port = htons(atoi(argv[4]));
                he = gethostbyname(argv[3]);
                if(he == NULL) {
                        fprintf(stderr, "cannot resolve collector host %s\n", argv[3]);
                        return -1; }
                memcpy(&sa.sin_addr, he->h_addr, sizeof(sa.sin_addr));
                retcode = connect(cs,(struct sockaddr *)&sa,sizeof(sa));
                if(retcode == -1) {
                        perror("connecting to the collector");
                        return(-1);}

                // send the result and close connection
                if(write(cs,&result,sizeof(result)) != sizeof(result))
                        perror("while sending the result");
                close(cs);

                // then cycle again
        }
        return 0;
}

Collector code

collector.c
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>
 
// receives results from the workers and displays them on the console 
// usage: 
//    a.out portno
//          number of the port for the result socket 
 
#define MAXHOSTNAME 80

/*
 * Collector (version 1): accepts one connection per result, reads an int
 * and prints it. It stops when it reads the EOS marker (-1).
 *
 * argv[1]: TCP port for the result socket.
 * Returns 0 after EOS, -1 on setup, accept or read errors.
 */
int main(int argc, char * argv[]) {
        int s,si, retcode, i;
        socklen_t salen;
        struct sockaddr_in sa,sai;
        char hostname[MAXHOSTNAME];
        struct hostent * he;

        if(argc < 2) {
                fprintf(stderr, "usage: %s portno\n", argv[0]);
                return -1;
        }

        si = socket(AF_INET,SOCK_STREAM,0);     // socket to receive the results
        if(si == -1) {
                perror("while opening socket"); return -1;}
        memset(&sai, 0, sizeof(sai));           // no garbage left in sin_zero
        sai.sin_family = AF_INET;
        sai.sin_port = htons(atoi(argv[1]));
        if(gethostname(hostname,MAXHOSTNAME) == -1) {
                perror("while calling gethostname"); return -1; }
        hostname[MAXHOSTNAME-1] = '\0';         // a truncated name may lack the NUL
        he = gethostbyname(hostname);
        if(he == NULL) {
                fprintf(stderr, "cannot resolve local host name %s\n", hostname);
                return -1; }
        memcpy(&sai.sin_addr, he->h_addr, sizeof(sai.sin_addr));
        retcode = bind(si,(struct sockaddr *) & sai, sizeof(sai));
        if(retcode == -1) {
                perror("while binding socket to addr"); return -1; }
        retcode = listen(si,1);
        if(retcode == -1) {
                perror("while calling listen"); return -1; }

        while(1==1) {
                salen = sizeof(sa);
                printf("Accepting connections\n");
                s = accept(si,(struct sockaddr *)&sa,&salen); // accept a connection
                if(s == -1) {   // accept() reports failure with -1, not 1
                        perror("while accepting a connection"); return -1;
                }
                retcode = read(s,&i,sizeof(int));// read a res from one of the Worker
                if(retcode == -1) {
                        perror("while reading a result"); return -1; }
                if(retcode != sizeof(int)) {     // peer closed early: nothing valid to print
                        close(s);
                        continue;
                }
                if(i==(-1)) {
                        printf("EOS -> terminating\n");
                        close(s);
                        break;
                }
                printf("Read result: %d ",i); fflush(stdout); // and print it on console
                close(s);
        }
        close(si);
        return 0;
}

Makefile

makefile
# Builds the three farm programs (emitter, collector, worker) and
# distributes the source directory to the lab machines.
CC = gcc
CFLAGS =
TARGETS = emitter collector worker
SOURCES = emitter.c collector.c worker.c

# these targets name actions, not files
.PHONY: all clean dist

all:    $(TARGETS)

clean:
	rm -f $(TARGETS)

worker:         worker.c
	$(CC) -o worker $(CFLAGS) worker.c

collector:      collector.c
	$(CC) -o collector $(CFLAGS) collector.c

emitter:        emitter.c
	$(CC) -o emitter $(CFLAGS) emitter.c

# NOTE: recipe lines must start with a TAB character
dist:   $(SOURCES)
	rsync -avz ../FarmSocket ottavinareale:
	rsync -avz ../FarmSocket backus:Downloads/
	rsync -avz ../FarmSocket pianosau:

Deployment code (Perl)

deploy.pl
#!/usr/bin/perl
# Copy the FarmSocket source directory to the home directory of every
# machine named on the command line.   usage: deploy.pl host...
use strict;
use warnings;

my $sourcedir = "/home/marcod/Documents/Didattica/SPM/Dispensa/Codice/FarmSocket";
foreach my $m (@ARGV) {
  print "Deploying to $m\n";
  # list form of system(): no shell is involved, so an argument cannot
  # inject extra commands
  if (system("rsync", "-avz", $sourcedir, "$m:") != 0) {
    warn "rsync to $m failed\n";
    next;
  }
  print "Done!\n";
}
exit;

Deployment code (BASH)

deploy.bash
#!/bin/bash
# Copy the FarmSocket sources to the home directory of every host given
# as an argument.
sourcedir=/home/marcod/Documents/Didattica/SPM/Dispensa/Codice/FarmSocket
# "$@" keeps each argument intact (unquoted $@ would be word-split)
for m in "$@"
do
  rsync -avz "$sourcedir" "$m:"
done

Launcher code (Perl)

The launcher gets parameters from the command line (see comments) and saves the names of the machines used in the

machines.used

file. These machine names are then used by the

terminator.pl

program.

launcher.pl
#!/usr/bin/perl

### launcher emitter_machine emitter_port collector_machine collector_port w_machine*
#
# Starts, via ssh and in the background, the emitter, then the collector,
# then one worker per remaining argument. Each one is built with make inside
# FarmSocket in the remote login directory. Every machine used is logged to
# machines.used so that terminator.pl can clean up afterwards.

die "usage: $0 emitter_machine emitter_port collector_machine collector_port w_machine*\n"
  unless @ARGV >= 4;

$emachine = shift @ARGV;
$eport    = shift @ARGV;
$cmachine = shift @ARGV;
$cport    = shift @ARGV;

open FD, ">machines.used" or die "Cannot open log file: $!";
system("ssh $emachine \"\( cd FarmSocket\; make emitter\; ./emitter $eport \)\" & ");
print FD "$emachine\n";
sleep(5);   # presumably lets the emitter start listening before the others start
system("ssh $cmachine \"\( cd FarmSocket\; make collector\; ./collector $cport \)\" & ");
print FD "$cmachine\n";
sleep(5);   # likewise for the collector, which workers connect to

foreach $w (@ARGV) {
  system("ssh $w \"\( cd FarmSocket\; make worker\; ./worker $emachine $eport $cmachine $cport \)\" & ");
  print FD "$w\n";

}
close FD or die "Cannot write log file: $!";
exit;

Terminator code (Perl)

terminator.pl
#!/usr/bin/perl
# Run "kill -9 -1" over ssh on every machine listed in machines.used
# (written by launcher.pl), then delete the list.
open FD, "<machines.used" or die "Cannot open log file for reading: $!";
while(<FD>) {
  $m = $_ ;
  chomp $m;          # chop would remove a real character from a last line without "\n"
  next if $m eq "";  # ignore blank lines
  system("ssh $m kill -9 -1");
}
close FD;
unlink "machines.used";
exit;

Version 2

Workers permanently connect to emitter. Emitter detects worker shutdown via errors returned by read syscalls and signals (printf) that the worker died and a task needs to be considered for rescheduling. Emitter is multithreaded: each thread manages one worker. Therefore this version detects worker faults but does not manage them.

Emitter code

emitter.c
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>
#include <arpa/inet.h>
#include <pthread.h>
 
//  on demand scheduling of tasks to workers
//  receive request from worker i -> send task to compute to worker i
//  when no more tasks are available, send an EOS termination 
// 
//  usage is: 
//     a.out portno 
//           number of the port used to get worker task requests
// 
//  this is the version checking worker failure (emitter side) 
//  and just signaling a worker failure
// 
 
// data structure taking care of the tasks to compute. We need to push 
// back tasks that have not presumably been computed due to failures
// 
 
// Scheduling state shared by all worker_handler threads.
// worker_handler updates task while holding mutex.
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// moved outside main: global, so every handler thread sees the same counter
int task = 0;           // last task number handed out; tasks are positive integers, in this case
int tasklimit = 100;    // we'll send tasklimit tasks before stopping
int eos = -1;           // special task to denote End Of Stream
 
/*
 * Serves one worker over its permanent connection (version 2).
 *
 * vs points to a heap-allocated int holding the connected socket. The
 * handler takes ownership of that allocation and frees it.
 * For every request (an int, the worker's pid) it hands out the next task
 * number (1 .. tasklimit) or, once those are exhausted, eos.
 *
 * Returns a heap-allocated count of the tasks sent to this worker after the
 * EOS hand-off. Returns NULL if the connection fails; in that case the last
 * task sent is reported as presumably not completed.
 */
void * worker_handler(void * vs) {
  int s = *((int *) vs);
  int retcode, i;
  int task_to_send = eos;   // last task sent to this worker; eos = none yet
  int * taskNo = (int *) calloc(1, sizeof(int));   // tasks sent to this worker

  free(vs);                 // allocated by main() only to pass the socket
  if(taskNo == NULL) {
    perror("while allocating the task counter");
    close(s);
    return (NULL);
  }
  printf("worker_handler %lu started\n", (unsigned long) pthread_self());
  while(1==1) {
    printf("Waiting for a request \n");
    retcode = read(s,&i,sizeof(int));  // read request from worker
    if(retcode != sizeof(int)) {       // error, EOF or truncated request
      if(retcode == -1) {
        perror("while reading task from worker"); fflush(stderr);
      }
      goto worker_lost;
    }
    printf("%d ",i); fflush(stdout);
    pthread_mutex_lock(&mutex);
    if(task < tasklimit) {  // send a task to the requesting worker
          task++;            // next task to be sent
          task_to_send = task;
          (*taskNo)++;       // count the task (not the pointer)
    } else {                 // if no more tasks, then send an EOS
          task_to_send = eos;
    }
    pthread_mutex_unlock(&mutex);
    if(write(s,&task_to_send,sizeof(task_to_send)) != sizeof(task_to_send)) {
      perror("while sending a task to worker");
      goto worker_lost;
    }
    if(task_to_send != eos) {
          // print the local copy: reading the shared task here would race
          printf("sent task %d to worker %d\n",task_to_send,i);
    } else {
          printf("Send EOS to worker %d\n",i);
          close(s);
          return((void *)taskNo);
    }
  }

worker_lost:
  printf("NEED TO RUN A SUBSTITUTE WORKER AFTER THIS DIED !!! \n");
  // the last task sent should be rescheduled onto a different worker
  if(task_to_send != eos)
    printf("TASK PRESUMABLY NOT COMPLETED is TASK <%d>\n",task_to_send);
  close(s);
  free(taskNo);
  return (NULL);
}
 
#define MAXHOSTNAME 80
#define MAXTHREADS  16

/*
 * Emitter (version 2): accepts permanent worker connections and serves
 * each one from its own worker_handler thread.
 *
 * argv[1]: TCP port on which workers connect.
 * Returns -1 on setup or accept failure. Otherwise it never returns.
 */
int main(int argc, char * argv[]) {
        int si, retcode;
        socklen_t salen;
        struct sockaddr_in sa,sai;
        char hostname[MAXHOSTNAME];
        struct hostent * he;
        int tNo = 0;                 // handler threads created so far

        if(argc < 2) {
                fprintf(stderr, "usage: %s portno\n", argv[0]);
                return -1;
        }

        // code needed to set up the communication infrastructure
        printf("Declaring socket\n");
        si = socket(AF_INET,SOCK_STREAM,0);     // socket for inputs
        if(si == -1) {perror("opening socket for input"); return -1;}
        memset(&sai, 0, sizeof(sai));           // no garbage left in sin_zero
        sai.sin_family = AF_INET;
        sai.sin_port = htons(atoi(argv[1]));
        if(gethostname(hostname,MAXHOSTNAME) == -1) {
                perror("while calling gethostname"); return -1; }
        hostname[MAXHOSTNAME-1] = '\0';         // a truncated name may lack the NUL
        he = gethostbyname(hostname);
        if(he == NULL) {
                fprintf(stderr, "cannot resolve local host name %s\n", hostname);
                return -1; }
        memcpy(&sai.sin_addr, he->h_addr, sizeof(sai.sin_addr));
        printf("Binding to %s\n",inet_ntoa(sai.sin_addr));
        retcode = bind(si,(struct sockaddr *) & sai, sizeof(sai));
        if(retcode == -1) {
                perror("while calling bind"); return -1; }
        printf("Listening socket\n");
        retcode = listen(si,1);
        if(retcode == -1) {
                perror("while calling listen"); return -1; }

        while(1==1) {
                pthread_t tid;
                int * s = (int *) calloc(1, sizeof(int));   // socket passed to the thread

                if(s == NULL) {
                        perror("while allocating the socket slot"); return -1; }
                salen = sizeof(sa);
                printf("Accepting connections .... \n");
                *s = accept(si,(struct sockaddr *)&sa,&salen);  // accept a connection
                if(*s == -1) {   // accept() reports failure with -1, not 1
                        perror("while calling an accept"); free(s); return -1; }
                // now fork a thread to permanently handle the connection
                retcode = pthread_create(&tid,NULL,worker_handler,(void *) s);
                if(retcode != 0) {
                        fprintf(stderr, "while creating a worker_handler: %s\n", strerror(retcode));
                        close(*s);
                        free(s);
                        continue;
                }
                // handlers are never joined: detach them rather than storing
                // their ids in a fixed array, which overflowed after
                // MAXTHREADS connections
                pthread_detach(tid);
                printf("Created worker_handle No %d\n",tNo);
                tNo++;
        }
        printf("Closing operations\n");
        return 0;
}

Worker code

worker.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>
#include <unistd.h>
 
/* 
 *   function to be computed on the single task
 *   this is actually the only "business logic" code
 */
 
int f(int task) {
        int r = task;           // business logic: r = task squared
        r *= task;
        (void) sleep(1);        // simulated processing time
        return r;
}
 
/* 
 *   receives integer tasks and delivers integer results, up to the reception
 *   of an EOS (a -1) 
 *   this version opens/closes the sockets at each task, which is not 
 *   definitely efficient ... 
 * 
 *   usage: 
 *      a.out emitter_address emitter_port collector_address collector_port
 */
 
/*
 * Worker (version 2): keeps one permanent connection to the emitter. For
 * each task it opens a short-lived connection to the collector to deliver
 * f(task). It stops on EOS (a negative task).
 *
 * argv: emitter_address emitter_port collector_address collector_port
 * Returns 0 after EOS, -1 on any communication error.
 */
int main(int argc, char * argv[]) {
        int s, cs, retcode;          // task socket, result socket, error ret
        struct sockaddr_in sa, ca;   // emitter address, collector address
        struct hostent * he;
        int pid = 0;

        if(argc < 5) {
                fprintf(stderr, "usage: %s emitter_address emitter_port collector_address collector_port\n", argv[0]);
                return -1;
        }

        pid = getpid();    // get my POSIX process id
        printf("This is worker %d\n",pid);

        // resolve both servers once: their addresses do not change between tasks
        memset(&sa, 0, sizeof(sa));
        sa.sin_family = AF_INET;
        sa.sin_port = htons(atoi(argv[2]));
        if((he = gethostbyname(argv[1])) == NULL) {
                fprintf(stderr, "cannot resolve emitter host %s\n", argv[1]);
                return -1; }
        memcpy(&sa.sin_addr, he->h_addr, sizeof(sa.sin_addr));

        memset(&ca, 0, sizeof(ca));
        ca.sin_family = AF_INET;
        ca.sin_port = htons(atoi(argv[4]));
        if((he = gethostbyname(argv[3])) == NULL) {
                fprintf(stderr, "cannot resolve collector host %s\n", argv[3]);
                return -1; }
        memcpy(&ca.sin_addr, he->h_addr, sizeof(ca.sin_addr));

        // connect permanently to the emitter process
        s = socket(AF_INET,SOCK_STREAM,0);   // socket to access emitter
        if(s == -1) {
                perror("while creating the socket to emitter");
                return (-1); }
        printf("Opened E socket %d\n",s);
        retcode = connect(s,(struct sockaddr *)&sa,sizeof(sa));
        if(retcode == -1) {
                perror("while connecting to emitter"); return(-1);}

        while(1==1) {
                int task, result;
                // sends a request to the emitter
                printf("Writing to E socket %d\n",s);
                retcode = write(s,&pid,sizeof(pid));
                if(retcode != sizeof(pid)) { perror("sending a request"); return -1; }
                printf("Reading from E socket %d\n",s);
                retcode = read(s,&task,sizeof(task)); // get a task
                if(retcode != sizeof(task)) {
                        perror("while receiving a task"); return -1; }
                printf("Read from E socket %d\n",s);

                // check for EOS
                if(task < 0)  {
                        printf("Received EOS\n");
                        break;  // if EOS terminate loop iterations
                }

                printf("Received task %d\n",task);
                // otherwise process the incoming task
                result = f(task);
                printf("Computed %d -> %d\n",task,result);

                // send result to the collector
                cs = socket(AF_INET,SOCK_STREAM,0); // create socket to collector
                if(cs == -1) {
                        perror("while creating the socket to collector");
                        return -1; }
                retcode = connect(cs,(struct sockaddr *)&ca,sizeof(ca));
                if(retcode == -1) {
                        perror("connecting to the collector");
                        return(-1);}

                // send the result and close connection
                if(write(cs,&result,sizeof(result)) != sizeof(result))
                        perror("while sending the result");
                close(cs);
                printf("Sent to C socket, E sock %d\n",s);

                // then cycle again
        }
        close(s);
        return 0;
}

Collector code

collector.c
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>

// receives results from the workers and displays them on the console
// usage:
//    a.out portno
//          number of the port for the result socket

#define MAXHOSTNAME 80

/*
 * Collector (version 2): accepts one connection per result, reads an int
 * and prints it. It stops when it reads the EOS marker (-1).
 *
 * argv[1]: TCP port for the result socket.
 * Returns 0 after EOS, -1 on setup, accept or read errors.
 */
int main(int argc, char * argv[]) {
        int s,si, retcode, i;
        socklen_t salen;
        struct sockaddr_in sa,sai;
        char hostname[MAXHOSTNAME];
        struct hostent * he;

        if(argc < 2) {
                fprintf(stderr, "usage: %s portno\n", argv[0]);
                return -1;
        }

        si = socket(AF_INET,SOCK_STREAM,0);     // socket to receive the results
        if(si == -1) {
                perror("while opening socket"); return -1;}
        memset(&sai, 0, sizeof(sai));           // no garbage left in sin_zero
        sai.sin_family = AF_INET;
        sai.sin_port = htons(atoi(argv[1]));
        if(gethostname(hostname,MAXHOSTNAME) == -1) {
                perror("while calling gethostname"); return -1; }
        hostname[MAXHOSTNAME-1] = '\0';         // a truncated name may lack the NUL
        he = gethostbyname(hostname);
        if(he == NULL) {
                fprintf(stderr, "cannot resolve local host name %s\n", hostname);
                return -1; }
        memcpy(&sai.sin_addr, he->h_addr, sizeof(sai.sin_addr));
        retcode = bind(si,(struct sockaddr *) & sai, sizeof(sai));
        if(retcode == -1) {
                perror("while binding socket to addr"); return -1; }
        retcode = listen(si,1);
        if(retcode == -1) {
                perror("while calling listen"); return -1; }

        while(1==1) {
                salen = sizeof(sa);
                printf("Accepting connections\n");
                s = accept(si,(struct sockaddr *)&sa,&salen); // accept a connection
                if(s == -1) {   // accept() reports failure with -1, not 1
                        perror("while accepting a connection"); return -1;
                }
                retcode = read(s,&i,sizeof(int));// read a res from one of the Worker
                if(retcode == -1) {
                        perror("while reading a result"); return -1; }
                if(retcode != sizeof(int)) {     // peer closed early: nothing valid to print
                        close(s);
                        continue;
                }
                if(i==(-1)) {
                        printf("EOS -> terminating\n");
                        close(s);
                        break;
                }
                printf("Read result: %d ",i); fflush(stdout); // and print it on console
                close(s);
        }
        close(si);
        return 0;
}

Version 3

This version manages worker faults. The emitter process detects faults through errors when reading task requests. It puts the task delivered to the worker just before the fault into a limbo task queue. The emitter also accepts notifications of task completion from the collector. When a notification is received, a task with the same tag is searched for in the limbo queue and, if found, removed from it. When the regular tasks are finished, limbo tasks are scheduled again before EOS is sent to the workers.

Suggested exercise: modify the code in such a way:

  • collector does not send any notification to the emitter
  • the collector avoids delivering (printing on screen) a result whose tag equals that of a result already delivered
  • the emitter inserts the last task delivered to a faulty worker directly into the task queue, so that such tasks are rescheduled for execution like any other task.

Emitter code

emitter.c
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>
#include <arpa/inet.h>
#include <pthread.h>
 
//  on demand scheduling of tasks to workers
//  receive request from worker i -> send task to compute to worker i
//  when no more tasks are available, send an EOS termination 
// 
//  usage is: 
//     a.out portno 
//           number of the port used to get worker task requests
// 
//  this is the version checking worker failure (emitter side) 
//  and just signaling a worker failure
// 
 
// the structure hosting the tasks
#include "task.h"
// Tasks not yet handed out: a singly linked list built by main() and
// popped from the head by worker_handler while holding mutex.
TASK * task_list = NULL;
// Tasks whose worker failed: worker_handler resends them once task_list is
// empty, and removes them when the collector reports them completed.
// Intended to be accessed only while holding mutex_limbo.
TASK * task_limbo = NULL;

// mutex guards task_list, mutex_limbo guards task_limbo
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_limbo = PTHREAD_MUTEX_INITIALIZER;
// moved outside main: global
int task = 0;           // NOTE(review): never updated in this version; scheduling is driven by task_list
int tasklimit = 10;     // number of tasks main() puts in task_list
TASK eos;               // special task to denote End Of Stream (fields set in main)
 
 
/*
 * Pushes t onto the limbo list so that it can be resent if its result
 * never arrives. Takes ownership of t.
 */
static void limbo_push(TASK * t) {
  pthread_mutex_lock(&mutex_limbo);
  t->next = task_limbo;
  task_limbo = t;
  // print while still locked: once unlocked another thread may take t
  printf("Task <%d,%d> -> LIMBO\n",t->v, t->tag);
  pthread_mutex_unlock(&mutex_limbo);
}

/*
 * Serves one connection (version 3): either a worker or the collector.
 *
 * vs points to a heap-allocated int holding the connected socket. The
 * handler takes ownership of that allocation and frees it.
 *  - A TAG_COMPLETED message (from the collector) removes every limbo task
 *    whose tag equals request.v, then the thread ends.
 *  - Any other message is a task request. The worker gets the next task
 *    from task_list, then from limbo, and finally eos.
 * If a worker connection fails, the task in flight is moved to limbo.
 *
 * Returns a heap-allocated count of the tasks sent after the EOS hand-off,
 * otherwise NULL.
 */
void * worker_handler(void * vs) {
  int s = *((int *) vs);
  int retcode;
  TASK * task_to_send = NULL;   // task in flight on this worker, owned here
  int * taskNo = (int *) calloc(1, sizeof(int));   // tasks sent to this worker
  SERVICE request;

  free(vs);                     // allocated by main() only to pass the socket
  printf("worker_handler %lu started\n", (unsigned long) pthread_self());
  while(1==1) {
    printf("Waiting for a request \n");
    // read request: task request from worker or task completion msg from coll
    retcode = read(s,&request,sizeof(SERVICE));
    if(retcode != sizeof(SERVICE)) {  // error, EOF or truncated message
      if(retcode == -1) {
        perror("while reading task from worker"); fflush(stderr);
      }
      goto connection_broken;
    }
    if(request.tag == TAG_COMPLETED) { // this is from collector
      TASK ** link;
      // the list head must be read under the lock as well
      pthread_mutex_lock(&mutex_limbo);
      link = &task_limbo;
      while(*link != NULL) {
        if((*link)->tag == request.v) {   // completed: unlink and release it
          TASK * done = *link;
          *link = done->next;
          free(done);
        } else {
          link = &(*link)->next;
        }
      }
      pthread_mutex_unlock(&mutex_limbo);
      printf("Computed task tag=%d removed from LIMBO\n",request.v);
      // NOTE(review): assumes the collector uses one connection per notification
      close(s);
      free(taskNo);
      return(NULL);
    }
    // else: emit a task to the worker and cycle (this is a worker manager).
    // A new request means the previous task was finished and forwarded.
    free(task_to_send);
    task_to_send = NULL;
    printf("Got request from %d ",request.v); fflush(stdout);

    pthread_mutex_lock(&mutex);
    if(task_list != NULL) {  // take the next regular task
      task_to_send = task_list;
      task_list = task_list->next;
    }
    pthread_mutex_unlock(&mutex);

    if(task_to_send == NULL) {  // no more regular tasks: check limbo
      pthread_mutex_lock(&mutex_limbo);
      if(task_limbo != NULL) {
        task_to_send = task_limbo;
        task_limbo = task_limbo->next;
        printf("Task pool is empty but sending task from Limbo\n");
      }
      pthread_mutex_unlock(&mutex_limbo);
    }

    if(task_to_send == NULL) {  // nothing left at all: send EOS
      if(write(s,&eos,sizeof(TASK)) != sizeof(TASK))
        perror("while sending EOS");
      printf("Send EOS to worker %d\n",request.v);
      close(s);
      return((void *)taskNo);
    }

    // send the TASK itself (writing &task_to_send would send the pointer)
    if(write(s,task_to_send,sizeof(TASK)) != sizeof(TASK)) {
      perror("while sending a task");
      goto connection_broken;
    }
    if(taskNo != NULL)
      (*taskNo)++;
    printf("sent task %d to worker %d\n",task_to_send->tag,request.v);
  }

connection_broken:
  printf("NEED TO RUN A SUBSTITUTE WORKER AFTER THIS DIED !!! \n");
  // the last task sent must be rescheduled onto a different worker
  if(task_to_send != NULL) {
    printf("TASK PRESUMABLY NOT COMPLETED is TASK <%d>\n",task_to_send->tag);
    limbo_push(task_to_send);
  }
  close(s);
  free(taskNo);
  return (NULL);
}
 
#define MAXHOSTNAME 80
#define MAXTHREADS  16

/*
 * Emitter (version 3): builds the task list, then accepts connections
 * (workers and collector notifications) and serves each one from its own
 * worker_handler thread.
 *
 * argv[1]: TCP port on which connections are accepted.
 * Returns -1 on setup or accept failure. Otherwise it never returns.
 */
int main(int argc, char * argv[]) {
        int si, retcode, i;
        socklen_t salen;
        struct sockaddr_in sa,sai;
        char hostname[MAXHOSTNAME];
        struct hostent * he;
        int tNo = 0;                 // handler threads created so far

        if(argc < 2) {
                fprintf(stderr, "usage: %s portno\n", argv[0]);
                return -1;
        }

        // set up task structure (pushed at the head, so sent in reverse order)
        for(i=0; i<tasklimit; i++) {
                TASK * t = (TASK *) calloc(1,sizeof(TASK));
                if(t == NULL) {
                        perror("while allocating the task list"); return -1; }
                t->next = task_list;
                t->v = i;
                t->tag = i;
                task_list = t;
        }
        // set up eos
        eos.v = -1;
        eos.tag = -2;
        eos.next = NULL;

        // code needed to set up the communication infrastructure
        printf("Declaring socket\n");
        si = socket(AF_INET,SOCK_STREAM,0);     // socket for inputs
        if(si == -1) {perror("opening socket for input"); return -1;}
        memset(&sai, 0, sizeof(sai));           // no garbage left in sin_zero
        sai.sin_family = AF_INET;
        sai.sin_port = htons(atoi(argv[1]));
        if(gethostname(hostname,MAXHOSTNAME) == -1) {
                perror("while calling gethostname"); return -1; }
        hostname[MAXHOSTNAME-1] = '\0';         // a truncated name may lack the NUL
        he = gethostbyname(hostname);
        if(he == NULL) {
                fprintf(stderr, "cannot resolve local host name %s\n", hostname);
                return -1; }
        memcpy(&sai.sin_addr, he->h_addr, sizeof(sai.sin_addr));
        printf("Binding to %s\n",inet_ntoa(sai.sin_addr));
        retcode = bind(si,(struct sockaddr *) & sai, sizeof(sai));
        if(retcode == -1) {
                perror("while calling bind"); return -1; }
        printf("Listening socket\n");
        retcode = listen(si,1);
        if(retcode == -1) {
                perror("while calling listen"); return -1; }

        while(1==1) {
                pthread_t tid;
                int * s = (int *) calloc(1, sizeof(int));   // socket passed to the thread

                if(s == NULL) {
                        perror("while allocating the socket slot"); return -1; }
                salen = sizeof(sa);
                printf("Accepting connections .... \n");
                *s = accept(si,(struct sockaddr *)&sa,&salen);  // accept a connection
                if(*s == -1) {   // accept() reports failure with -1, not 1
                        perror("while calling an accept"); free(s); return -1; }
                // now fork a thread to permanently handle the connection
                retcode = pthread_create(&tid,NULL,worker_handler,(void *) s);
                if(retcode != 0) {
                        fprintf(stderr, "while creating a worker_handler: %s\n", strerror(retcode));
                        close(*s);
                        free(s);
                        continue;
                }
                // Handlers are never joined, so detach them instead of storing
                // their ids in a fixed array. Workers plus collector
                // notifications easily exceeded MAXTHREADS connections.
                pthread_detach(tid);
                printf("Created worker_handle No %d\n",tNo);
                tNo++;
        }
        printf("Closing operations\n");
        return 0;
}

Worker code

worker.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>
#include <unistd.h>
 
#include "task.h"
 
/* 
 *   function to be computed on the single task
 *   this is actually the only "business logic" code
 */
 
int f(int task) {
        /* work is simulated by a one-second pause; the result is task^2 */
        const int answer = task * task;
        (void) sleep(1);
        return answer;
}
 
/* 
 *   receives integer tasks and delivers integer results, up to the reception
 *   of an EOS (a -1) 
 *   this version opens/closes the sockets at each task, which is not 
 *   definitely efficient ... 
 * 
 *   usage: 
 *      a.out emitter_address emitter_port collector_address collector_port
 */
 
/*
 * Worker (version 3): keeps one permanent connection to the emitter. It
 * requests tasks with SERVICE messages, computes f(task.v) and sends a
 * RESULT carrying the same tag to the collector over a short-lived
 * connection. It stops on EOS (a task with a negative tag).
 *
 * argv: emitter_address emitter_port collector_address collector_port
 * Returns 0 after EOS, -1 on any communication error.
 */
int main(int argc, char * argv[]) {
        int s, cs, retcode;          // task socket, result socket, error ret
        struct sockaddr_in sa, ca;   // emitter address, collector address
        struct hostent * he;
        int pid = 0;

        if(argc < 5) {
                fprintf(stderr, "usage: %s emitter_address emitter_port collector_address collector_port\n", argv[0]);
                return -1;
        }

        pid = getpid();    // get my POSIX process id
        printf("This is worker %d\n",pid);

        // resolve both servers once: their addresses do not change between tasks
        memset(&sa, 0, sizeof(sa));
        sa.sin_family = AF_INET;
        sa.sin_port = htons(atoi(argv[2]));
        if((he = gethostbyname(argv[1])) == NULL) {
                fprintf(stderr, "cannot resolve emitter host %s\n", argv[1]);
                return -1; }
        memcpy(&sa.sin_addr, he->h_addr, sizeof(sa.sin_addr));

        memset(&ca, 0, sizeof(ca));
        ca.sin_family = AF_INET;
        ca.sin_port = htons(atoi(argv[4]));
        if((he = gethostbyname(argv[3])) == NULL) {
                fprintf(stderr, "cannot resolve collector host %s\n", argv[3]);
                return -1; }
        memcpy(&ca.sin_addr, he->h_addr, sizeof(ca.sin_addr));

        // connect permanently to the emitter process
        s = socket(AF_INET,SOCK_STREAM,0);   // socket to access emitter
        if(s == -1) {
                perror("while creating the socket to emitter");
                return (-1); }
        printf("Opened E socket %d\n",s);
        retcode = connect(s,(struct sockaddr *)&sa,sizeof(sa));
        if(retcode == -1) {
                perror("while connecting to emitter"); return(-1);}

        while(1==1) {
                TASK task;
                RESULT result;
                SERVICE request;
                // sends a request to the emitter
                request.tag = TAG_TASK_REQUEST;
                request.v = pid;
                retcode = write(s,&request,sizeof(SERVICE));
                if(retcode != sizeof(SERVICE)) { perror("sending a request"); return -1; }
                retcode = read(s,&task,sizeof(TASK)); // get a task
                if(retcode != sizeof(TASK)) { // this could be corrected in case
                        // of large tasks (while(len!=sizeof) read)
                        perror("while receiving a task"); return -1; }

                // check for EOS
                if(task.tag < 0)  {
                        printf("Received EOS\n");
                        break;  // if EOS terminate loop iterations
                }

                printf("Received task %d\n",task.tag);
                // otherwise process the incoming task
                result.v = f(task.v);
                result.tag = task.tag;
                result.next = NULL;
                printf("Computed %d -> %d\n",task.v,result.v);

                // send result to the collector
                cs = socket(AF_INET,SOCK_STREAM,0); // create socket to collector
                if(cs == -1) {
                        perror("while creating the socket to collector");
                        return -1; }
                retcode = connect(cs,(struct sockaddr *)&ca,sizeof(ca));
                if(retcode == -1) {
                        perror("connecting to the collector");
                        return(-1);}

                // send the result and close connection
                if(write(cs,&result,sizeof(RESULT)) != sizeof(RESULT))
                        perror("while sending the result");
                close(cs);
                printf("Sent to C socket, E sock %d\n",s);

                // then cycle again
        }
        close(s);
        return 0;
}

Collector code

collector.c
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <netdb.h>
 
#include "task.h"
 
// receives results from the workers and displays them on the console 
// usage: 
//    a.out portno emitterhost emitterport
//          portno: port on which the result socket listens
//          emitterhost emitterport: address of the emitter, used to send
//          feedback on task completion
 
enum { MAXHOSTNAME = 80 };      // size of the buffer that receives this host's name
 
/* Read exactly len bytes from fd into buf.  A single read() on a TCP
 * stream may return fewer bytes than requested, so keep reading until
 * the whole message has arrived.
 * Returns 0 on success, -1 on error or if the peer closed the
 * connection before len bytes were received. */
static int read_full(int fd, void *buf, size_t len) {
        char *p = buf;

        while(len > 0) {
                ssize_t n = read(fd, p, len);
                if(n <= 0)
                        return -1;
                p += n;
                len -= (size_t) n;
        }
        return 0;
}

/* Write exactly len bytes from buf to fd, looping over short writes.
 * Returns 0 on success, -1 on error. */
static int write_full(int fd, const void *buf, size_t len) {
        const char *p = buf;

        while(len > 0) {
                ssize_t n = write(fd, p, len);
                if(n <= 0)
                        return -1;
                p += n;
                len -= (size_t) n;
        }
        return 0;
}

/* Resolve host name `name` into *addr using the first IPv4 address
 * returned by gethostbyname().
 * Returns 0 on success, -1 if the name cannot be resolved to IPv4. */
static int resolve_host(const char *name, struct in_addr *addr) {
        struct hostent *he = gethostbyname(name);

        if(he == NULL || he->h_addrtype != AF_INET || he->h_addr_list[0] == NULL)
                return -1;
        memcpy(addr, he->h_addr_list[0], sizeof(*addr));
        return 0;
}

/*
 * Collector entry point.
 * usage: a.out portno emitterhost emitterport
 *
 * Accepts one connection per result, reads a RESULT from it and prints
 * it.  For every ordinary result it then opens a short-lived connection
 * to the emitter and sends a SERVICE message {TAG_COMPLETED, result.tag}
 * reporting that the task completed.  A result with a negative tag is
 * treated as EOS and ends the loop.
 * Returns 0 after EOS, -1 on any error.
 */
int main(int argc, char * argv[]) {
        int s, ei, si, retcode;
        RESULT result;
        socklen_t salen;
        struct sockaddr_in sa, sai, eai;
        char hostname[MAXHOSTNAME];
        char * emitterhost;
        int emitter_port;

        // argv[1..3] are used below: refuse to run without them
        if(argc < 4) {
                fprintf(stderr, "usage: a.out portno emitterhost emitterport\n");
                return -1;
        }
        emitterhost = argv[2];
        emitter_port = atoi(argv[3]);

        // the emitter address never changes: resolve it once, not per result
        memset(&eai, 0, sizeof(eai));
        eai.sin_family = AF_INET;
        eai.sin_port = htons(emitter_port);
        if(resolve_host(emitterhost, &eai.sin_addr) == -1) {
                fprintf(stderr, "cannot resolve emitter host %s\n", emitterhost);
                return -1;
        }

        si = socket(AF_INET,SOCK_STREAM,0);     // socket to receive the results
        if(si == -1) {
                perror("while opening socket"); return -1; }
        memset(&sai, 0, sizeof(sai));           // also clears sin_zero padding
        sai.sin_family = AF_INET;
        sai.sin_port = htons(atoi(argv[1]));
        if(gethostname(hostname,MAXHOSTNAME) == -1) {
                perror("while getting the host name"); return -1; }
        // POSIX leaves termination unspecified if the name was truncated
        hostname[MAXHOSTNAME - 1] = '\0';
        if(resolve_host(hostname, &sai.sin_addr) == -1) {
                fprintf(stderr, "cannot resolve local host %s\n", hostname);
                return -1;
        }
        retcode = bind(si,(struct sockaddr *) & sai, sizeof(sai));
        if(retcode == -1) { 
                perror("while binding socket to addr"); return -1; }
        retcode = listen(si,1); 
        if(retcode == -1) { 
                perror("while calling listen"); return -1; }

        while(1==1) {
                SERVICE feedback; 

                salen = sizeof(sa);
                printf("Accepting connections\n");
                s = accept(si,(struct sockaddr *)&sa,&salen); // accept a connection
                if(s == -1) {           // accept() reports failure with -1
                        perror("while accepting a connection"); return -1;
                }
                // read a result from one of the workers (handles short reads)
                if(read_full(s,&result,sizeof(RESULT)) == -1) {
                        perror("while reading a result"); close(s); return -1; }
                close(s);               // one result per connection
                if(result.tag < 0) { 
                        printf("EOS -> terminating\n");
                        break;
                }
                printf("Read result: <%d,%d> ",result.v,result.tag); 
                fflush(stdout); // and print it on console

                // now send a feedback to emitter process with the tag of the 
                // received task: in case it was selected for re-scheduling
                // it will be removed, otherwise it will be rescheduled after
                // completing "normal" tasks
                ei = socket(AF_INET,SOCK_STREAM,0);   // socket to send feedback to the emitter
                if(ei == -1) {
                        perror("while opening socket"); return -1; }
                retcode = connect(ei,(struct sockaddr *)&eai,sizeof(eai));
                if(retcode == -1) {
                        perror("while connecting to emitter"); close(ei); return(-1); }
                // send feedback on computed task     
                feedback.tag = TAG_COMPLETED; 
                feedback.v   = result.tag; 
                if(write_full(ei,&feedback,sizeof(SERVICE)) == -1) {
                        perror("while writing feedback to emitter"); close(ei); return (-1); }
                close(ei); // one shot: TODO use permanent connection on dedicated ss 
        }
        close(si);
        return 0; 
}

Attention: this code contains an error (it does not work perfectly). Try to figure out what the error is (that is, which erroneous behaviour the code exhibits) and how it can be corrected.

magistraleinformaticanetworking/spm/farmposix.txt · Ultima modifica: 22/03/2011 alle 18:02 (13 anni fa) da Marco Danelutto