Producer-Consumer

Sharlin 13.05.09 19:22

Producer-Consumer -patternin esimerkkitoteutus C++0x:n säikeillä. Kääntyy gcc 4.4:llä, tuskin (vielä) millään muulla kääntäjällä.

 Tekstiversio  Arvo: 5 (5 ääntä)  Äänestä: +  -
//
// Compile with the -lpthread flag!
//

#define _GLIBCXX_USE_NANOSLEEP

#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <ctime>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <thread>
#include <vector>
 
using namespace std;
 
 
// A FIFO pool of ints shared between producer and consumer threads.
//
// The pool does no locking of its own. A caller obtains a lock with lock()
// and must keep holding it across every other member call; the members
// themselves touch the queue without any further synchronisation.
class work_pool {
 public:
  typedef std::unique_lock<std::mutex> lock_type;

  // max_size is the capacity reported by full(). produce() does not enforce
  // it, so callers are expected to check full() first.
  work_pool(int max_size) : max_size(max_size) {
  }

  // Locks the pool's mutex and returns the owning lock object.
  // A thread MUST hold this lock before calling ANY other member function.
  lock_type lock() {
    return lock_type(mutex);
  }

  // Appends a datum to the back of the pool.
  void produce(int i) {
    queue.push(i);
  }

  // Removes and returns the oldest datum.
  // Precondition: the pool is not empty (front() on an empty queue is
  // undefined behaviour), so check empty() first.
  int consume() {
    int res = queue.front();
    queue.pop();
    return res;
  }

  // Returns whether the pool has reached its capacity.
  // A capacity of zero or less means the pool is always full. The explicit
  // check is needed because comparing the unsigned queue size with a negative
  // int would convert the int to a huge unsigned value and never report full.
  bool full() const {
    return max_size <= 0 ||
           queue.size() >= static_cast<std::queue<int>::size_type>(max_size);
  }

  // Returns whether the pool holds no items.
  bool empty() const {
    return queue.empty();
  }

  // Returns the number of items currently in the pool.
  int size() const {
    return static_cast<int>(queue.size());
  }

  // Releases the lock, blocks until notify() is called, then reacquires the
  // lock before returning. Condition variables may also wake spuriously, so
  // the caller must re-check the state it is waiting for after this returns.
  void wait(lock_type& lock) {
    cvar.wait(lock);
  }

  // Wakes every thread currently blocked in wait() on this pool.
  void notify() {
    cvar.notify_all();
  }

 private:
  int max_size;                  // capacity reported by full()
  std::queue<int> queue;         // pending items, oldest at the front
  std::condition_variable cvar;  // signalled by notify(), awaited by wait()
  std::mutex mutex;              // handed out through lock()
};
 
 
// Produces data to the given work pool.
void produce(int id, work_pool& pool) {
  static int i = 0;
  while(true) {
    // Simulate some heavy work being done
    this_thread::sleep_for(chrono::milliseconds(rand() % 3000));
 
    // Acquire lock on the work pool
    work_pool::lock_type lock(pool.lock());
 
    // Push data to pool if not full
    if(!pool.full()) {
      pool.produce(i++);
      cerr << " producer " << id << ": " << i << " size=" << pool.size() << '\n';
    }
 
    // Notify consumers
    pool.notify();
  }
}
 
// Consumes data from the given work pool.
void consume(int id, work_pool& pool) {
 
  // Acquire lock on the work pool
  work_pool::lock_type lock(pool.lock());
 
  while(1) {
 
    // Release lock, wait for notification by a producer
    pool.wait(lock);
    // wait reacquires the lock before it returns
 
    // Consume data from pool if any
    if(!pool.empty()) {
      int i = pool.consume();
      cerr << " consumer " << id << ": " << i << " size=" << pool.size() << "\n\n";
 
      // rand is not thread-safe so call it before releasing the lock
      int delay = rand() % 3000;
 
      // Release lock on pool, simulate work being done
      lock.unlock();
      this_thread::sleep_for(chrono::milliseconds(delay));
      lock.lock();
    }
  }
}
 
 
int main() {
 
  srand(time(0));
 
  work_pool pool(10);
 
  // Spawn producer and consumer threads
  vector<thread> producers, consumers;
  for(int i = 0; i < 4; ++i) {
    // emplace_back constructs the object in-place
    // equivalent to push_back(thread(...)) but avoids a potential copy/move
    consumers.emplace_back(consume, i, ref(pool));
    producers.emplace_back(produce, i, ref(pool));
  }
 
  while(true);
}