#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// On-demand scheduling of tasks to workers:
//   receive a request from worker i -> send a task to compute to worker i;
//   when no more tasks are available, send an EOS termination.
//
// usage is:
//   a.out portno     // number of the port used to get worker task requests
//
// This is the version checking worker failure (emitter side)
// and just signaling a worker failure.
//
// Data structure taking care of the tasks to compute. We need to push
// back tasks that have presumably not been computed due to failures.
// NOTE(review): the push-back is not implemented here; a failure is only
// reported on stdout/stderr.

// to manage task list
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// moved outside main: global
int task = 0;        // tasks are positive integers, in this case
int tasklimit = 100; // we'll send tasklimit tasks before stopping
int eos = -1;        // special task to denote End Of Stream

/*
 * Report that the worker attached to `sock` failed, then release the
 * per-connection resources (socket and task counter).
 *
 * last_task is the last value sent to this worker; it is reported as
 * presumably lost only when it is a real task (> 0), i.e. neither the
 * "nothing sent yet" value 0 nor eos.
 *
 * Always returns NULL so the caller can `return worker_failed(...)`.
 */
static void *worker_failed(int sock, int *count, int last_task)
{
    fflush(stderr);
    printf("NEED TO RUN A SUBSTITUTE WORKER AFTER THIS DIED !!! \n");
    // you should know which is the last task sent, in such a way it can
    // be rescheduled onto a different worker
    if (last_task > 0) {
        printf("TASK PRESUMABLY NOT COMPLETED is TASK <%d>\n", last_task);
    } else {
        printf("NO TASK WAS OUTSTANDING FOR THIS WORKER\n");
    }
    fflush(stdout);
    close(sock);
    free(count);
    return NULL;
}

/*
 * Thread body serving one worker connection.
 *
 * vs points to a heap-allocated int holding the connected socket; this
 * function takes ownership of that allocation and frees it.
 *
 * Protocol: the worker sends one int per request (printed as its id); the
 * handler answers with one int, either the next task number or eos.
 *
 * Returns a heap-allocated int holding the number of tasks handed to this
 * worker once eos has been sent (the joiner must free it), or NULL if the
 * worker failed or the counter could not be allocated.
 */
void *worker_handler(void *vs)
{
    int s = *((int *) vs);
    int i;
    int task_to_send = 0;               // 0: nothing sent on this connection yet
    int *taskNo;                        // hosts the count of tasks sent

    free(vs);                           // holder allocated by main, owned by us

    taskNo = calloc(1, sizeof *taskNo);
    if (taskNo == NULL) {
        perror("while allocating task counter");
        close(s);
        return NULL;
    }

    // start reading tasks
    printf("worker_handler %ld started\n", (long) pthread_self());
    for (;;) {
        ssize_t nread;
        ssize_t nwritten;

        printf("Waiting for a request \n");
        nread = read(s, &i, sizeof i);  // read request from worker
        if (nread != (ssize_t) sizeof i) {
            // NOTE: a short read is treated as a failure (read can be split)
            if (nread < 0) {
                perror("while reading task from worker");
            } else {
                // errno is not meaningful on EOF / short read
                fprintf(stderr,
                        "while reading task from worker: got %zd of %zu bytes\n",
                        nread, sizeof i);
            }
            return worker_failed(s, taskNo, task_to_send);
        }
        printf("%d ", i);
        fflush(stdout);

        pthread_mutex_lock(&mutex);
        if (task < tasklimit) {         // send a task to the requesting worker
            task++;                     // next task to be sent
            task_to_send = task;
            (*taskNo)++;                // count it (increment the value, not the pointer)
        } else {                        // if no more tasks, then send an EOS
            task_to_send = eos;
        }
        pthread_mutex_unlock(&mutex);

        // send either task or eos
        nwritten = write(s, &task_to_send, sizeof task_to_send);
        if (nwritten != (ssize_t) sizeof task_to_send) {
            if (nwritten < 0) {
                perror("while sending task to worker");
            } else {
                fprintf(stderr,
                        "while sending task to worker: wrote %zd of %zu bytes\n",
                        nwritten, sizeof task_to_send);
            }
            return worker_failed(s, taskNo, task_to_send);
        }

        if (task_to_send != eos) {
            // print the local copy: the global `task` may already have been
            // advanced by another handler
            printf("sent task %d to worker %d\n", task_to_send, i);
        } else {
            printf("Send EOS to worker %d\n", i);
            close(s);
            return (void *) taskNo;
        }
    }
}

#define MAXHOSTNAME 80
#define MAXTHREADS 16

/*
 * Emitter entry point: binds a TCP socket on this host's resolved IPv4
 * address at port argv[1], then accepts worker connections forever,
 * starting one worker_handler thread per connection.
 *
 * At most MAXTHREADS handlers are created over the lifetime of the
 * process; further connections are refused.
 *
 * Returns -1 on any setup or accept failure; never returns otherwise.
 */
int main(int argc, char *argv[])
{
    int si, retcode;
    long port;
    char *end;
    socklen_t salen;
    struct sockaddr_in sa, sai;
    struct hostent *he;
    char hostname[MAXHOSTNAME];
    pthread_t tids[MAXTHREADS];         // thread handlers
    int tNo = 0;                        // thread to allocate

    if (argc < 2) {
        fprintf(stderr, "usage: %s portno\n", argc > 0 ? argv[0] : "a.out");
        return -1;
    }
    port = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || port < 1 || port > 65535) {
        fprintf(stderr, "invalid port number: %s\n", argv[1]);
        return -1;
    }

    // code needed to set up the communication infrastructure
    printf("Declaring socket\n");
    si = socket(AF_INET, SOCK_STREAM, 0);   // socket for inputs
    if (si == -1) {
        perror("opening socket for input");
        return -1;
    }

    memset(&sai, 0, sizeof sai);
    sai.sin_family = AF_INET;
    sai.sin_port = htons((unsigned short) port);

    if (gethostname(hostname, MAXHOSTNAME) == -1) {
        perror("while calling gethostname");
        return -1;
    }
    hostname[MAXHOSTNAME - 1] = '\0';   // not guaranteed terminated if truncated

    he = gethostbyname(hostname);
    if (he == NULL || he->h_addrtype != AF_INET || he->h_addr_list[0] == NULL) {
        fprintf(stderr, "cannot resolve an IPv4 address for host %s\n", hostname);
        return -1;
    }
    // h_addr_list[0] is what the legacy h_addr macro expands to
    memcpy(&sai.sin_addr, he->h_addr_list[0], sizeof(sai.sin_addr));

    printf("Binding to %s\n", inet_ntoa(sai.sin_addr));
    retcode = bind(si, (struct sockaddr *) &sai, sizeof(sai));
    if (retcode == -1) {
        perror("while calling bind");
        return -1;
    }

    printf("Listening socket\n");
    retcode = listen(si, 1);
    if (retcode == -1) {
        perror("while calling listen");
        return -1;
    }

    // NOTE(review): handler threads are neither joined nor detached, so the
    // task count they return is never collected.
    while (1 == 1) {
        // heap holder so each thread gets its own copy of the descriptor;
        // ownership passes to worker_handler, which frees it
        int *conn = calloc(1, sizeof *conn);
        if (conn == NULL) {
            perror("while allocating connection descriptor");
            return -1;
        }

        salen = sizeof(sa);
        printf("Accepting connections .... \n");
        *conn = accept(si, (struct sockaddr *) &sa, &salen);  // accept a connection
        if (*conn == -1) {
            perror("while calling an accept");
            free(conn);
            return -1;
        }

        if (tNo >= MAXTHREADS) {
            // tids[] is full: refuse rather than write past the array
            fprintf(stderr,
                    "Too many worker handlers (max %d): refusing connection\n",
                    MAXTHREADS);
            close(*conn);
            free(conn);
            continue;
        }

        // now fork a thread to permanently handle the connection
        retcode = pthread_create(&tids[tNo], NULL, worker_handler, (void *) conn);
        if (retcode != 0) {
            // pthread_create returns the error number instead of setting errno
            fprintf(stderr, "while creating worker handler: %s\n", strerror(retcode));
            close(*conn);
            free(conn);
            continue;
        }
        printf("Created worker_handle No %d\n", tNo);
        tNo++;
    }

    /* NOTREACHED: the accept loop only leaves via return */
    printf("Closing operations\n");
    return 0;
}