magistraleinformaticanetworking:spm:spm1213_c_ff_mixskepumap
- mixskepuffmap.cpp
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <vector>
#include <math.h>
#include <ff/pipeline.hpp>
#include <ff/farm.hpp>
#include <skepu/vector.h>
#include <skepu/map.h>
using namespace ff;
using namespace std;
// First constructor argument of every vector created by Source (the "veclen"
// of the usage message); the default of 10 is overwritten in main() from argv[2].
int N = 10;
class Source: public ff_node {
public:
Source(unsigned int streamlen):streamlen(streamlen) {}
void * svc(void * task) {
if(streamlen != 0) {
skepu::Vector<float> * v = new skepu::Vector<float>(N,(float)streamlen);
streamlen--;
task = (void *) v;
#ifdef DEBUG
std::cout << "Source delivering:" << *v << std::endl;
#endif
} else {
task = NULL;
}
return task;
}
private:
unsigned int streamlen;
};
class Drain: public ff_node {
void * svc(void * task) {
skepu::Vector<float> * v = (skepu::Vector<float> *) task;
#ifdef DEBUG
std::cout << "Drain got " << *v << std::endl;
#endif
return(GO_ON);
}
};
// Number of times sin() is re-applied to each element; shared by the SkePU
// user function below and by the sequential f_iters().
#define ITERNO 800000
// SkePU unary user function `iters` (float -> float): starting from `a`,
// applies a = sin(a) ITERNO times and returns the final value.
// MapStage instantiates it as the function of its skepu::Map skeleton.
UNARY_FUNC(iters, float, a,
for(int _i=0; _i<ITERNO; _i++) a = sin(a); return(a);
)
// Sequential counterpart of the `iters` SkePU user function: applies
// a = sin(a) exactly ITERNO times and returns the final value.
// Used by SeqMapStage to process one vector element at a time.
float f_iters(float a) {
  int remaining = ITERNO;
  while(remaining-- > 0) {
    a = sin(a);
  }
  return a;
}
// UNARY_FUNC(f, float, a, return(a+1); )
class MapStage:public ff_node {
private:
int tasks;
int wno;
public:
MapStage(int wno):tasks(0),wno(wno) {}
void svc_end() {
cout << "SKEPU map stage " << wno << " computed " << tasks << " tasks " << e
ndl;
}
void *svc(void * task) {
tasks++;
skepu::Vector<float> * v = (skepu::Vector<float> *) task;
#ifdef DEBUG
std::cout << "MapStage got: " << *v << std::endl;
#endif
skepu::Vector<float> * r = new skepu::Vector<float>(v->size());
skepu::Map<iters> skepumap(new iters);
skepumap(*v, *r);
#ifdef DEBUG
std::cout << "MapStage delivering: " << *r << std::endl;
#endif
return((void *) r);
}
};
class SeqMapStage : public ff_node {
private:
int tasks;
int wno;
public:
SeqMapStage(int wno):tasks(0),wno(wno) {}
void svc_end() {
cout << "CPU worker " << wno << " computed " << tasks << " tasks " << endl;
}
void * svc(void * task) {
tasks++;
skepu::Vector<float> * v = (skepu::Vector<float> *) task;
skepu::Vector<float> * r = new skepu::Vector<float>(v->size());
for(int i=0; i<v->size(); i++)
(*r)[i] = f_iters((*v)[i]);
return((void *) r);
}
};
// Node installed as the farm's emitter in main(): it hands every incoming
// task pointer on unchanged.
class Emitter : public ff_node {
  void * svc (void * t) {
    void * forwarded = t;
    return forwarded;
  }
};
// Node installed as the farm's collector in main(): it hands every result
// pointer on unchanged.
class Collector : public ff_node {
  void * svc (void * t) {
    void * forwarded = t;
    return forwarded;
  }
};
int main(int argc, char * argv[]) {
if (argc==1) {
std::cerr << "use: " << argv[0] << " streamlen veclen nw [skepuworkers]\n";
return -1;
}
int m = atoi(argv[1]);
N = atoi(argv[2]);
int nw = atoi(argv[3]);
int skepuworker;
if(argc>4)
skepuworker=atoi(argv[4]);
else
skepuworker = 0;
cout << "Using " << nw << " ff workers and " << skepuworker << " Skepu workers
" << endl;
// bild a 2-stage pipeline
ff_pipeline pipe;
pipe.add_stage(new Source(m));
// pipe.add_stage(new MapStage());
ff_farm<> farm;
std::vector<ff_node *> w;
if(skepuworker!=0)
for(int i=0; i<skepuworker; i++)
w.push_back(new MapStage(i)); // one stage on SKEPU
for(int i=0; i<nw; i++)
w.push_back(new SeqMapStage(i));
farm.set_scheduling_ondemand();
farm.add_workers(w);
farm.add_collector(new Collector());
farm.add_emitter(new Emitter());
pipe.add_stage(&farm);
pipe.add_stage(new Drain());
ffTime(START_TIME);
if (pipe.run_and_wait_end()<0) {
error("running pipeline\n");
return -1;
}
ffTime(STOP_TIME);
std::cerr << "DONE, pipe time= " << pipe.ffTime() << " (ms)\n";
std::cerr << "DONE, total time= " << ffTime(GET_TIME) << " (ms)\n";
pipe.ffStats(std::cerr);
return 0;
}
magistraleinformaticanetworking/spm/spm1213_c_ff_mixskepumap.txt · Ultima modifica: 30/04/2013 alle 16:57 (10 anni fa) da Marco Danelutto