#include #include #include #include using namespace ff; // a task requires to compute the matrix multiply C = A x B // we assume square matrixes, for the sake of simplicity typedef struct { int n; float **a; float **b; float **c; } TASK; // a subtask is the computation of the inner product or A, row i, by B, col j typedef struct { int i,j; TASK * t; } SUBTASK; // a partial result is the i,j item in the result matrix typedef struct { int i,j; float x; TASK * t; } PART_RESULT; // this node is used to generate the task list out of the initial data // kind of user defined iterator over tasks class Split: public ff_node { void * svc(void * t) { TASK * task = (TASK *) t; // tasks come in already allocated for(int i=0; in; i++) for(int j=0; j< task->n; j++) { // SUBTASKe are allocated in the splitter and destroyed in the worker SUBTASK * st = (SUBTASK *) calloc(1,sizeof(SUBTASK)); st->i = i; st->j = j; st->t = task; ff_send_out((void *)st); } return GO_ON; } }; // this node is used to consolidate the subresults into the result matrix class Compose: public ff_node { void * svc(void * t) { PART_RESULT * r = (PART_RESULT *) t; ((r->t)->c)[r->i][r->j] = r->x; // deallocate the partial result got on the input stream free(t); return GO_ON; } }; // this is the node actually computing the task (IP) class Worker: public ff_node { public: void * svc(void * task) { SUBTASK * t = (SUBTASK *) task; float * x = new float(0.0); for(int k=0; k<(t->t)->n; k++) { *x = *x + (t->t->a)[t->i][k] * (t->t->b)[k][t->j]; } // prepare the partial result to be delivered PART_RESULT * pr = (PART_RESULT *) calloc(1,sizeof(PART_RESULT)); pr->i = t->i; pr->j = t->j; pr->t = t->t; pr->x = *x; // the subtask is no more useful, deallocate it free(task); // return the partial result return pr; } }; int main(int argc, char * argv[]) { if (argc<3) { std::cerr << "use: " << argv[0] << " nworkers n\n"; return -1; } int nworkers=atoi(argv[1]); int n=atoi(argv[2]); // this is the map setup code ---------------------------------------------- ff_farm<> farm(true); farm.add_emitter(new Split()); // add the splitter emitter farm.add_collector(new Compose()); // add the composer collector std::vector w; // add the convenient # of workers for(int i=0;ia = A; t1->b = B; t1->c = C; t1->n = n; farm.offload(t1); farm.offload((void *) FF_EOS); farm.wait(); std::cerr << "DONE, time= " << farm.ffTime() << " (ms)\n"; farm.ffStats(std::cerr); float C1[n][n]; for(int i=0; i