| Uutiset | Koodikirjasto | Wiki | Keskustelut | FAQ | Info |
Producer-ConsumerSharlin 13.05.09 19:22 Producer-Consumer -patternin esimerkkitoteutus C++0x:n säikeillä. Kääntyy gcc 4.4:llä, tuskin (vielä) millään muulla kääntäjällä.
// // Käännä vivulla -lpthread varustettuna! // #define _GLIBCXX_USE_NANOSLEEP #include <thread> #include <condition_variable> #include <queue> #include <chrono> #include <iostream> #include <random> using namespace std; // Represents a pool of data with thread-safe producer/consumer semantics class work_pool { public: typedef unique_lock<std::mutex> lock_type; work_pool(int max_size) : max_size(max_size) { } // Acquires a lock on the pool and returns it // A thread MUST hold a lock on the pool before calling ANY other member functions. lock_type lock() { return lock_type(mutex); } // Pushes a datum to the work pool void produce(int i) { queue.push(i); } // Pops a datum from the work pool int consume() { int res = queue.front(); queue.pop(); return res; } // Returns whether the pool is full bool full() { return queue.size() >= max_size; } // Returns whether the pool is empty bool empty() { return queue.empty(); } // Returns the number of items in the pool int size() { return queue.size(); } // Releases lock, blocks until a producer thread calls notify, then reacquires the lock void wait(lock_type& lock) { cvar.wait(lock); } // Notifies the consumer threads waiting on this pool void notify() { cvar.notify_all(); } private: int max_size; std::queue<int> queue; std::condition_variable cvar; std::mutex mutex; }; // Produces data to the given work pool. void produce(int id, work_pool& pool) { static int i = 0; while(true) { // Simulate some heavy work being done this_thread::sleep_for(chrono::milliseconds(rand() % 3000)); // Acquire lock on the work pool work_pool::lock_type lock(pool.lock()); // Push data to pool if not full if(!pool.full()) { pool.produce(i++); cerr << " producer " << id << ": " << i << " size=" << pool.size() << '\n'; } // Notify consumers pool.notify(); } } // Consumes data from the given work pool. void consume(int id, work_pool& pool) { // Acquire lock on the work pool work_pool::lock_type lock(pool.lock()); while(1) { // Release lock, wait for notification by a producer pool.wait(lock); // wait reacquires the lock before it returns // Consume data from pool if any if(!pool.empty()) { int i = pool.consume(); cerr << " consumer " << id << ": " << i << " size=" << pool.size() << "\n\n"; // rand is not thread-safe so call it before releasing the lock int delay = rand() % 3000; // Release lock on pool, simulate work being done lock.unlock(); this_thread::sleep_for(chrono::milliseconds(delay)); lock.lock(); } } } int main() { srand(time(0)); work_pool pool(10); // Spawn producer and consumer threads vector<thread> producers, consumers; for(int i = 0; i < 4; ++i) { // emplace_back constructs the object in-place // equivalent to push_back(thread(...)) but avoids a potential copy/move consumers.emplace_back(consume, i, ref(pool)); producers.emplace_back(produce, i, ref(pool)); } while(true); } |
![]() Haku
|