Class work 2

Below you can find the code discussed in class and relative to:

prime_farm.cpp
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2 as 
 *  published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *  As a special exception, you may use this file as part of a free software
 *  library without restriction.  Specifically, if other files instantiate
 *  templates or use macros or inline functions from this file, or you compile
 *  this file and link it with other files to produce an executable, this
 *  file does not by itself cause the resulting executable to be covered by
 *  the GNU General Public License.  This exception does not however
 *  invalidate any other reasons why the executable file might be covered by
 *  the GNU General Public License.
 *
 ****************************************************************************
 */
/* 
 * Author: Massimo Torquati <torquati@di.unipi.it> 
 * Date:   November 2014
 */
 
//
// Toy example to test FastFlow farm.
// It finds the prime numbers in the range (n1,n2) using a farm + feedback.
//
//                   |----> Worker --
//         Emitter --|----> Worker --|--
//           ^       |----> Worker --   |
//           |__________________________|
//
//  -   The Emitter generates all number between n1 and n2 and then waits 
//      to receive primes from the Workers.
//  -   The Worker check if the number is prime (by using the is_prime function), 
//      if yes then it sends the prime number back to the Emitter.
//
//  Usage example:
//    ./farm_primes 1900 2000 3 | tr -s ' ' '\n' | sort -n
 
 
#include <vector>
#include <iostream>
#include <ff/farm.hpp>
using namespace ff;
 
// see http://en.wikipedia.org/wiki/Primality_test
bool is_prime(unsigned long long n) {
    if (n <= 3)  return n > 1; // 1 is not prime !
 
    if (n % 2 == 0 || n % 3 == 0) return false;
 
    for (unsigned long long i = 5; i * i <= n; i += 6) {
        if (n % i == 0 || n % (i + 2) == 0) 
            return false;
    }
    return true;
}
struct Emitter: ff_node_t<long> {
    Emitter(ff_loadbalancer *lb, 
            unsigned long long n1, unsigned long long n2)
        :lb(lb),n1(n1),n2(n2) {}
 
    long *svc(long *x) {
        if (x == nullptr) { // first time
            unsigned long long n;
            while( (n=n1++) <= n2 ) ff_send_out((void*)n);
            lb->broadcast_task(EOS);
            return GO_ON;
        }
        std::cout << reinterpret_cast<long>(x) << " ";
        return GO_ON;
    }
    ff_loadbalancer *const lb;
    unsigned long long n1,n2;
};
struct Worker: ff_node_t<long> {
    long *svc(long *in) {
        if (is_prime(reinterpret_cast<long>(in))) return in;
        return GO_ON;
    }
};
 
int main(int argc, char *argv[]) {    
    if (argc<4) {
        std::cerr << "use: " << argv[0]  << " number1 number2 nworkers\n";
        return -1;
    }
    unsigned long long n1 = atoll(argv[1]);
    unsigned long long n2 = atoll(argv[2]);
    const int    nw       = atoi(argv[3]);
 
    std::vector<ff_node*> W;
    for(int i=0;i<nw; ++i) W.push_back(new Worker);
    ff_farm<> farm(W); // by default adds an emitter and a collector
    Emitter E(farm.getlb(), n1,n2);
    farm.add_emitter(&E);    // replacing the default emitter
 
    farm.remove_collector(); // removing the default collector
    farm.wrap_around();      // adds feedback channel between worker and emitter
    farm.run_and_wait_end();
    std::cout << "\n\n";
    return 0;
}
farm_map_primes.cpp
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2 as 
 *  published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *  As a special exception, you may use this file as part of a free software
 *  library without restriction.  Specifically, if other files instantiate
 *  templates or use macros or inline functions from this file, or you compile
 *  this file and link it with other files to produce an executable, this
 *  file does not by itself cause the resulting executable to be covered by
 *  the GNU General Public License.  This exception does not however
 *  invalidate any other reasons why the executable file might be covered by
 *  the GNU General Public License.
 *
 ****************************************************************************
 */
/* 
 * Author: Massimo Torquati <torquati@di.unipi.it> 
 * Date:   November 2014
 */
 
// SPM class exercise (Class Work 2)
// Consider the possibility to list all prime numbers in an interval [MIn, MAX] by:
//
//  - split the interval in a number of given subintervals nw (n workers)
//  - assig each subinterval to a farm worker
//  - the worker
//        for each i in the interval
//            checks if i is prime if yes it packs the value in an array
//            finally the array is sent to the gatherer thread
//  - the gatherer threads gets all results and produces an ordered lists
//    of all primes
//                        
//             (primality check in each partition) 
//                         
//                   |----> Worker -->|
//         Emitter --|----> Worker -->|--Collector
//                   |----> Worker -->|   
//   (splits input array          (gathers all sub-partitions)
//    in partitions)
//              
 
#include <vector>
#include <iostream>
#include <ff/farm.hpp>
using namespace ff;
 
// see http://en.wikipedia.org/wiki/Primality_test
bool is_prime(unsigned long long n) {
    if (n <= 3)  return n > 1; // 1 is not prime !
 
    if (n % 2 == 0 || n % 3 == 0) return false;
 
    for (unsigned long long i = 5; i * i <= n; i += 6) {
        if (n % i == 0 || n % (i + 2) == 0) 
            return false;
    }
    return true;
}
 
 
// stream type
struct Task_t {
    Task_t(unsigned long long n1, unsigned long long n2):n1(n1),n2(n2) {}
    const unsigned long long n1, n2;   // input range
    std::vector<unsigned long long> V; // output data
};
 
// splits the range [n1,n2] in nw sub-ranges each one of size ~(n2-n1)/nw 
struct Emitter: ff_node_t<Task_t> {
    Emitter(const ff_loadbalancer *lb,
            unsigned long long n1, unsigned long long n2) 
        :lb(lb),n1(n1),n2(n2) {}
 
    Task_t *svc(Task_t *) {
        const int nw = lb->getNWorkers();  // gets the total number of workers added to the farm
        const size_t  size = (n2 - n1) / nw;
        ssize_t more = (n2-n1) % nw;
        unsigned long long start = n1;
        unsigned long long stop  = n1;
 
        for(int i=0; i<nw; ++i) {
            start = stop;
            stop  = start + size + (more>0 ? 1:0);
            --more;
 
            Task_t *task = new Task_t(start, stop);
            lb->ff_send_out_to(task, i);
        }
        return EOS; // broadcast EOS to all workers
    }
    const ff_loadbalancer *lb;
    unsigned long long n1, n2;
};
struct Worker: ff_node_t<Task_t> {
    Task_t *svc(Task_t *in) {
        auto   &V   = in->V;
        unsigned long long n, n1(in->n1), n2(in->n2);
        while( (n=n1++) < n2 )  if (is_prime(n)) V.push_back(n);
        return in;
    }
};
struct Collector: ff_node_t<Task_t> {
    Task_t *svc(Task_t *in) {
        auto V = in->V;
 
        if (V.size())  // I may receive empty sub-partitions
            primes.insert(std::upper_bound(primes.begin(), primes.end(), V[0]),
                          V.begin(), V.end());
        delete in;
        return GO_ON;
    }
    void svc_end() { 
        for(size_t i=0;i<primes.size(); ++i) 
            std::cout << primes[i] << " ";
        std::cout << "\n";
    }
    std::vector<unsigned long long> primes;
};
 
int main(int argc, char *argv[]) {    
    if (argc<4) {
        std::cerr << "use: " << argv[0]  << " number1 number2 nworkers\n";
        return -1;
    }
    unsigned long long n1 = atoll(argv[1]);
    unsigned long long n2 = atoll(argv[2]);
    const int    nw       = atoi(argv[3]);
 
    std::vector<ff_node*> W;
    for(int i=0;i<nw; ++i) W.push_back(new Worker);
    Collector C;
    ff_farm<> farm(W, nullptr, &C);
    Emitter E(farm.getlb(), n1,n2); 
    farm.add_emitter(&E);  // replacing default emitter
    farm.cleanup_workers();
    if (farm.run_and_wait_end()<0) error("running farm\n");
    return 0;
}
pipe_primes.cpp
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2 as 
 *  published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *  As a special exception, you may use this file as part of a free software
 *  library without restriction.  Specifically, if other files instantiate
 *  templates or use macros or inline functions from this file, or you compile
 *  this file and link it with other files to produce an executable, this
 *  file does not by itself cause the resulting executable to be covered by
 *  the GNU General Public License.  This exception does not however
 *  invalidate any other reasons why the executable file might be covered by
 *  the GNU General Public License.
 *
 ****************************************************************************
 */
/* 
 * Author: Massimo Torquati <torquati@di.unipi.it> 
 * Date:   November 2014
 */
//
// Toy example to test FastFlow pipeline. 
// It finds the first n prime numbers using a pipeline of n stages.
// Please remember that in FastFlow pipeline stages are threads !
//  Example:
//    pipe_primes 16
//
//     generator  ---> stage(2) ---> stage(3) ---> stage(4)
//        ^                                           |
//        |___________________________________________|
//
//  -   The number of stages is floor(sqrt(n)) plus the generator.
//  -   stage(i) sends out only those numbers x such that (x != i and (x % i) != 0)
//  -   generators first generates all numbers, then prints all primes received
//                  in input from the last stage.
//
 
#include <cmath>
#include <iostream>
#include <vector>
#include <ff/pipeline.hpp>
using namespace ff;
 
// stream generator
struct generator: ff_node {
    generator(long n):n(n) {}
    void *svc(void *x) {
        if (x == nullptr) {   // first time 
            for(long x=2;x<=n;++x) ff_send_out((void*)x);
            ff_send_out(EOS);
            return GO_ON;
        }
        std::cout << reinterpret_cast<long>(x) << " ";
        return GO_ON;
    }
    long n;
};
// generic stage
struct stage: ff_node_t<long> {    
    int svc_init() { myid = get_my_id() + 1 ; return 0; }
    long *svc(long *t) {
        long x = (long)t;
        if (x == myid) return t;
        if (x % myid)  return t;
        return GO_ON;
    }
    long myid;
};
 
int main(int argc, char *argv[]) {    
    if (argc<2) {
        std::cerr << "use: " << argv[0]  << " number (not too big !)\n";
        return -1;
    }
    long n = atol(argv[1]);
    assert(n>1);
 
    // The number of stages (i.e. threads) is floor(sqrt(n))+1 ||
 
    ff_pipe<long> pipe(new generator(n));
    long sq = sqrt(n);
    for(long i=2;i<=sq;++i) pipe.add_stage(new stage);
    pipe.cleanup_nodes();
    pipe.wrap_around();
    pipe.run_and_wait_end();
    std::cout << "\n\n";
    return 0;
}
primes_pipe_discard.cpp
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License version 2 as 
 *  published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 *
 *  As a special exception, you may use this file as part of a free software
 *  library without restriction.  Specifically, if other files instantiate
 *  templates or use macros or inline functions from this file, or you compile
 *  this file and link it with other files to produce an executable, this
 *  file does not by itself cause the resulting executable to be covered by
 *  the GNU General Public License.  This exception does not however
 *  invalidate any other reasons why the executable file might be covered by
 *  the GNU General Public License.
 *
 ****************************************************************************
 */
/* 
 * Author: Massimo Torquati <torquati@di.unipi.it> 
 * Date:   November 2014
 */ 
 
// SPM class exercise (Class Work 2)
// Set up a FastFlow pipeline filtering a stream of integers from 2 to MAX, 
// (MAX is a command line argument) such that:
//   - the first stage generates integers from 2 to MAX
//   - the i-th stage:
//        - if it does not have any number stored, stores the first number received 
//          and does not forward it to the next stage
//        - if it has a number stored:
//            - passes the received number only if it is not a multiple of the stored integer
//              otherwise discards the received number
//   - when EOS is received, each stage prints its stored number (a prime number)
//
//
//     generator  ---> stage(2) ---> stage(3) ---> stage(5)... -->discard
//    
//    
 
 
 
#include <cmath>
#include <iostream>
#include <vector>
#include <ff/pipeline.hpp>
using namespace ff;
 
// stream generator
struct generator: ff_node_t<long> {
    generator(long n):n(n) {}
    long *svc(long *) {
        for(long x=2;x<=n;++x) 
            ff_send_out(reinterpret_cast<long*>(x));
        return EOS;
    }
    long n;
};
 
// generic stage
struct stage: ff_node_t<long> {
    int svc_init() { storage = 0 ; return 0; }
    long *svc(long *t) {
        long x = (long)t;
        if (!storage) storage = x;
        if (x % storage)  return t;
        return GO_ON;
    }
    void svc_end() { 
        if (storage)
            std::cout << storage << " ";
    }
    long storage;
};
 
// task discarder
struct discarder: ff_node {
    void *svc(void *t) { return GO_ON; }
};
 
int main(int argc, char *argv[]) {    
    if (argc<2) {
        std::cerr << "use: " << argv[0]  << " number (not too big !)\n";
        return -1;
    }
    long n = atol(argv[1]);
    assert(n>1);
 
    ff_pipe<long> pipe(new generator(n)); // adding the first stage
    for(long i=2;i<=n;++i) pipe.add_stage(new stage); // adding all "middle" stages
    pipe.add_stage(new discarder);  // adding the last stage
    pipe.cleanup_nodes();
    if (pipe.run_and_wait_end()<0) error("running pipe\n");
    std::cout << "\n\n";
    return 0;
}