map.cpp
#include <pthread.h>
 
#include <vector>
#include <iostream>
#include <ff/farm.hpp>
 
using namespace ff;
 
// a task requires to compute the matrix multiply C = A x B 
// we assume square matrixes, for the sake of simplicity
typedef struct {
  int n;
  float **a;
  float **b;
  float **c;
} TASK;
 
// a subtask is the computation of the inner product or A, row i, by B, col j
typedef struct {
  int i,j; 
  TASK * t;
} SUBTASK; 
 
// a partial result is the i,j item in the result matrix
typedef struct {
  int i,j; 
  float x; 
  TASK * t;
} PART_RESULT;
 
// this node is used to generate the task list out of the initial data
// kind of user defined iterator over tasks
class Split: public  ff_node {
   void * svc(void * t) {
     TASK * task = (TASK *) t;     // tasks come in already allocated
     for(int i=0; i<task->n; i++) 
       for(int j=0; j< task->n; j++)  {
	 // SUBTASKe are allocated in the splitter and destroyed in the worker
         SUBTASK * st = (SUBTASK *) calloc(1,sizeof(SUBTASK));
	 st->i = i; 
         st->j = j; 
         st->t = task;
         ff_send_out((void *)st);
       }
     return GO_ON; 
   }
};
 
// this node is used to consolidate the subresults into the result matrix
class Compose: public ff_node {
   void * svc(void * t) {
     PART_RESULT * r = (PART_RESULT *) t;
     ((r->t)->c)[r->i][r->j] = r->x; 
     // deallocate the partial result got on the input stream
     free(t);
     return GO_ON;
   }
};
 
 
// this is the node actually computing the task (IP)
class Worker: public ff_node {
public:
 
    void * svc(void * task) {
	SUBTASK * t = (SUBTASK *) task;
        float * x = new float(0.0);
	for(int k=0; k<(t->t)->n; k++) {
	  *x = *x + (t->t->a)[t->i][k] * (t->t->b)[k][t->j];
        }
        // prepare the partial result to be delivered
    	PART_RESULT * pr = (PART_RESULT *) calloc(1,sizeof(PART_RESULT));
        pr->i = t->i;
        pr->j = t->j;
        pr->t = t->t;
        pr->x = *x;
        // the subtask is no more useful, deallocate it
        free(task);
        // return the partial result
        return pr;
    }
};
 
int main(int argc, char * argv[]) {
 
    if (argc<3) {
        std::cerr << "use: " 
                  << argv[0] 
                  << " nworkers n\n";
        return -1;
    }
 
    int nworkers=atoi(argv[1]);
    int n=atoi(argv[2]);
 
    // this is the map setup code ----------------------------------------------
    ff_farm<> farm(true);
 
    farm.add_emitter(new Split());	// add the splitter emitter
    farm.add_collector(new Compose());  // add the composer collector
    std::vector<ff_node *> w;           // add the convenient # of workers
    for(int i=0;i<nworkers;++i) 
	w.push_back(new Worker);
    farm.add_workers(w); 
    farm.run_then_freeze();             // run it as an accelerator
    // end of map setup --------------------------------------------------------
 
    // now we can test it by offloading a task
    srand(getpid());
    float ** A = new float*[n];
    for(int i=0; i<n; i++) 
      A[i] = new float[n]; 
    float ** B = new float*[n];
    for(int i=0; i<n; i++) 
      B[i] = new float[n]; 
    float ** C = new float*[n];
    for(int i=0; i<n; i++) 
      C[i] = new float[n]; 
 
    std::cout << "A" << std::endl;
    for(int i=0; i<n; i++) {
      for(int j=0; j<n; j++) {
        A[i][j] = rand() % 10 - 5;
        std::cout << A[i][j] << " " ;
      }
    std::cout << std::endl;
    }
 
    std::cout << "B" << std::endl;
    for(int i=0; i<n; i++) {
      for(int j=0; j<n; j++) {
        B[i][j] = rand() % 10 - 5;
        std::cout << B[i][j] << " " ;
      }
    std::cout << std::endl;
    }
 
    std::cout << "C" << std::endl;
    for(int i=0; i<n; i++) {
      for(int j=0; j<n; j++) {
        C[i][j] = rand() % 10 - 5;
        std::cout << C[i][j] << " " ;
      }
    std::cout << std::endl;
    }
 
    TASK * t1 = (TASK *) calloc(1,sizeof(TASK));
    t1->a = A;
    t1->b = B; 
    t1->c = C; 
    t1->n = n; 
 
    farm.offload(t1);
    farm.offload((void *) FF_EOS);
    farm.wait();
 
 
    std::cerr << "DONE, time= " << farm.ffTime() << " (ms)\n";
    farm.ffStats(std::cerr);
 
    float C1[n][n];
    for(int i=0; i<n; i++) {
      for(int j=0; j<n; j++) {
        C1[i][j] = 0.0;
        for(int k=0; k<n; k++) 
          C1[i][j] += A[i][k]*B[k][j];
      }
    }
 
    for(int i=0; i<n; i++) {
      for(int j=0; j<n; j++) {
        std::cout << C[i][j] << " " ;
      }
    std::cout << std::endl;
    }
 
    return 0;
}