#include #include #include #include #include #include #include #include #include "task.h" /* * function to be computed on the single task * this is actually the only "business logic" code */ int f(int task) { sleep(1); // just to simulate some work ... return (task*task); } /* * receives integer tasks and delivers integer results, up to the reception * of an EOS (a -1) * this version opens/closes the sockets at each task, which is not * definitely efficient ... * * usage: * a.out emitter_address emitter_port collector_address collector_port */ int main(int argc, char * argv[]) { int s, cs, retcode; // task socket, result socket, error ret struct sockaddr_in sa; int pid = 0; pid = getpid(); // get my POSIX process id printf("This is worker %d\n",pid); // connect permanently to the emitter process s = socket(AF_INET,SOCK_STREAM,0); // socket to access emitter if(s == -1) { perror("while creating the socket to emitter"); return (-1); } printf("Opened E socket %d\n",s); sa.sin_family = AF_INET; sa.sin_port = htons(atoi(argv[2])); memcpy(&sa.sin_addr, (gethostbyname(argv[1])->h_addr), sizeof(sa.sin_addr)); retcode = connect(s,(struct sockaddr *)&sa,sizeof(sa)); if(retcode == -1) { perror("while connecting to emitter"); return(-1);} while(1==1) { TASK task; RESULT result; SERVICE request; // sends a request to the emitter request.tag = TAG_TASK_REQUEST; request.v = pid; retcode = write(s,&request,sizeof(SERVICE)); if(retcode == -1) { perror("sending a request"); return -1; } retcode = read(s,&task,sizeof(TASK)); // get a task if(retcode != sizeof(TASK)) { // this could be corrected in case // of large tasks (while(len!=sizeof) read) perror("while receiving a task"); return -1; } // check for EOS if(task.tag < 0) { printf("Received EOS\n"); break; // if EOS terminate loop iteratons } else { printf("Received task %d\n",task.tag); // otherwise process the incoming task result.v = f(task.v); result.tag = task.tag; result.next = NULL; printf("Computed %d -> %d\n",task.v,result.v); // send result to the collector cs = socket(AF_INET,SOCK_STREAM,0); // create socket to collector sa.sin_family = AF_INET; sa.sin_port = htons(atoi(argv[4])); memcpy(&sa.sin_addr, (gethostbyname(argv[3])->h_addr), sizeof(sa.sin_addr)); retcode = connect(cs,(struct sockaddr *)&sa,sizeof(sa)); if(retcode == -1) { perror("connecting to the collector"); return(-1);} // send the result and close connection write(cs,&result,sizeof(RESULT)); close(cs); printf("Sent to C socket, E sock %d\n",s); // then cycle again } } close(s); return 0; }