magistraleinformaticanetworking:spm:farmposix
Differenze
Queste sono le differenze tra la revisione selezionata e la versione attuale della pagina.
Entrambe le parti precedenti la revisioneRevisione precedenteProssima revisione | Revisione precedente | ||
magistraleinformaticanetworking:spm:farmposix [22/03/2011 alle 17:30 (14 anni fa)] – Marco Danelutto | magistraleinformaticanetworking:spm:farmposix [22/03/2011 alle 18:02 (14 anni fa)] (versione attuale) – [Task farm (POSIX TCP/IP)] Marco Danelutto | ||
---|---|---|---|
Linea 1: | Linea 1: | ||
===== Task farm (POSIX TCP/IP) ===== | ===== Task farm (POSIX TCP/IP) ===== | ||
+ | //(The code presented here does not pretend to be efficient, nor completely correct. It has been designed with the aim of illustrating several of the problems related to the implementation of fairly simple parallel applications)// | ||
==== Version 1 ==== | ==== Version 1 ==== | ||
This version uses two servers (emitter and collector). Emitter accepts requests from workers and delivers tasks to be executed. Collector accepts results from workers and print them out on the screen. Workers open a separate connection with both emitter and collector for each task. The two connections are closed after processing the task and delivering the result. Tasks and results are (positive) integers. Emitter and worker termination is handled correctly. Collector does not terminate (suggestion: | This version uses two servers (emitter and collector). Emitter accepts requests from workers and delivers tasks to be executed. Collector accepts results from workers and print them out on the screen. Workers open a separate connection with both emitter and collector for each task. The two connections are closed after processing the task and delivering the result. Tasks and results are (positive) integers. Emitter and worker termination is handled correctly. Collector does not terminate (suggestion: | ||
=== Emitter code === | === Emitter code === | ||
- | < | + | < |
#include < | #include < | ||
#include < | #include < | ||
Linea 80: | Linea 81: | ||
=== Worker code === | === Worker code === | ||
- | < | + | < |
#include < | #include < | ||
#include < | #include < | ||
Linea 172: | Linea 173: | ||
=== Collector code === | === Collector code === | ||
- | < | + | < |
#include < | #include < | ||
#include < | #include < | ||
Linea 231: | Linea 232: | ||
} | } | ||
</ | </ | ||
+ | |||
+ | === Makefile === | ||
+ | <code makefile makefile> | ||
+ | CC = gcc | ||
+ | CFLAGS = | ||
+ | TARGETS = emitter collector worker | ||
+ | |||
+ | all: $(TARGETS) | ||
+ | |||
+ | clean: | ||
+ | rm -f $(TARGETS) | ||
+ | |||
+ | worker: | ||
+ | $(CC) -o worker $(CFLAGS) worker.c | ||
+ | |||
+ | collector: | ||
+ | $(CC) -o collector $(CFLAGS) collector.c | ||
+ | |||
+ | emitter: | ||
+ | $(CC) -o emitter $(CFLAGS) emitter.c | ||
+ | |||
+ | dist: | ||
+ | rsync -avz ../ | ||
+ | rsync -avz ../ | ||
+ | rsync -avz ../ | ||
+ | </ | ||
+ | |||
+ | === Deployment code (Perl) === | ||
+ | <code perl deploy.pl> | ||
+ | # | ||
+ | $sourcedir = "/ | ||
+ | foreach $m (@ARGV) { | ||
+ | print " | ||
+ | system(" | ||
+ | print " | ||
+ | } | ||
+ | exit; | ||
+ | </ | ||
+ | |||
+ | === Deployment code (BASH) === | ||
+ | <code bash deploy.bash> | ||
+ | #!/bin/bash | ||
+ | for m in $@ | ||
+ | do | ||
+ | rsync -avz / | ||
+ | done | ||
+ | </ | ||
+ | |||
+ | === Launcher code (Perl) === | ||
+ | The launcher gets parameters from the command line (see comments) and saves the names of the machines used in the < | ||
+ | |||
+ | <code perl launcher.pl> | ||
+ | # | ||
+ | |||
+ | ### launcher emitter_machine emitter_port collector_machine collector_port w_machine* | ||
+ | |||
+ | $emachine = shift @ARGV; | ||
+ | $eport | ||
+ | $cmachine = shift @ARGV; | ||
+ | $cport | ||
+ | |||
+ | open FD, "> | ||
+ | system(" | ||
+ | print FD " | ||
+ | sleep(5); | ||
+ | system(" | ||
+ | print FD " | ||
+ | sleep(5); | ||
+ | |||
+ | foreach $w (@ARGV) { | ||
+ | system(" | ||
+ | print FD " | ||
+ | |||
+ | } | ||
+ | close FD; | ||
+ | exit; | ||
+ | </ | ||
+ | |||
+ | === Terminator code (Perl) === | ||
+ | <code perl terminator.pl> | ||
+ | # | ||
+ | open FD, "< | ||
+ | while(< | ||
+ | $m = $_ ; | ||
+ | chop $m; | ||
+ | system(" | ||
+ | } | ||
+ | close FD; | ||
+ | system(" | ||
+ | exit; | ||
+ | </ | ||
+ | |||
+ | ==== Version 2 ==== | ||
+ | Workers permanently connect to emitter. Emitter detects worker shutdown via errors returned by read syscalls and signals (printf) that the worker died and a task needs to be considered for rescheduling. | ||
+ | Emitter is multithreaded: | ||
+ | Therefore this version detects worker faults but does not manage them. | ||
+ | |||
+ | === Emitter code === | ||
+ | <code c emitter.c> | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | |||
+ | // on demand scheduling of tasks to workers | ||
+ | // receive request from worker i -> send task to compute to worker i | ||
+ | // when no more tasks are available, send an EOS termination | ||
+ | // | ||
+ | // usage is: | ||
+ | // a.out portno | ||
+ | // | ||
+ | // | ||
+ | // this is the version checking worker failure (emitter side) | ||
+ | // and just signaling a worker failure | ||
+ | // | ||
+ | |||
+ | // data structure taking care of the tasks to compute. We need to push | ||
+ | // back tasks that have not presumably been computed due to failures | ||
+ | // | ||
+ | |||
+ | // to manage task list | ||
+ | pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; | ||
+ | // moved outside main: global | ||
+ | int task = 0; // tasks are positive integeres, in this case | ||
+ | int tasklimit = 100; // we'll send tasklimit tasks before stopping | ||
+ | int eos = -1; // special task to denote End Of Stream | ||
+ | |||
+ | void * worker_handler(void * vs) { | ||
+ | int s = *((int *) vs); | ||
+ | int retcode, | ||
+ | int task_to_send; | ||
+ | int * taskNo = (int *) calloc(sizeof(int), | ||
+ | // start reading tasks | ||
+ | printf(" | ||
+ | while(1==1) { | ||
+ | printf(" | ||
+ | retcode = read(s,& | ||
+ | if(retcode != sizeof(int)) { // should be != 0 (read can be split) | ||
+ | perror(" | ||
+ | printf(" | ||
+ | // you should know which is the last task sent, in such a way it can | ||
+ | // be rescheduled onto a different worker | ||
+ | printf(" | ||
+ | return (NULL); | ||
+ | } | ||
+ | printf(" | ||
+ | pthread_mutex_lock(& | ||
+ | if(task < tasklimit) { // send a task to the requesting worker | ||
+ | task++; | ||
+ | task_to_send = task; | ||
+ | taskNo++; | ||
+ | } else { // if no more tasks, then send an EOS | ||
+ | task_to_send = eos; | ||
+ | } | ||
+ | pthread_mutex_unlock(& | ||
+ | write(s,& | ||
+ | if(task_to_send != eos) { | ||
+ | printf(" | ||
+ | } else { | ||
+ | printf(" | ||
+ | close(s); | ||
+ | return((void *)taskNo); | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | |||
+ | #define MAXHOSTNAME 80 | ||
+ | #define MAXTHREADS | ||
+ | |||
+ | int main(int argc, char * argv[]) { | ||
+ | int s,si, retcode, i; | ||
+ | unsigned int salen; | ||
+ | struct sockaddr_in sa,sai; | ||
+ | char hostname[MAXHOSTNAME]; | ||
+ | pthread_t tids[MAXTHREADS]; | ||
+ | int tNo = 0; // thread to allocate | ||
+ | |||
+ | |||
+ | // code needed to set up the communication infrastructure | ||
+ | printf(" | ||
+ | si = socket(AF_INET, | ||
+ | if(si == -1) {perror(" | ||
+ | sai.sin_family = AF_INET; | ||
+ | sai.sin_port = htons(atoi(argv[1])); | ||
+ | gethostname(hostname, | ||
+ | memcpy(& | ||
+ | sizeof(sa.sin_addr)); | ||
+ | printf(" | ||
+ | retcode = bind(si, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | printf(" | ||
+ | retcode = listen(si, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | |||
+ | while(1==1) { | ||
+ | int * s = (int *) calloc(sizeof(int), | ||
+ | |||
+ | salen = sizeof(sa); | ||
+ | printf(" | ||
+ | *s = accept(si, | ||
+ | if(*s == 1) { | ||
+ | perror(" | ||
+ | // now fork a thread to permanently handle the connection | ||
+ | pthread_create(& | ||
+ | printf(" | ||
+ | tNo++; | ||
+ | } | ||
+ | printf(" | ||
+ | return 0; | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | === Worker code === | ||
+ | <code c worker.c> | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | |||
+ | /* | ||
+ | | ||
+ | | ||
+ | */ | ||
+ | |||
+ | int f(int task) { | ||
+ | sleep(1); | ||
+ | return (task*task); | ||
+ | } | ||
+ | |||
+ | /* | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | */ | ||
+ | |||
+ | int main(int argc, char * argv[]) { | ||
+ | int s, cs, retcode; | ||
+ | struct sockaddr_in sa; | ||
+ | int pid = 0; | ||
+ | | ||
+ | | ||
+ | pid = getpid(); | ||
+ | printf(" | ||
+ | // connect permanently to the emitter process | ||
+ | s = socket(AF_INET, | ||
+ | if(s == -1) { | ||
+ | perror(" | ||
+ | return (-1); } | ||
+ | printf(" | ||
+ | sa.sin_family = AF_INET; | ||
+ | sa.sin_port = htons(atoi(argv[2])); | ||
+ | memcpy(& | ||
+ | sizeof(sa.sin_addr)); | ||
+ | retcode = connect(s, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | | ||
+ | while(1==1) { | ||
+ | int task, result; | ||
+ | // sends a request to the emitter | ||
+ | printf(" | ||
+ | retcode = write(s,& | ||
+ | if(retcode == -1) { perror(" | ||
+ | printf(" | ||
+ | retcode = read(s,& | ||
+ | if(retcode != sizeof(task)) { | ||
+ | perror(" | ||
+ | printf(" | ||
+ | | ||
+ | // check for EOS | ||
+ | if(task < 0) { | ||
+ | printf(" | ||
+ | break; | ||
+ | } else { | ||
+ | printf(" | ||
+ | // otherwise process the incoming task | ||
+ | result = f(task); | ||
+ | printf(" | ||
+ | | ||
+ | // send result to the collector | ||
+ | cs = socket(AF_INET, | ||
+ | sa.sin_family = AF_INET; | ||
+ | sa.sin_port = htons(atoi(argv[4])); | ||
+ | memcpy(& | ||
+ | sizeof(sa.sin_addr)); | ||
+ | retcode = connect(cs, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | return(-1); | ||
+ | | ||
+ | // send the result and close connection | ||
+ | write(cs,& | ||
+ | close(cs); | ||
+ | printf(" | ||
+ | | ||
+ | // then cycle again | ||
+ | } | ||
+ | } | ||
+ | close(s); | ||
+ | return 0; | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | === Collector code === | ||
+ | <codec collector.c> | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | |||
+ | // receives results from the workers and displays them on the console | ||
+ | // usage: | ||
+ | // a.out portno | ||
+ | // number of the port for the result socket | ||
+ | |||
+ | #define MAXHOSTNAME 80 | ||
+ | |||
+ | int main(int argc, char * argv[]) { | ||
+ | int s,si, retcode, i; | ||
+ | unsigned int salen; | ||
+ | struct sockaddr_in sa,sai; | ||
+ | char hostname[MAXHOSTNAME]; | ||
+ | |||
+ | si = socket(AF_INET, | ||
+ | if(si == -1) { | ||
+ | perror(" | ||
+ | sai.sin_family = AF_INET; | ||
+ | sai.sin_port = htons(atoi(argv[1])); | ||
+ | gethostname(hostname, | ||
+ | memcpy(& | ||
+ | sizeof(sa.sin_addr)); | ||
+ | retcode = bind(si, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | retcode = listen(si, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | |||
+ | while(1==1) { | ||
+ | salen = sizeof(sa); | ||
+ | printf(" | ||
+ | s = accept(si, | ||
+ | if(s == 1) { | ||
+ | perror(" | ||
+ | } | ||
+ | retcode = read(s,& | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | if(i==(-1)) { | ||
+ | printf(" | ||
+ | break; | ||
+ | } | ||
+ | printf(" | ||
+ | close(s); | ||
+ | } | ||
+ | close(si); | ||
+ | return 0; | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | ==== Version 3 ==== | ||
+ | This version manages worker faults. The emitter process detects faults via errors in task request read. It schedules back the task delivered to the worker before the fault in a //limbo// task queue. Emitter accepts notification of task completion from the collector. | ||
+ | When a notification is received, a task with the same tag is searched in the limbo queue and, if found, it is removed from the queue. | ||
+ | When regular tasks are finished, //limbo// tasks are scheduled again before sending EOS to workers. | ||
+ | |||
+ | **Suggested exercise**: modify the code in such a way: | ||
+ | * collector does not send any notification to the emitter | ||
+ | * collector avoids delivering results with tag equal to one of the tasks previously delivered (on screen) | ||
+ | * emitter inserts the last task delivered to faulty worker directly into the task queue, in such a way they are indiscriminately rescheduled for execution. | ||
+ | |||
+ | === Emitter code === | ||
+ | <code c emitter.c> | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | |||
+ | // on demand scheduling of tasks to workers | ||
+ | // receive request from worker i -> send task to compute to worker i | ||
+ | // when no more tasks are available, send an EOS termination | ||
+ | // | ||
+ | // usage is: | ||
+ | // a.out portno | ||
+ | // | ||
+ | // | ||
+ | // this is the version checking worker failure (emitter side) | ||
+ | // and just signaling a worker failure | ||
+ | // | ||
+ | |||
+ | // the structure hosting the tasks | ||
+ | #include " | ||
+ | // this hosts tasks initially scheduled for execution | ||
+ | TASK * task_list = NULL; | ||
+ | // this hosts tasks to be rescheduled (possibly) | ||
+ | TASK * task_limbo = NULL; | ||
+ | |||
+ | // to manage task list | ||
+ | pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; | ||
+ | pthread_mutex_t mutex_limbo = PTHREAD_MUTEX_INITIALIZER; | ||
+ | // moved outside main: global | ||
+ | int task = 0; // tasks are positive integeres, in this case | ||
+ | int tasklimit = 10; // we'll send tasklimit tasks before stopping | ||
+ | TASK eos; // special task to denote End Of Stream | ||
+ | |||
+ | |||
+ | void * worker_handler(void * vs) { | ||
+ | int s = *((int *) vs); | ||
+ | int retcode, | ||
+ | TASK * task_to_send; | ||
+ | int * taskNo = (int *) calloc(sizeof(int), | ||
+ | // start reading tasks | ||
+ | printf(" | ||
+ | while(1==1) { | ||
+ | SERVICE request; | ||
+ | printf(" | ||
+ | // read request: task request from woker or task completion msg from coll | ||
+ | retcode = read(s,& | ||
+ | if(retcode != sizeof(SERVICE)) { // should be != 0 (read can be split) | ||
+ | perror(" | ||
+ | if(retcode == 0) { // connection broken | ||
+ | printf(" | ||
+ | // you should know which is the last task sent, in such a way it can | ||
+ | // be rescheduled onto a different worker | ||
+ | printf(" | ||
+ | pthread_mutex_lock(& | ||
+ | { | ||
+ | TASK * t = (TASK *) calloc(1, | ||
+ | t->v = task_to_send-> | ||
+ | t->tag = task_to_send-> | ||
+ | t->next = task_limbo; | ||
+ | task_limbo = t; | ||
+ | printf(" | ||
+ | } | ||
+ | pthread_mutex_unlock(& | ||
+ | return (NULL); | ||
+ | } | ||
+ | if(request.tag == TAG_COMPLETED) { // this is from collector | ||
+ | // cancel request.v tag from limbo | ||
+ | TASK * p, *pp ; | ||
+ | p = pp = task_limbo; | ||
+ | pthread_mutex_lock(& | ||
+ | while(p!=NULL) { | ||
+ | if(p-> | ||
+ | if(p == pp) { // first element to be removed | ||
+ | p = p-> | ||
+ | } else { | ||
+ | pp->next = p->next; // TODO: free unused one ! | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | printf(" | ||
+ | pthread_mutex_unlock(& | ||
+ | return(NULL); | ||
+ | } // else: emit a task to the worker and cycle (this is a worker manager) | ||
+ | printf(" | ||
+ | pthread_mutex_lock(& | ||
+ | if(task_list != NULL) { // send a task to the requesting worker | ||
+ | TASK * t = task_list; | ||
+ | task_list = t-> | ||
+ | task_to_send = t; | ||
+ | taskNo++; | ||
+ | pthread_mutex_unlock(& | ||
+ | } else { // if no more tasks, then check limbo or send EOS | ||
+ | pthread_mutex_unlock(& | ||
+ | pthread_mutex_lock(& | ||
+ | if(task_limbo!= NULL) { | ||
+ | task_to_send = task_limbo ; | ||
+ | task_limbo = task_limbo-> | ||
+ | printf(" | ||
+ | } else { | ||
+ | task_to_send = & | ||
+ | } | ||
+ | pthread_mutex_lock(& | ||
+ | } | ||
+ | write(s,& | ||
+ | if(task_to_send != &eos) { | ||
+ | printf(" | ||
+ | } else { | ||
+ | printf(" | ||
+ | close(s); | ||
+ | return((void *)taskNo); | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | |||
+ | #define MAXHOSTNAME 80 | ||
+ | #define MAXTHREADS | ||
+ | |||
+ | int main(int argc, char * argv[]) { | ||
+ | int s,si, retcode, i; | ||
+ | unsigned int salen; | ||
+ | struct sockaddr_in sa,sai; | ||
+ | char hostname[MAXHOSTNAME]; | ||
+ | pthread_t tids[MAXTHREADS]; | ||
+ | int tNo = 0; // thread to allocate | ||
+ | |||
+ | |||
+ | // set up task structure | ||
+ | for(i=0; i< | ||
+ | TASK * t = (TASK *) calloc(1, | ||
+ | t->next = task_list; | ||
+ | t->v = i; | ||
+ | t->tag = i; | ||
+ | task_list = t; | ||
+ | } | ||
+ | // set up eos | ||
+ | eos.v = -1; | ||
+ | eos.tag = -2; | ||
+ | eos.next = NULL; | ||
+ | |||
+ | // code needed to set up the communication infrastructure | ||
+ | printf(" | ||
+ | si = socket(AF_INET, | ||
+ | if(si == -1) {perror(" | ||
+ | sai.sin_family = AF_INET; | ||
+ | sai.sin_port = htons(atoi(argv[1])); | ||
+ | gethostname(hostname, | ||
+ | memcpy(& | ||
+ | sizeof(sa.sin_addr)); | ||
+ | printf(" | ||
+ | retcode = bind(si, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | printf(" | ||
+ | retcode = listen(si, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | |||
+ | while(1==1) { | ||
+ | int * s = (int *) calloc(sizeof(int), | ||
+ | |||
+ | salen = sizeof(sa); | ||
+ | printf(" | ||
+ | *s = accept(si, | ||
+ | if(*s == 1) { | ||
+ | perror(" | ||
+ | // now fork a thread to permanently handle the connection | ||
+ | pthread_create(& | ||
+ | printf(" | ||
+ | tNo++; | ||
+ | } | ||
+ | printf(" | ||
+ | return 0; | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | === Worker code === | ||
+ | <code c worker.c> | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | |||
+ | #include " | ||
+ | |||
+ | /* | ||
+ | | ||
+ | | ||
+ | */ | ||
+ | |||
+ | int f(int task) { | ||
+ | sleep(1); | ||
+ | return (task*task); | ||
+ | } | ||
+ | |||
+ | /* | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | | ||
+ | */ | ||
+ | |||
+ | int main(int argc, char * argv[]) { | ||
+ | int s, cs, retcode; | ||
+ | struct sockaddr_in sa; | ||
+ | int pid = 0; | ||
+ | | ||
+ | | ||
+ | pid = getpid(); | ||
+ | printf(" | ||
+ | // connect permanently to the emitter process | ||
+ | s = socket(AF_INET, | ||
+ | if(s == -1) { | ||
+ | perror(" | ||
+ | return (-1); } | ||
+ | printf(" | ||
+ | sa.sin_family = AF_INET; | ||
+ | sa.sin_port = htons(atoi(argv[2])); | ||
+ | memcpy(& | ||
+ | sizeof(sa.sin_addr)); | ||
+ | retcode = connect(s, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | | ||
+ | while(1==1) { | ||
+ | TASK task; | ||
+ | RESULT result; | ||
+ | SERVICE request; | ||
+ | // sends a request to the emitter | ||
+ | request.tag = TAG_TASK_REQUEST; | ||
+ | request.v = pid; | ||
+ | retcode = write(s,& | ||
+ | if(retcode == -1) { perror(" | ||
+ | retcode = read(s,& | ||
+ | if(retcode != sizeof(TASK)) { // this could be corrected in case | ||
+ | // of large tasks (while(len!=sizeof) read) | ||
+ | perror(" | ||
+ | | ||
+ | // check for EOS | ||
+ | if(task.tag < 0) { | ||
+ | printf(" | ||
+ | break; | ||
+ | } else { | ||
+ | printf(" | ||
+ | // otherwise process the incoming task | ||
+ | result.v = f(task.v); | ||
+ | result.tag = task.tag; | ||
+ | result.next = NULL; | ||
+ | printf(" | ||
+ | | ||
+ | // send result to the collector | ||
+ | cs = socket(AF_INET, | ||
+ | sa.sin_family = AF_INET; | ||
+ | sa.sin_port = htons(atoi(argv[4])); | ||
+ | memcpy(& | ||
+ | sizeof(sa.sin_addr)); | ||
+ | retcode = connect(cs, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | return(-1); | ||
+ | | ||
+ | // send the result and close connection | ||
+ | write(cs,& | ||
+ | close(cs); | ||
+ | printf(" | ||
+ | | ||
+ | // then cycle again | ||
+ | } | ||
+ | } | ||
+ | close(s); | ||
+ | return 0; | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | === Collector code === | ||
+ | <code c collector.c> | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | #include < | ||
+ | |||
+ | #include " | ||
+ | |||
+ | // receives results from the workers and displays them on the console | ||
+ | // usage: | ||
+ | // a.out portno emitterhost emitterport | ||
+ | // number of the port for the result socket | ||
+ | // then address of emitter (to send feedbacks on task completion) | ||
+ | |||
+ | #define MAXHOSTNAME 80 | ||
+ | |||
+ | int main(int argc, char * argv[]) { | ||
+ | int s, ei, si, retcode, i; | ||
+ | RESULT result; | ||
+ | unsigned int salen; | ||
+ | struct sockaddr_in sa,sai,eai; | ||
+ | char hostname[MAXHOSTNAME]; | ||
+ | char * emitterhost = argv[2]; | ||
+ | int emitter_port = atoi(argv[3]); | ||
+ | |||
+ | si = socket(AF_INET, | ||
+ | if(si == -1) { | ||
+ | perror(" | ||
+ | sai.sin_family = AF_INET; | ||
+ | sai.sin_port = htons(atoi(argv[1])); | ||
+ | gethostname(hostname, | ||
+ | memcpy(& | ||
+ | sizeof(sai.sin_addr)); | ||
+ | retcode = bind(si, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | retcode = listen(si, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | |||
+ | while(1==1) { | ||
+ | SERVICE feedback; | ||
+ | salen = sizeof(sa); | ||
+ | printf(" | ||
+ | s = accept(si, | ||
+ | if(s == 1) { | ||
+ | perror(" | ||
+ | } | ||
+ | // read a res from one of the Worker | ||
+ | retcode = read(s,& | ||
+ | if(retcode != sizeof(RESULT)) { // AGAIN, should be corrected | ||
+ | perror(" | ||
+ | if(result.tag < 0) { | ||
+ | printf(" | ||
+ | break; | ||
+ | } | ||
+ | printf(" | ||
+ | fflush(stdout); | ||
+ | close(s); | ||
+ | // now send a feedback to emitter process with the tag of the | ||
+ | // received task: in case it was selected for re-scheduling | ||
+ | // it will be removed, otherwise it will be rescheduled after | ||
+ | // completing " | ||
+ | ei = socket(AF_INET, | ||
+ | if(ei == -1) { | ||
+ | perror(" | ||
+ | eai.sin_family = AF_INET; | ||
+ | eai.sin_port = htons(emitter_port); | ||
+ | memcpy(& | ||
+ | sizeof(eai.sin_addr)); | ||
+ | retcode = connect(ei, | ||
+ | if(retcode == -1) { | ||
+ | perror(" | ||
+ | // send feedback on computed task | ||
+ | feedback.tag = TAG_COMPLETED; | ||
+ | feedback.v | ||
+ | retcode = write(ei,& | ||
+ | if(retcode != sizeof(SERVICE)) { | ||
+ | perror(" | ||
+ | close(ei); // one shot: TODO use permanent connection on dedicated ss | ||
+ | } | ||
+ | close(si); | ||
+ | return 0; | ||
+ | } | ||
+ | </ | ||
+ | |||
+ | **Attention**: |
magistraleinformaticanetworking/spm/farmposix.1300815059.txt.gz · Ultima modifica: 22/03/2011 alle 17:30 (14 anni fa) da Marco Danelutto