Strumenti Utente

Strumenti Sito


magistraleinformaticanetworking:spm:spm14exe1sol

Pipeline with Pthreads, sample code

Sample code for the following:

  • implementation of an unbounded channel with standard C++ and Pthread mechanisms
  • implementation of the main forking the three stages and computing the pipeline
  • implementation of a bounded channel
  • implementation of the main using the bounded channel
  • sample makefile
"Channel.hpp"
#include <iostream>
#include <queue>
 
 
/**
 * Unbounded FIFO channel shared between threads.
 *
 * Messages are kept in a std::queue protected by a Pthread mutex; a
 * condition variable wakes up receivers waiting on an empty queue.
 * send() never waits for space, receive() blocks until a message exists.
 *
 * The channel cannot be copied: duplicating a pthread_mutex_t or a
 * pthread_cond_t does not yield a usable synchronisation object.
 */
template<class Task> class Channel {

private:
  std::queue<Task> chan;
  // The POSIX macros are initializers, so they are used at the point of
  // declaration rather than assigned inside the constructor body.
  pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

  // Scoped lock: releases the mutex on every exit path, including an
  // exception thrown while copying a Task into or out of the queue.
  struct Guard {
    pthread_mutex_t & m;
    explicit Guard(pthread_mutex_t & mx) : m(mx) { pthread_mutex_lock(&m); }
    ~Guard() { pthread_mutex_unlock(&m); }
    Guard(const Guard &) = delete;
    Guard & operator=(const Guard &) = delete;
  };

public:
  Channel() = default;

  Channel(const Channel &) = delete;
  Channel & operator=(const Channel &) = delete;

  /**
   * Appends a copy of t to the channel and wakes up one waiting receiver.
   */
  void send(Task t) {
    Guard lock(mutex);
    chan.push(t);
    pthread_cond_signal(&cond);    // for waiting receives
  }

  /**
   * Removes and returns the oldest message, blocking while the channel
   * is empty.
   */
  Task receive() {
    Guard lock(mutex);
    // Loop: a wake-up does not guarantee the queue is still non-empty.
    while(chan.empty()) {
      pthread_cond_wait(&cond, &mutex);
    }
    Task t = chan.front();
    chan.pop();
    return t;
  }
};
"pipe.cpp"
#include <iostream>
#include <queue>
#include <stdlib.h>
#include <unistd.h>
 
using namespace std;
 
#include "Channel.hpp"
 
// Message kind: EOS marks the end of the stream, TASK carries a payload.
typedef enum {EOS, TASK} Task_t;

/**
 * Message travelling through the pipeline: a float payload plus a tag
 * telling whether it is a regular task or the end-of-stream marker.
 */
class Task {
private:
  float x;
  Task_t tag;
public:

  // Builds a message with payload x and the given tag.
  Task(float x, Task_t tag):x(x),tag(tag) {}
  // Builds a message from the tag only (used for EOS). The payload is
  // zeroed so that copying the message through a channel never reads an
  // uninitialised float.
  Task(Task_t tag):x(0.0f),tag(tag)  {}

  // True when this message is the end-of-stream marker.
  bool isEos() const {
    return(tag == EOS);
  }

  // Returns the payload.
  float get() const {
    return(x);
  }

  // Replaces the payload with y.
  void set(float y) {
    x = y;
    return;
  }

};
 
// Channel endpoints handed to a pipeline stage: `in` is the channel the
// stage reads from and `out` the one it writes to. main leaves the unused
// side NULL for the first and last stage.
struct __iochans {
  Channel<Task> *in;
  Channel<Task> *out;
};
typedef struct __iochans IO_Channels;
 
// Number of tasks the source stage emits; main sets it from argv[1].
int ntasks = 0;
 
/**
 * First pipeline stage: sends ntasks messages with payloads 0, 1, ...,
 * ntasks-1 on chans->out, followed by one EOS message.
 *
 * @param chans  channel endpoints of the stage; only `out` is used.
 * @return always NULL.
 */
void * source(IO_Channels * chans) {
  // produce as many tasks as required
  for(int i=0; i<ntasks; i++) {
    Task t(static_cast<float>(i), TASK);
    (chans->out)->send(t);
#ifdef TRACEMSG
    cout << "Source: output task " << i << endl;
#endif
  }
  cout << "Source: sending EOS" << endl;
  // send() takes its argument by value, so a temporary is enough; the
  // previous `*(new Task(EOS))` leaked the heap-allocated object.
  (chans->out)->send(Task(EOS));
#ifdef TRACEMSG
  cout << "Source: EOS Sent\n terminating ... " << endl;
#endif
  return(NULL);
}
 
/**
 * Middle pipeline stage: reads messages from chans->in, replaces each
 * payload with its square and forwards the message on chans->out.
 * When EOS is read it is forwarded as well and the stage ends.
 *
 * @param chans  channel endpoints of the stage (`in` and `out` both used).
 * @return always NULL.
 */
void * square(IO_Channels * chans) {
  Task current = chans->in->receive();
  for( ; !current.isEos(); current = chans->in->receive()) {
#ifdef TRACEMSG
    cout << "Stage1 received task " << current.get() << endl;
#endif
    const float value = current.get();
    current.set(value * value);
    chans->out->send(current);
  }
#ifdef TRACEMSG
  cout << "Square: propagating EOS" << endl;
#endif
  // `current` holds the EOS message here: pass it on to the next stage.
  chans->out->send(current);
  return NULL;
}
 
/**
 * Last pipeline stage: reads messages from chans->in until EOS, adding up
 * the payloads and counting the messages, then prints both figures.
 *
 * @param chans  channel endpoints of the stage; only `in` is used.
 * @return always NULL.
 */
void * drain(IO_Channels * chans) {
  int received = 0;
  float total = 0.0;
  for(Task item = chans->in->receive(); !item.isEos(); item = chans->in->receive()) {
#ifdef TRACEMSG
    cout << "Drain received task " << item.get() << endl;
#endif
    total += item.get();
    ++received;
  }
#ifdef TRACEMSG
  cout << "Drain: got EOS, terminating" << endl;
#endif
  cout << "Drain computed " << total  << " from " << received << " tasks " << endl;
  return NULL;
}
 
int main(int argc, char * argv[]) {
  // usage: a.out ntasks
 
  Channel<Task> ch12, ch23;
  IO_Channels stage1, stage2, stage3;
  pthread_t tid1, tid2, tid3;
 
  stage1.in = NULL;  stage1.out = &ch12;
  stage2.in = &ch12; stage2.out = &ch23;
  stage3.in = &ch23; stage3.out = NULL;
 
  ntasks = atoi(argv[1]);
 
  // start third stage
  if(pthread_create(&tid3, NULL, (void *(*)(void *))drain,
                    (void *) &stage3) != 0) {
    cout << "Error while creating stage3 " << endl;
    return(-1);
  }
  cout << "Drain started" << endl;
 
 
  // start second stage
  if(pthread_create(&tid2, NULL, (void *(*)(void *))square,
                    (void *) &stage2) != 0) {
    cout << "Error while creating stage2 " << endl;
    return(-1);
  }
  cout << "Square started!" << endl;
 
  // start first stage
  if(pthread_create(&tid1, NULL, (void *(*)(void *))source,
                    (void *) &stage1) != 0) {
    cout << "Error while creating stage1 " << endl;
    return(-1);
  }
  cout << "Source started " << endl;
 
 
  void * retval;
  pthread_join(tid3, &retval);
 
  return(0);
}
"BChannel.hpp"
#include <iostream>
#include <queue>
 
 
/**
 * Bounded FIFO channel shared between threads.
 *
 * At most `max` messages are buffered. send() blocks while the channel is
 * full and receive() blocks while it is empty; two condition variables
 * (one for readers, one for writers) share a single mutex.
 *
 * The channel cannot be copied: duplicating a pthread_mutex_t or a
 * pthread_cond_t does not yield a usable synchronisation object.
 */
template<class Task> class Channel {

private:
  std::queue<Task> chan;
  // The POSIX macros are initializers, so they are used at the point of
  // declaration rather than assigned inside the constructor body.
  pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  pthread_cond_t cond_r = PTHREAD_COND_INITIALIZER;  // receivers wait here
  pthread_cond_t cond_w = PTHREAD_COND_INITIALIZER;  // senders wait here
  int max;        // capacity, always >= 1
  int msgno = 0;  // messages currently buffered

  // Scoped lock: releases the mutex on every exit path, including an
  // exception thrown while copying a Task into or out of the queue.
  struct Guard {
    pthread_mutex_t & m;
    explicit Guard(pthread_mutex_t & mx) : m(mx) { pthread_mutex_lock(&m); }
    ~Guard() { pthread_mutex_unlock(&m); }
    Guard(const Guard &) = delete;
    Guard & operator=(const Guard &) = delete;
  };

public:

  /**
   * Creates a channel holding at most `bound` messages (4 by default).
   * A bound below 1 is raised to 1: a capacity of zero would make every
   * send() block forever, and a negative one would never block at all.
   */
  Channel(int bound = 4):max(bound > 0 ? bound : 1) {}

  Channel(const Channel &) = delete;
  Channel & operator=(const Channel &) = delete;

  /**
   * Appends a copy of t, blocking while the channel is full, then wakes
   * up one waiting receiver.
   */
  void send(Task t) {
    Guard lock(mutex);
    while(msgno >= max)
      pthread_cond_wait(&cond_w, &mutex);
    chan.push(t);
    msgno++;
    pthread_cond_signal(&cond_r);    // for waiting receives
  }

  /**
   * Removes and returns the oldest message, blocking while the channel is
   * empty, then wakes up one sender waiting for space.
   */
  Task receive() {
    Guard lock(mutex);
    while(chan.empty()) {
      pthread_cond_wait(&cond_r, &mutex);
    }
    Task t = chan.front();
    chan.pop();
    msgno--;
    pthread_cond_signal(&cond_w);    // for waiting sends
    return t;
  }
};
"bpipe.cpp"
#include <iostream>
#include <queue>
#include <stdlib.h>
#include <unistd.h>
 
using namespace std;
 
#include "BChannel.hpp"
 
// Message kind: EOS marks the end of the stream, TASK carries a payload.
typedef enum {EOS, TASK} Task_t;

/**
 * Message travelling through the pipeline: a float payload plus a tag
 * telling whether it is a regular task or the end-of-stream marker.
 */
class Task {
private:
  float x;
  Task_t tag;
public:

  // Builds a message with payload x and the given tag.
  Task(float x, Task_t tag):x(x),tag(tag) {}
  // Builds a message from the tag only (used for EOS). The payload is
  // zeroed so that copying the message through a channel never reads an
  // uninitialised float.
  Task(Task_t tag):x(0.0f),tag(tag)  {}

  // True when this message is the end-of-stream marker.
  bool isEos() const {
    return(tag == EOS);
  }

  // Returns the payload.
  float get() const {
    return(x);
  }

  // Replaces the payload with y.
  void set(float y) {
    x = y;
    return;
  }

};
 
// Channel endpoints handed to a pipeline stage: `in` is the channel the
// stage reads from and `out` the one it writes to. main leaves the unused
// side NULL for the first and last stage.
struct __iochans {
  Channel<Task> *in;
  Channel<Task> *out;
};
typedef struct __iochans IO_Channels;
 
// Number of tasks the source stage emits; main sets it from argv[1].
int ntasks = 0;
 
/**
 * First pipeline stage: sends ntasks messages with payloads 0, 1, ...,
 * ntasks-1 on chans->out, followed by one EOS message.
 *
 * @param chans  channel endpoints of the stage; only `out` is used.
 * @return always NULL.
 */
void * source(IO_Channels * chans) {
  // produce as many tasks as required
  for(int i=0; i<ntasks; i++) {
    Task t(static_cast<float>(i), TASK);
    (chans->out)->send(t);
#ifdef TRACEMSG
    cout << "Source: output task " << i << endl;
#endif
  }
  cout << "Source: sending EOS" << endl;
  // send() takes its argument by value, so a temporary is enough; the
  // previous `*(new Task(EOS))` leaked the heap-allocated object.
  (chans->out)->send(Task(EOS));
#ifdef TRACEMSG
  cout << "Source: EOS Sent\n terminating ... " << endl;
#endif
  return(NULL);
}
 
/**
 * Middle pipeline stage: reads messages from chans->in, replaces each
 * payload with its square and forwards the message on chans->out.
 * When EOS is read it is forwarded as well and the stage ends.
 *
 * @param chans  channel endpoints of the stage (`in` and `out` both used).
 * @return always NULL.
 */
void * square(IO_Channels * chans) {
  Task current = chans->in->receive();
  for( ; !current.isEos(); current = chans->in->receive()) {
#ifdef TRACEMSG
    cout << "Stage1 received task " << current.get() << endl;
#endif
    const float value = current.get();
    current.set(value * value);
    chans->out->send(current);
  }
#ifdef TRACEMSG
  cout << "Square: propagating EOS" << endl;
#endif
  // `current` holds the EOS message here: pass it on to the next stage.
  chans->out->send(current);
  return NULL;
}
 
/**
 * Last pipeline stage: reads messages from chans->in until EOS, adding up
 * the payloads and counting the messages, then prints both figures.
 *
 * @param chans  channel endpoints of the stage; only `in` is used.
 * @return always NULL.
 */
void * drain(IO_Channels * chans) {
  int received = 0;
  float total = 0.0;
  for(Task item = chans->in->receive(); !item.isEos(); item = chans->in->receive()) {
#ifdef TRACEMSG
    cout << "Drain received task " << item.get() << endl;
#endif
    total += item.get();
    ++received;
  }
#ifdef TRACEMSG
  cout << "Drain: got EOS, terminating" << endl;
#endif
  cout << "Drain computed " << total  << " from " << received << " tasks " << endl;
  return NULL;
}
 
int main(int argc, char * argv[]) {
  // usage: a.out ntasks
 
  Channel<Task> ch12, ch23;
  IO_Channels stage1, stage2, stage3;
  pthread_t tid1, tid2, tid3;
 
  stage1.in = NULL;  stage1.out = &ch12;
  stage2.in = &ch12; stage2.out = &ch23;
  stage3.in = &ch23; stage3.out = NULL;
 
  ntasks = atoi(argv[1]);
 
  // start third stage
  if(pthread_create(&tid3, NULL, (void *(*)(void *))drain,
                    (void *) &stage3) != 0) {
    cout << "Error while creating stage3 " << endl;
    return(-1);
  }
  cout << "Drain started" << endl;
 
 
  // start second stage
  if(pthread_create(&tid2, NULL, (void *(*)(void *))square,
                    (void *) &stage2) != 0) {
    cout << "Error while creating stage2 " << endl;
    return(-1);
  }
  cout << "Square started!" << endl;
 
  // start first stage
  if(pthread_create(&tid1, NULL, (void *(*)(void *))source,
                    (void *) &stage1) != 0) {
    cout << "Error while creating stage1 " << endl;
    return(-1);
  }
  cout << "Source started " << endl;
 
 
  void * retval;
  pthread_join(tid3, &retval);
 
  return(0);
}
# Builds the two pipeline samples: pipe (unbounded channel) and bpipe
# (bounded channel).
# -DTRACEMSG turns on the trace output guarded by #ifdef TRACEMSG in the
# sources; -pthread enables Pthreads support at compile and link time.
CFLAGS = -DTRACEMSG -pthread -std=c++11
CC     = g++

# all and clean are actions, not files: keep them working even if a file
# with the same name shows up in the directory.
.PHONY: all clean

all:    pipe bpipe

# Recipe lines must start with a TAB character.
pipe:   pipe.cpp Channel.hpp
	$(CC) $(CFLAGS) pipe.cpp -o pipe

bpipe:  bpipe.cpp BChannel.hpp
	$(CC) $(CFLAGS) bpipe.cpp -o bpipe

clean:
	rm -f pipe bpipe a.out *~
magistraleinformaticanetworking/spm/spm14exe1sol.txt · Ultima modifica: 30/10/2014 alle 11:43 (10 anni fa) da Marco Danelutto