Strumenti Utente

Strumenti Sito


magistraleinformaticanetworking:spm:ff27_mapfarm

MapFarm

mapfarm.cpp
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2 as 
 *  published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *  As a special exception, you may use this file as part of a free software
 *  library without restriction.  Specifically, if other files instantiate
 *  templates or use macros or inline functions from this file, or you compile
 *  this file and link it with other files to produce an executable, this
 *  file does not by itself cause the resulting executable to be covered by
 *  the GNU General Public License.  This exception does not however
 *  invalidate any other reasons why the executable file might be covered by
 *  the GNU General Public License.
 *
 ****************************************************************************
 */
 
/*
 * The program shows how to implement a map working on a vector 
 * with a farm skeleton.
 *
 */
#include <vector>
#include <iostream>
#include <ff/farm.hpp>
 
using namespace ff;
 
template<typename T>
struct task_t {
    task_t(T*v,size_t size,int nw):v(v),size(size),nw(nw) {}
 
    T     *v;
    size_t size;
    int    nw;
};
 
/* --------------------------------------------------------- */
template<typename T>
class Emitter: public ff_node {
public:
    Emitter(T*const V, size_t size, ff_loadbalancer *const lb):V(V),size(size),lb(lb) {}
 
    void *svc(void*) {
        int nw = lb->getnworkers();
        for(int i=0;i<lb->getnworkers();++i) {
            task_t<T> *t = new task_t<T>(V,size,nw);
            lb->ff_send_out_to(t,i);
        }
        return NULL;
    }
private:
    T*const V;
    size_t size;
    ff_loadbalancer *const lb;
};
 
/* --------------------------------------------------------- */
 
template<typename T>
class Worker: public ff_node {
public:
    Worker(void(*F)(T*const,long,long)):F(F) {}
 
    void *svc(void *task) {
        task_t<T> *t = (task_t<T>*)task;
        assert(t->size > t->nw);
        long myid  = get_my_id();
        long q     = t->size / t->nw;
        long r     = t->size % t->nw;
 
        long start = myid*q + ((r>=myid)? myid:r);
        long stop  = start + ((myid<r)?(q+1):q);
 
        printf("worker id=%ld working on partition (%ld,%ld(\n", myid,start,stop);
        F(t->v,start,stop);
 
        return task;
    }
protected:
    void(*F)(T*const,long,long);
};
 
/* --------------------------------------------------------- */
 
class Collector: public ff_node {
public:
    Collector(ff_gatherer *gt):gt(gt) {}
 
    void *svc(void *task) {
        int nw = gt->getnworkers();
        void *V[nw];
        gt->all_gather(task, (void**)&V); 
        return NULL;
    }
private:
    ff_gatherer *const gt;
};
 
/* --------------------------------------------------------- */
 
 
template<typename T>
static void myF(T*const V, long start, long stop) {
    for(long i=start; i<stop; ++i)
        V[i] += 1;
}
 
 
int main(int argc, char *argv[]) {    
    if (argc<3) {
        std::cerr << "use: " 
                  << argv[0] 
                  << " arraysize nworkers\n";
        return -1;
    }
    int arraySize= atoi(argv[1]);
    int nworkers = atoi(argv[2]);
 
    if (nworkers<=0) {
        std::cerr << "Wrong parameters values\n";
        return -1;
    }
 
    // init vector
    long *V = new long[arraySize];
    for(int i=0;i<arraySize;++i) V[i] = i;
 
    // creating the task-farm for one-shot computation
    ff_farm<> farm;
 
    std::vector<ff_node*> w;
    for(int i=0;i<nworkers;++i)
	w.push_back(new Worker<long>(myF));
    farm.add_workers(w);
 
    Emitter<long> E(V,arraySize,farm.getlb());
    farm.add_emitter(&E);
 
    Collector C(farm.getgt());
    farm.add_collector(&C);
 
    if (farm.run_and_wait_end()<0) {
        printf("error executing farm\n");
        return -1;
    }
 
    // printing vector
    printf("\n\n");
    for(int i=0;i<arraySize;++i) printf("%ld ", V[i]);
    printf("\n");
    return 0;
}
magistraleinformaticanetworking/spm/ff27_mapfarm.txt · Ultima modifica: 27/11/2013 alle 17:41 (11 anni fa) da Marco Danelutto