#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>

/*
 * function to be computed on the single task
 * this is actually the only "business logic" code
 *
 * Sleeps for one second and returns task squared.  The product is computed
 * in plain int arithmetic, so the caller must pass a task whose square
 * fits in an int.
 */
int f(int task)
{
    sleep(1);                   // just to simulate some work ...
    return (task * task);
}

/*
 * Fills *out with the IPv4 address of host and the TCP port given as a
 * decimal string.  Prints a diagnostic and returns -1 when the port is not
 * a number in 1..65535 or the host cannot be resolved; returns 0 otherwise.
 */
static int resolve_endpoint(const char *host, const char *port,
                            struct sockaddr_in *out)
{
    char *end = NULL;
    long portno = strtol(port, &end, 10);

    // the range test also rejects the LONG_MIN/LONG_MAX overflow results
    if (end == port || *end != '\0' || portno < 1 || portno > 65535) {
        fprintf(stderr, "invalid port '%s'\n", port);
        return -1;
    }

    struct hostent *he = gethostbyname(host);
    if (he == NULL || he->h_addrtype != AF_INET || he->h_addr_list[0] == NULL) {
        fprintf(stderr, "cannot resolve host '%s'\n", host);
        return -1;
    }

    memset(out, 0, sizeof *out);
    out->sin_family = AF_INET;
    out->sin_port = htons((unsigned short)portno);
    // copy now: gethostbyname() returns storage reused by the next call
    memcpy(&out->sin_addr, he->h_addr_list[0], sizeof out->sin_addr);
    return 0;
}

/*
 * Reads exactly len bytes from fd, retrying after short reads.
 * Returns 0 on success, 1 if the peer closed the connection before len
 * bytes arrived, -1 on a read error (errno set by read()).
 */
static int read_fully(int fd, void *buf, size_t len)
{
    char *p = buf;

    while (len > 0) {
        ssize_t n = read(fd, p, len);
        if (n < 0)
            return -1;
        if (n == 0)
            return 1;
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/*
 * Writes exactly len bytes to fd, retrying after short writes.
 * Returns 0 on success, -1 on a write error (errno set by write()).
 */
static int write_fully(int fd, const void *buf, size_t len)
{
    const char *p = buf;

    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n < 0)
            return -1;
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/*
 * Opens a fresh connection to the collector, sends one result as a raw int
 * and closes the connection again.  Prints a diagnostic and returns -1 on
 * any failure, 0 on success.  The socket is closed on every path.
 */
static int send_result(const struct sockaddr_in *collector, int result)
{
    int rc = -1;
    int cs = socket(AF_INET, SOCK_STREAM, 0);   // create socket to collector

    if (cs == -1) {
        perror("while creating the socket to collector");
        return -1;
    }

    if (connect(cs, (const struct sockaddr *)collector, sizeof *collector) == -1)
        perror("connecting to the collector");
    else if (write_fully(cs, &result, sizeof result) == -1)
        perror("sending the result");
    else
        rc = 0;

    close(cs);
    return rc;
}

/*
 * receives integer tasks and delivers integer results, up to the reception
 * of an EOS (a negative task)
 * the connection to the emitter is kept open for the whole run, while a
 * new connection to the collector is opened and closed for each result,
 * which is definitely not efficient ...
 *
 * for each task the worker first sends its process id to the emitter as a
 * request, then reads the task back; pid, task and result all travel as
 * raw ints, with no byte-order conversion
 *
 * usage:
 *    a.out emitter_address emitter_port collector_address collector_port
 *
 * returns 0 after an EOS, -1 on a usage, resolution or communication error
 */
int main(int argc, char *argv[])
{
    int s = -1;                     // task socket (permanent, to the emitter)
    int rc = -1;                    // exit status, success only after EOS
    struct sockaddr_in emitter_sa, collector_sa;

    if (argc < 5) {
        fprintf(stderr,
                "usage: %s emitter_address emitter_port "
                "collector_address collector_port\n",
                argc > 0 ? argv[0] : "a.out");
        return -1;
    }

    int pid = (int)getpid();        // get my POSIX process id
    printf("This is worker %d\n", pid);

    // resolve both peers once, up front: the addresses do not change
    // between tasks, and no socket has to be cleaned up if this fails
    if (resolve_endpoint(argv[1], argv[2], &emitter_sa) == -1 ||
        resolve_endpoint(argv[3], argv[4], &collector_sa) == -1)
        return -1;

    // connect permanently to the emitter process
    s = socket(AF_INET, SOCK_STREAM, 0);
    if (s == -1) {
        perror("while creating the socket to emitter");
        return -1;
    }
    printf("Opened E socket %d\n", s);

    if (connect(s, (struct sockaddr *)&emitter_sa, sizeof emitter_sa) == -1) {
        perror("while connecting to emitter");
        goto out;
    }

    for (;;) {
        int task, result;

        // sends a request to the emitter
        printf("Writing to E socket %d\n", s);
        if (write_fully(s, &pid, sizeof pid) == -1) {
            perror("sending a request");
            goto out;
        }

        // get a task
        printf("Reading from E socket %d\n", s);
        int got = read_fully(s, &task, sizeof task);
        if (got == -1) {
            perror("while receiving a task");
            goto out;
        }
        if (got == 1) {
            // end of stream is not an errno condition, so no perror() here
            fprintf(stderr,
                    "while receiving a task: connection closed by emitter\n");
            goto out;
        }
        printf("Read from E socket %d\n", s);

        // check for EOS
        if (task < 0) {
            printf("Received EOS\n");
            break;                  // if EOS terminate loop iterations
        }

        // otherwise process the incoming task
        printf("Received task %d\n", task);
        result = f(task);
        printf("Computed %d -> %d\n", task, result);

        // send the result to the collector, then cycle again
        if (send_result(&collector_sa, result) == -1)
            goto out;
        printf("Sent to C socket, E sock %d\n", s);
    }
    rc = 0;

out:
    close(s);
    return rc;
}