Questa è una vecchia versione del documento!
Indice
Task farm (POSIX TCP/IP)
Version 1
This version uses two servers (emitter and collector). The emitter accepts requests from workers and delivers tasks to be executed. The collector accepts results from workers and prints them on the screen. Workers open a separate connection to both the emitter and the collector for each task. The two connections are closed after processing the task and delivering the result. Tasks and results are (positive) integers. Emitter and worker termination is handled correctly. The collector does not terminate (suggestion: each worker should propagate EOS to the collector; the collector should count the EOS messages and terminate after receiving an EOS from every worker).
Emitter code
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>

// on demand scheduling of tasks to workers
// receive request from worker i -> send task to compute to worker i
// when no more tasks are available, send an EOS termination
//
// usage is:
//    a.out portno
//           number of the port used to get worker task requests

#define MAXHOSTNAME 80

/*
 * Emitter: accepts one connection per request, reads the worker id sent
 * by the worker and answers with the next task, or with EOS (-1) once
 * tasklimit tasks have been handed out. The accept loop never ends.
 * Returns -1 on any set-up or accept/read failure.
 */
int main(int argc, char * argv[]) {
  int s, si, retcode, i;
  socklen_t salen;
  struct sockaddr_in sa, sai;
  struct hostent * he;
  char hostname[MAXHOSTNAME];

  int task = 0;          // tasks are positive integers, in this case
  int tasklimit = 100;   // we'll send tasklimit tasks before stopping
  int eos = -1;          // special task to denote End Of Stream

  if(argc < 2) {
    fprintf(stderr,"usage: %s portno\n",argv[0]);
    return -1;
  }

  // code needed to set up the communication infrastructure
  printf("Declaring socket\n");
  si = socket(AF_INET,SOCK_STREAM,0);   // socket for inputs
  if(si == -1) { perror("opening socket for input"); return -1; }

  memset(&sai,0,sizeof(sai));           // no stray bytes in the padding
  sai.sin_family = AF_INET;
  sai.sin_port = htons(atoi(argv[1]));
  if(gethostname(hostname,MAXHOSTNAME) == -1) {
    perror("while getting the host name");
    return -1;
  }
  hostname[MAXHOSTNAME-1] = '\0';       // a truncated name may lack the terminator
  he = gethostbyname(hostname);
  if(he == NULL) {                      // unresolved name: nothing to copy from
    fprintf(stderr,"cannot resolve host name %s\n",hostname);
    return -1;
  }
  memcpy(&sai.sin_addr, he->h_addr_list[0], sizeof(sai.sin_addr));
  printf("Binding to %s\n",inet_ntoa(sai.sin_addr));

  retcode = bind(si,(struct sockaddr *) &sai, sizeof(sai));
  if(retcode == -1) { perror("while calling bind"); return -1; }

  printf("Listening socket\n");
  retcode = listen(si,1);
  if(retcode == -1) { perror("while calling listen"); return -1; }

  while(1==1) {
    salen = sizeof(sa);
    printf("Accepting connections .... \n");
    s = accept(si,(struct sockaddr *)&sa,&salen);   // accept a connection
    if(s == -1) {                        // accept reports failure with -1
      perror("while calling an accept");
      return -1;
    }

    retcode = read(s,&i,sizeof(int));    // read request from worker
    if(retcode == -1) {
      perror("while reading task from worker");
      return -1;
    }
    if(retcode != (int) sizeof(int)) {
      // peer closed early or sent a partial id: i is not valid, drop it
      fprintf(stderr,"incomplete request from worker, connection dropped\n");
      close(s);
      continue;
    }
    printf("%d ",i); fflush(stdout);

    if(task < tasklimit) {               // send a task to the requesting worker
      if(write(s,&task,sizeof(task)) != (ssize_t) sizeof(task)) {
        // task not delivered: keep it for the next request
        perror("while sending task to worker");
      } else {
        printf("sent task %d to worker %d\n",task,i);
        task++;                          // next task to be sent
      }
    } else {                             // if no more tasks, then send an EOS
      if(write(s,&eos,sizeof(eos)) != (ssize_t) sizeof(eos))
        perror("while sending EOS to worker");
      else
        printf("Send EOS to worker %d\n",i);
    }
    close(s);
  }

  // not reached: the loop above has no exit
  printf("Closing operations\n");
  close(si);
  return 0;
}
Worker code
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <unistd.h>

/*
 * function to be computed on the single task
 * this is actually the only "business logic" code
 */
int f(int task) {
  sleep(1);   // just to simulate some work ...
  return (task*task);
}

/*
 * fills sa with the IPv4 address of host and the given port
 * returns 0 on success, -1 if the host name cannot be resolved
 */
static int resolve(const char * host, const char * port, struct sockaddr_in * sa) {
  struct hostent * he = gethostbyname(host);
  if(he == NULL) {
    fprintf(stderr,"cannot resolve host name %s\n",host);
    return -1;
  }
  memset(sa,0,sizeof(*sa));
  sa->sin_family = AF_INET;
  sa->sin_port = htons(atoi(port));
  memcpy(&sa->sin_addr, he->h_addr_list[0], sizeof(sa->sin_addr));
  return 0;
}

/*
 * receives integer tasks and delivers integer results, up to the reception
 * of an EOS (a -1)
 * this version opens/closes the sockets at each task, which is not
 * definitely efficient ...
 *
 * usage:
 *    a.out emitter_address emitter_port collector_address collector_port
 */
int main(int argc, char * argv[]) {
  int s, cs, retcode;            // emitter socket, collector socket, error ret
  struct sockaddr_in esa, csa;   // emitter and collector addresses
  int pid = 0;

  if(argc < 5) {
    fprintf(stderr,
            "usage: %s emitter_address emitter_port collector_address collector_port\n",
            argv[0]);
    return -1;
  }
  // the two addresses never change: resolve them once, outside the loop
  if(resolve(argv[1],argv[2],&esa) == -1) return -1;
  if(resolve(argv[3],argv[4],&csa) == -1) return -1;

  pid = getpid();   // get my POSIX process id
  printf("This is worker %d\n",pid);

  while(1==1) {
    int task, result;

    s = socket(AF_INET,SOCK_STREAM,0);   // socket to access emitter
    if(s == -1) {
      perror("while creating the socket to emitter");
      return (-1);
    }
    retcode = connect(s,(struct sockaddr *)&esa,sizeof(esa));
    if(retcode == -1) {
      perror("while connecting to emitter");
      return(-1);
    }

    // sends a request to the emitter
    if(write(s,&pid,sizeof(pid)) != (ssize_t) sizeof(pid)) {
      perror("sending a request");
      return -1;
    }
    retcode = read(s,&task,sizeof(task));   // get a task
    if(retcode != (int) sizeof(task)) {
      perror("while receiving a task");
      return -1;
    }
    // the request/reply exchange is over: release the emitter socket now,
    // otherwise one descriptor is lost at every task
    close(s);

    // check for EOS
    if(task < 0) {
      printf("Received EOS\n");
      break;   // if EOS terminate loop iterations
    }

    printf("Received task %d\n",task);
    // otherwise process the incoming task
    result = f(task);
    printf("Computed %d -> %d\n",task,result);

    // send result to the collector
    cs = socket(AF_INET,SOCK_STREAM,0);   // create socket to collector
    if(cs == -1) {
      perror("while creating the socket to collector");
      return (-1);
    }
    retcode = connect(cs,(struct sockaddr *)&csa,sizeof(csa));
    if(retcode == -1) {
      perror("connecting to the collector");
      return(-1);
    }

    // send the result and close connection
    if(write(cs,&result,sizeof(result)) != (ssize_t) sizeof(result))
      perror("while sending the result");
    close(cs);
    // then cycle again
  }
  return 0;
}
Collector code
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>

// receives results from the workers and displays them on the console
// usage:
//    a.out portno
//           number of the port for the result socket

#define MAXHOSTNAME 80

/*
 * Collector: accepts one connection per result, reads a single integer
 * and prints it. A value of -1 is taken as EOS and ends the loop.
 * Returns 0 after EOS, -1 on any set-up or accept/read failure.
 */
int main(int argc, char * argv[]) {
  int s, si, retcode, i;
  socklen_t salen;
  struct sockaddr_in sa, sai;
  struct hostent * he;
  char hostname[MAXHOSTNAME];

  if(argc < 2) {
    fprintf(stderr,"usage: %s portno\n",argv[0]);
    return -1;
  }

  si = socket(AF_INET,SOCK_STREAM,0);   // socket to receive the results
  if(si == -1) { perror("while opening socket"); return -1; }

  memset(&sai,0,sizeof(sai));           // no stray bytes in the padding
  sai.sin_family = AF_INET;
  sai.sin_port = htons(atoi(argv[1]));
  if(gethostname(hostname,MAXHOSTNAME) == -1) {
    perror("while getting the host name");
    return -1;
  }
  hostname[MAXHOSTNAME-1] = '\0';       // a truncated name may lack the terminator
  he = gethostbyname(hostname);
  if(he == NULL) {                      // unresolved name: nothing to copy from
    fprintf(stderr,"cannot resolve host name %s\n",hostname);
    return -1;
  }
  memcpy(&sai.sin_addr, he->h_addr_list[0], sizeof(sai.sin_addr));

  retcode = bind(si,(struct sockaddr *) &sai, sizeof(sai));
  if(retcode == -1) { perror("while binding socket to addr"); return -1; }

  retcode = listen(si,1);
  if(retcode == -1) { perror("while calling listen"); return -1; }

  while(1==1) {
    salen = sizeof(sa);
    printf("Accepting connections\n");
    s = accept(si,(struct sockaddr *)&sa,&salen);   // accept a connection
    if(s == -1) {                        // accept reports failure with -1
      perror("while accepting a connection");
      return -1;
    }

    retcode = read(s,&i,sizeof(int));    // read a res from one of the Worker
    if(retcode == -1) {
      perror("while reading a result");
      return -1;
    }
    if(retcode != (int) sizeof(int)) {
      // peer closed early or sent a partial value: i is not valid, drop it
      fprintf(stderr,"incomplete result received, connection dropped\n");
      close(s);
      continue;
    }
    if(i==(-1)) {
      printf("EOS -> terminating\n");
      close(s);                          // do not leave the last connection open
      break;
    }
    printf("Read result: %d ",i); fflush(stdout);   // and print it on console
    close(s);
  }
  close(si);
  return 0;
}
Deployment code (Perl)
#!/usr/bin/perl
use strict;
use warnings;

# Copies the FarmSocket source directory into the home directory of each
# machine named on the command line, one rsync run per machine.

my $sourcedir = "/home/marcod/Documents/Didattica/SPM/Dispensa/Codice/FarmSocket";

foreach my $m (@ARGV) {
  print "Deploying to $m\n";
  # list form of system: no shell is involved, so characters in a machine
  # name can never be interpreted as shell syntax
  my $status = system("rsync", "-avz", $sourcedir, "$m:");
  if ($status != 0) {
    # report the failure instead of claiming success, then try the next machine
    warn "rsync to $m failed (status $status)\n";
    next;
  }
  print "Done!\n";
}
exit;
Deployment code (BASH)
#!/bin/bash
# Copies the FarmSocket source directory into the home directory of each
# machine named on the command line.
# "$@" and "$m:" are quoted so that every argument is passed to rsync
# exactly as typed (no word splitting, no globbing).
for m in "$@"
do
  rsync -avz /home/marcod/Documents/Didattica/SPM/Dispensa/Codice/FarmSocket "$m:"
done
Launcher code (Perl)
The launcher gets parameters from the command line (see comments) and saves the names of the machines used in the
machines.used
file. These machine names are then used by the
terminator.pl
program.
#!/usr/bin/perl
### launcher emitter_machine emitter_port collector_machine collector_port w_machine*
use strict;
use warnings;

# Starts the emitter, the collector and one worker per remaining argument
# through ssh, each in the background, and records every machine used
# (one per line) in the file machines.used.

# the four leading arguments are mandatory: without them the ssh commands
# below would be built from undefined values
die "usage: $0 emitter_machine emitter_port collector_machine collector_port w_machine*\n"
  if @ARGV < 4;

my $emachine = shift @ARGV;
my $eport    = shift @ARGV;
my $cmachine = shift @ARGV;
my $cport    = shift @ARGV;

open(my $fd, ">", "machines.used") or die "Cannot open log file: $!";

system("ssh $emachine \"( cd FarmSocket; make emitter; ./emitter $eport )\" & ");
print $fd "$emachine\n";
sleep(5);   # pause before starting the next process

system("ssh $cmachine \"( cd FarmSocket; make collector; ./collector $cport )\" & ");
print $fd "$cmachine\n";
sleep(5);   # pause before starting the workers

foreach my $w (@ARGV) {
  system("ssh $w \"( cd FarmSocket; make worker; ./worker $emachine $eport $cmachine $cport )\" & ");
  print $fd "$w\n";
}
close $fd;
exit;
Terminator code (Perl)
#!/usr/bin/perl
use strict;
use warnings;

# Reads the machine names recorded in machines.used (one per line) and runs
# "kill -9 -1" on each of them through ssh, then removes the file.
# WARNING: kill -9 -1 terminates every process the remote user is allowed
# to signal, not only the farm processes.

open(my $fd, "<", "machines.used") or die "Cannot open log file for reading: $!";
while (my $m = <$fd>) {
  chomp $m;            # strips only a trailing newline; chop would eat the
                       # last letter of a name on an unterminated final line
  next if $m eq "";    # ignore blank lines
  system("ssh", $m, "kill", "-9", "-1");
}
close $fd;
unlink "machines.used";
exit;
Version 2
Workers permanently connect to emitter. Emitter detects worker shutdown via errors returned by read syscalls and signals (printf) that the worker died and a task needs to be considered for rescheduling. Emitter is multithreaded: each thread manages one worker. Therefore this version detects worker faults but does not manage them.
Emitter code
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <pthread.h>

// on demand scheduling of tasks to workers
// receive request from worker i -> send task to compute to worker i
// when no more tasks are available, send an EOS termination
//
// usage is:
//    a.out portno
//           number of the port used to get worker task requests
//
// this is the version checking worker failure (emitter side)
// and just signaling a worker failure
//

// the mutex protects the shared task counter below
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// moved outside main: global
int task = 0;          // tasks are positive integers, in this case
int tasklimit = 100;   // we'll send tasklimit tasks before stopping
int eos = -1;          // special task to denote End Of Stream

/*
 * reports that the worker served on socket s is gone and names the last
 * task sent to it (if any), then closes the socket
 */
static void report_worker_failure(int s, int last_task) {
  printf("NEED TO RUN A SUBSTITUTE WORKER AFTER THIS DIED !!! \n");
  // you should know which is the last task sent, in such a way it can
  // be rescheduled onto a different worker
  if(last_task != eos)   // nothing to reschedule if no task was ever sent
    printf("TASK PRESUMABLY NOT COMPLETED is TASK <%d>\n",last_task);
  close(s);
}

/*
 * thread body serving one permanently connected worker
 * vs points to a heap allocated int holding the connected socket; the
 * handler takes ownership of it and frees it
 * returns a heap allocated counter of the tasks sent to this worker when
 * the stream ends with EOS (to be freed by whoever joins the thread), or
 * NULL when the worker fails
 */
void * worker_handler(void * vs) {
  int s = *((int *) vs);
  int retcode, i;
  int task_to_send = eos;   // eos here means "no task sent so far"
  int * taskNo;             // host tasks computed

  free(vs);
  taskNo = calloc(1,sizeof(int));
  if(taskNo == NULL) {
    perror("while allocating the task counter");
    close(s);
    return (NULL);
  }

  // start reading tasks
  printf("worker_handler %ld started\n",(long) pthread_self());
  while(1==1) {
    printf("Waiting for a request \n");
    retcode = read(s,&i,sizeof(int));   // read request from worker
    if(retcode != (int) sizeof(int)) {  // should be != 0 (read can be split)
      perror("while reading task from worker"); fflush(stderr);
      report_worker_failure(s,task_to_send);
      free(taskNo);
      return (NULL);
    }
    printf("%d ",i); fflush(stdout);

    pthread_mutex_lock(&mutex);
    if(task < tasklimit) {   // send a task to the requesting worker
      task++;                // next task to be sent
      task_to_send = task;
      (*taskNo)++;           // count the task, do not move the pointer
    } else {                 // if no more tasks, then send an EOS
      task_to_send = eos;
    }
    pthread_mutex_unlock(&mutex);

    // send either task or eos
    retcode = write(s,&task_to_send,sizeof(task_to_send));
    if(retcode != (int) sizeof(task_to_send)) {
      perror("while sending task to worker"); fflush(stderr);
      report_worker_failure(s,task_to_send);
      free(taskNo);
      return (NULL);
    }
    if(task_to_send != eos) {
      // print the local copy: the global may already have been advanced
      // by another handler
      printf("sent task %d to worker %d\n",task_to_send,i);
    } else {
      printf("Send EOS to worker %d\n",i);
      close(s);
      return((void *)taskNo);
    }
  }
}

#define MAXHOSTNAME 80
#define MAXTHREADS  16

/*
 * Emitter: accepts worker connections and starts one worker_handler
 * thread per connection, up to MAXTHREADS handlers in total.
 * The accept loop never ends. Returns -1 on set-up or accept failure.
 */
int main(int argc, char * argv[]) {
  int si, retcode;
  socklen_t salen;
  struct sockaddr_in sa, sai;
  struct hostent * he;
  char hostname[MAXHOSTNAME];
  pthread_t tids[MAXTHREADS];   // thread handlers
  int tNo = 0;                  // thread to allocate

  if(argc < 2) {
    fprintf(stderr,"usage: %s portno\n",argv[0]);
    return -1;
  }

  // writing to a worker that died must produce an error in the handler,
  // not a SIGPIPE terminating the whole emitter
  signal(SIGPIPE,SIG_IGN);

  // code needed to set up the communication infrastructure
  printf("Declaring socket\n");
  si = socket(AF_INET,SOCK_STREAM,0);   // socket for inputs
  if(si == -1) { perror("opening socket for input"); return -1; }

  memset(&sai,0,sizeof(sai));           // no stray bytes in the padding
  sai.sin_family = AF_INET;
  sai.sin_port = htons(atoi(argv[1]));
  if(gethostname(hostname,MAXHOSTNAME) == -1) {
    perror("while getting the host name");
    return -1;
  }
  hostname[MAXHOSTNAME-1] = '\0';       // a truncated name may lack the terminator
  he = gethostbyname(hostname);
  if(he == NULL) {                      // unresolved name: nothing to copy from
    fprintf(stderr,"cannot resolve host name %s\n",hostname);
    return -1;
  }
  memcpy(&sai.sin_addr, he->h_addr_list[0], sizeof(sai.sin_addr));
  printf("Binding to %s\n",inet_ntoa(sai.sin_addr));

  retcode = bind(si,(struct sockaddr *) &sai, sizeof(sai));
  if(retcode == -1) { perror("while calling bind"); return -1; }

  printf("Listening socket\n");
  retcode = listen(si,1);
  if(retcode == -1) { perror("while calling listen"); return -1; }

  while(1==1) {
    // one heap cell per connection: each handler gets its own copy of the fd
    int * sp = calloc(1,sizeof(int));
    if(sp == NULL) {
      perror("while allocating the socket holder");
      return -1;
    }

    salen = sizeof(sa);
    printf("Accepting connections .... \n");
    *sp = accept(si,(struct sockaddr *)&sa,&salen);   // accept a connection
    if(*sp == -1) {                      // accept reports failure with -1
      perror("while calling an accept");
      free(sp);
      return -1;
    }

    if(tNo >= MAXTHREADS) {              // tids[] is full: refuse, do not overflow
      fprintf(stderr,"too many workers (max %d), connection refused\n",MAXTHREADS);
      close(*sp);
      free(sp);
      continue;
    }

    // now fork a thread to permanently handle the connection
    retcode = pthread_create(&tids[tNo],NULL,worker_handler,(void *) sp);
    if(retcode != 0) {                   // pthread_create returns the error number
      fprintf(stderr,"while creating a worker handler: %s\n",strerror(retcode));
      close(*sp);
      free(sp);
      continue;
    }
    printf("Created worker_handle No %d\n",tNo);
    tNo++;
  }

  // not reached: the loop above has no exit
  printf("Closing operations\n");
  return 0;
}
Worker code
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <unistd.h>

/*
 * function to be computed on the single task
 * this is actually the only "business logic" code
 */
int f(int task) {
  sleep(1);   // just to simulate some work ...
  return (task*task);
}

/*
 * fills sa with the IPv4 address of host and the given port
 * returns 0 on success, -1 if the host name cannot be resolved
 */
static int resolve(const char * host, const char * port, struct sockaddr_in * sa) {
  struct hostent * he = gethostbyname(host);
  if(he == NULL) {
    fprintf(stderr,"cannot resolve host name %s\n",host);
    return -1;
  }
  memset(sa,0,sizeof(*sa));
  sa->sin_family = AF_INET;
  sa->sin_port = htons(atoi(port));
  memcpy(&sa->sin_addr, he->h_addr_list[0], sizeof(sa->sin_addr));
  return 0;
}

/*
 * receives integer tasks and delivers integer results, up to the reception
 * of an EOS (a -1)
 * this version keeps a single connection to the emitter open for its whole
 * life and opens/closes a connection to the collector for each result
 *
 * usage:
 *    a.out emitter_address emitter_port collector_address collector_port
 */
int main(int argc, char * argv[]) {
  int s, cs, retcode;            // task socket, result socket, error ret
  struct sockaddr_in esa, csa;   // emitter and collector addresses
  int pid = 0;

  if(argc < 5) {
    fprintf(stderr,
            "usage: %s emitter_address emitter_port collector_address collector_port\n",
            argv[0]);
    return -1;
  }
  // the two addresses never change: resolve them once, outside the loop
  if(resolve(argv[1],argv[2],&esa) == -1) return -1;
  if(resolve(argv[3],argv[4],&csa) == -1) return -1;

  pid = getpid();   // get my POSIX process id
  printf("This is worker %d\n",pid);

  // connect permanently to the emitter process
  s = socket(AF_INET,SOCK_STREAM,0);   // socket to access emitter
  if(s == -1) {
    perror("while creating the socket to emitter");
    return (-1);
  }
  printf("Opened E socket %d\n",s);
  retcode = connect(s,(struct sockaddr *)&esa,sizeof(esa));
  if(retcode == -1) {
    perror("while connecting to emitter");
    return(-1);
  }

  while(1==1) {
    int task, result;

    // sends a request to the emitter
    printf("Writing to E socket %d\n",s);
    retcode = write(s,&pid,sizeof(pid));
    if(retcode == -1) { perror("sending a request"); return -1; }

    printf("Reading from E socket %d\n",s);
    retcode = read(s,&task,sizeof(task));   // get a task
    if(retcode != (int) sizeof(task)) {
      perror("while receiving a task");
      return -1;
    }
    printf("Read from E socket %d\n",s);

    // check for EOS
    if(task < 0) {
      printf("Received EOS\n");
      break;   // if EOS terminate loop iterations
    }

    printf("Received task %d\n",task);
    // otherwise process the incoming task
    result = f(task);
    printf("Computed %d -> %d\n",task,result);

    // send result to the collector
    cs = socket(AF_INET,SOCK_STREAM,0);   // create socket to collector
    if(cs == -1) {
      perror("while creating the socket to collector");
      return (-1);
    }
    retcode = connect(cs,(struct sockaddr *)&csa,sizeof(csa));
    if(retcode == -1) {
      perror("connecting to the collector");
      return(-1);
    }

    // send the result and close connection
    if(write(cs,&result,sizeof(result)) != (ssize_t) sizeof(result))
      perror("while sending the result");
    close(cs);
    printf("Sent to C socket, E sock %d\n",s);
    // then cycle again
  }
  close(s);
  return 0;
}
Collector code
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>

// receives results from the workers and displays them on the console
// usage:
//    a.out portno
//           number of the port for the result socket

#define MAXHOSTNAME 80

/*
 * Collector: accepts one connection per result, reads a single integer
 * and prints it. A value of -1 is taken as EOS and ends the loop.
 * Returns 0 after EOS, -1 on any set-up or accept/read failure.
 */
int main(int argc, char * argv[]) {
  int s, si, retcode, i;
  socklen_t salen;
  struct sockaddr_in sa, sai;
  struct hostent * he;
  char hostname[MAXHOSTNAME];

  if(argc < 2) {
    fprintf(stderr,"usage: %s portno\n",argv[0]);
    return -1;
  }

  si = socket(AF_INET,SOCK_STREAM,0);   // socket to receive the results
  if(si == -1) { perror("while opening socket"); return -1; }

  memset(&sai,0,sizeof(sai));           // no stray bytes in the padding
  sai.sin_family = AF_INET;
  sai.sin_port = htons(atoi(argv[1]));
  if(gethostname(hostname,MAXHOSTNAME) == -1) {
    perror("while getting the host name");
    return -1;
  }
  hostname[MAXHOSTNAME-1] = '\0';       // a truncated name may lack the terminator
  he = gethostbyname(hostname);
  if(he == NULL) {                      // unresolved name: nothing to copy from
    fprintf(stderr,"cannot resolve host name %s\n",hostname);
    return -1;
  }
  memcpy(&sai.sin_addr, he->h_addr_list[0], sizeof(sai.sin_addr));

  retcode = bind(si,(struct sockaddr *) &sai, sizeof(sai));
  if(retcode == -1) { perror("while binding socket to addr"); return -1; }

  retcode = listen(si,1);
  if(retcode == -1) { perror("while calling listen"); return -1; }

  while(1==1) {
    salen = sizeof(sa);
    printf("Accepting connections\n");
    s = accept(si,(struct sockaddr *)&sa,&salen);   // accept a connection
    if(s == -1) {                        // accept reports failure with -1
      perror("while accepting a connection");
      return -1;
    }

    retcode = read(s,&i,sizeof(int));    // read a res from one of the Worker
    if(retcode == -1) {
      perror("while reading a result");
      return -1;
    }
    if(retcode != (int) sizeof(int)) {
      // peer closed early or sent a partial value: i is not valid, drop it
      fprintf(stderr,"incomplete result received, connection dropped\n");
      close(s);
      continue;
    }
    if(i==(-1)) {
      printf("EOS -> terminating\n");
      close(s);                          // do not leave the last connection open
      break;
    }
    printf("Read result: %d ",i); fflush(stdout);   // and print it on console
    close(s);
  }
  close(si);
  return 0;
}