magistraleinformaticanetworking:spm:spm1213_c_ff_map1
- map.cpp
#include <pthread.h>
#include <vector>
#include <iostream>
#include <ff/farm.hpp>
using namespace ff;
// a task requires to compute the matrix multiply C = A x B
// we assume square matrixes, for the sake of simplicity
typedef struct {
int n;
float **a;
float **b;
float **c;
} TASK;
// a subtask is the computation of the inner product or A, row i, by B, col j
typedef struct {
int i,j;
TASK * t;
} SUBTASK;
// a partial result is the i,j item in the result matrix
typedef struct {
int i,j;
float x;
TASK * t;
} PART_RESULT;
// this node is used to generate the task list out of the initial data
// kind of user defined iterator over tasks
class Split: public ff_node {
void * svc(void * t) {
TASK * task = (TASK *) t; // tasks come in already allocated
for(int i=0; i<task->n; i++)
for(int j=0; j< task->n; j++) {
// SUBTASKe are allocated in the splitter and destroyed in the worker
SUBTASK * st = (SUBTASK *) calloc(1,sizeof(SUBTASK));
st->i = i;
st->j = j;
st->t = task;
ff_send_out((void *)st);
}
return GO_ON;
}
};
// this node is used to consolidate the subresults into the result matrix
class Compose: public ff_node {
void * svc(void * t) {
PART_RESULT * r = (PART_RESULT *) t;
((r->t)->c)[r->i][r->j] = r->x;
// deallocate the partial result got on the input stream
free(t);
return GO_ON;
}
};
// this is the node actually computing the task (IP)
class Worker: public ff_node {
public:
void * svc(void * task) {
SUBTASK * t = (SUBTASK *) task;
float * x = new float(0.0);
for(int k=0; k<(t->t)->n; k++) {
*x = *x + (t->t->a)[t->i][k] * (t->t->b)[k][t->j];
}
// prepare the partial result to be delivered
PART_RESULT * pr = (PART_RESULT *) calloc(1,sizeof(PART_RESULT));
pr->i = t->i;
pr->j = t->j;
pr->t = t->t;
pr->x = *x;
// the subtask is no more useful, deallocate it
free(task);
// return the partial result
return pr;
}
};
int main(int argc, char * argv[]) {
if (argc<3) {
std::cerr << "use: "
<< argv[0]
<< " nworkers n\n";
return -1;
}
int nworkers=atoi(argv[1]);
int n=atoi(argv[2]);
// this is the map setup code ----------------------------------------------
ff_farm<> farm(true);
farm.add_emitter(new Split()); // add the splitter emitter
farm.add_collector(new Compose()); // add the composer collector
std::vector<ff_node *> w; // add the convenient # of workers
for(int i=0;i<nworkers;++i)
w.push_back(new Worker);
farm.add_workers(w);
farm.run_then_freeze(); // run it as an accelerator
// end of map setup --------------------------------------------------------
// now we can test it by offloading a task
srand(getpid());
float ** A = new float*[n];
for(int i=0; i<n; i++)
A[i] = new float[n];
float ** B = new float*[n];
for(int i=0; i<n; i++)
B[i] = new float[n];
float ** C = new float*[n];
for(int i=0; i<n; i++)
C[i] = new float[n];
std::cout << "A" << std::endl;
for(int i=0; i<n; i++) {
for(int j=0; j<n; j++) {
A[i][j] = rand() % 10 - 5;
std::cout << A[i][j] << " " ;
}
std::cout << std::endl;
}
std::cout << "B" << std::endl;
for(int i=0; i<n; i++) {
for(int j=0; j<n; j++) {
B[i][j] = rand() % 10 - 5;
std::cout << B[i][j] << " " ;
}
std::cout << std::endl;
}
std::cout << "C" << std::endl;
for(int i=0; i<n; i++) {
for(int j=0; j<n; j++) {
C[i][j] = rand() % 10 - 5;
std::cout << C[i][j] << " " ;
}
std::cout << std::endl;
}
TASK * t1 = (TASK *) calloc(1,sizeof(TASK));
t1->a = A;
t1->b = B;
t1->c = C;
t1->n = n;
farm.offload(t1);
farm.offload((void *) FF_EOS);
farm.wait();
std::cerr << "DONE, time= " << farm.ffTime() << " (ms)\n";
farm.ffStats(std::cerr);
float C1[n][n];
for(int i=0; i<n; i++) {
for(int j=0; j<n; j++) {
C1[i][j] = 0.0;
for(int k=0; k<n; k++)
C1[i][j] += A[i][k]*B[k][j];
}
}
for(int i=0; i<n; i++) {
for(int j=0; j<n; j++) {
std::cout << C[i][j] << " " ;
}
std::cout << std::endl;
}
return 0;
}
magistraleinformaticanetworking/spm/spm1213_c_ff_map1.txt · Ultima modifica: 11/04/2013 alle 15:04 (10 anni fa) da Marco Danelutto