Queue
Rationale
A queue is a well-known data structure, so the STL provides one. Strictly speaking, std::queue is a 'container adapter', meaning that it is implemented on top of some underlying container type.
Other variants exist as well:
- priority queue
- a 'safer' queue (see Josuttis' book The C++ Standard Library)
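To make the term 'container adapter' concrete, here is a minimal sketch that uses only the standard adapters. The function names drain_fifo and drain_by_priority are made up for this illustration and are not part of the queue presented on this page.

```cpp
#include <cassert>
#include <list>
#include <queue>
#include <vector>

// Drain a std::queue that is backed by std::list instead of the default
// std::deque. Elements come out in the order they went in (FIFO).
std::vector<int> drain_fifo()
{
    std::queue<int, std::list<int> > q;  // same interface, different container
    q.push(3);
    q.push(1);
    q.push(2);

    std::vector<int> out;
    while (!q.empty())
    {
        out.push_back(q.front());  // look at the oldest element ...
        q.pop();                   // ... then remove it (pop() returns void)
    }
    assert(q.empty());
    return out;
}

// Drain a std::priority_queue: the largest element always comes out first.
std::vector<int> drain_by_priority()
{
    std::priority_queue<int> pq;   // adapter over std::vector by default
    pq.push(3);
    pq.push(1);
    pq.push(2);

    std::vector<int> out;
    while (!pq.empty())
    {
        out.push_back(pq.top());
        pq.pop();
    }
    assert(pq.empty());
    return out;
}
```

Note that the standard adapters split reading (front()/top()) and removing (pop()) into two calls, which is one reason they cannot simply be shared between threads.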
But none of these queues knows anything about threads, and sometimes you need exactly that. The traditional name for this situation is the 'producer/consumer' problem. You can solve it with semaphores, events, buffers and so on, but all the code I saw was rather large.
So how about a 'threadsafe' queue, where threads simply wait when there is nothing to do?
Design
Threadsafe usually means locking. The queue presented here uses the mutex and condition types from boost::thread to synchronize access.
The queue is very simple, so I'll present the whole source here:
    #include <deque>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition.hpp>

    /**
     * Threadsafe queue: pop() and front() block while the queue is empty.
     */
    template <class T, class Container = std::deque< T > >
    class queue
    {
    private:
        boost::mutex     queue_mutex;   // guards every access to c
        boost::condition buffer_empty;  // consumers wait on this while c is empty

    protected:
        Container c;    // container for the elements

    public:
        // number of elements
        typename Container::size_type size() //const
        {
            boost::mutex::scoped_lock lock(queue_mutex);
            return c.size();
        }

        // is queue empty?
        bool empty() // const
        {
            boost::mutex::scoped_lock lock(queue_mutex);
            return c.empty();
        }

        // insert element into the queue
        void push (const T& elem)
        {
            boost::mutex::scoped_lock lock(queue_mutex);
            c.push_back(elem);
            buffer_empty.notify_all();
        }

        // read element from the queue and return its value
        T pop ()
        {
            boost::mutex::scoped_lock lock(queue_mutex);
            while(c.empty())            // re-check after every wakeup
                buffer_empty.wait(lock);
            T elem(c.front());
            c.pop_front();
            return elem;
        }

        // return value of next element
        // note: the reference outlives the lock, so it is only safe
        // if no other thread can pop that element in the meantime
        T& front ()
        {
            boost::mutex::scoped_lock lock(queue_mutex);
            while(c.empty())
                buffer_empty.wait(lock);
            return c.front();
        }

        //
        T& top ()
        {
            return front();
        }
    };
As you can see, it uses a mutex to synchronize access and a condition to signal the pop/front functions (used by the consumer threads) that the producer thread has pushed something.
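For readers without Boost, the same wait/notify pattern can be sketched with the C++11 standard library. This is an illustration under assumptions, not the queue from this page: the names std_queue_sketch and sum_through_queue are hypothetical, std::mutex and std::condition_variable stand in for the Boost types, and notify_one() is used where the original calls notify_all().

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical stand-in for the queue above, reduced to push() and pop().
template <class T>
class std_queue_sketch
{
    std::mutex              mutex_;
    std::condition_variable not_empty_;
    std::deque<T>           items_;

public:
    void push(const T& elem)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(elem);
        }
        not_empty_.notify_one();  // wake one waiting consumer
    }

    T pop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // wait() releases the mutex while sleeping and re-acquires it before
        // returning. The predicate is re-checked on every wakeup, which plays
        // the role of the while(c.empty()) loop in the original.
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T elem(items_.front());
        items_.pop_front();
        return elem;
    }
};

// A producer thread pushes 1..count; the calling thread pops and sums them.
int sum_through_queue(int count)
{
    assert(count >= 0);
    std_queue_sketch<int> q;
    std::thread producer([&q, count] {
        for (int i = 1; i <= count; ++i)
            q.push(i);
    });

    int sum = 0;
    for (int i = 0; i < count; ++i)
        sum += q.pop();           // blocks while the queue is empty
    producer.join();
    return sum;
}
```

The consumer never polls: whenever the queue is empty it sleeps inside wait() until a push() signals the condition.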
Dependencies
Boost.Thread, for the mutex and the condition. Apart from that only the STL is needed, but this is a standard library, so I do not consider it a dependency.
Example
Start with a queue:
    typedef queue< int > int_queue;
You also need a 'main':
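    #include <boost/thread/thread.hpp>

    int main()
    {
        int_queue q;

        sender   s(q);  // producer end of the queue
        receiver r(q);  // consumer end of the queue

        // each thread runs operator()() of the object it is given
        boost::thread thrd1(s);
        boost::thread thrd2(r);

        thrd1.join();
        thrd2.join();

        return 0;
    }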
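The output will be something like this (both threads write to std::cout at the same time, so the interleaving varies from run to run):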
    Running 1 test case...
    received: sent: 0
    sent: 0
    received: 1
    sent: 1
    received: 2
    sent: 2
    received: 3
So how is this magic achieved? The example uses two classes, one at each end of the queue:
    #include <iostream>

    // producer: pushes 0..99, then -1 as an end marker
    class sender
    {
        int_queue& queue_;

    public:
        sender(int_queue& queue) : queue_(queue) {}

        void operator()()
        {
            int n = 0;
            while (n < 100)
            {
                queue_.push(n);
                std::cout << "sent: " << n << std::endl;
                ++n;
            }
            queue_.push(-1);
        }
    };

    // consumer: pops until it sees the end marker
    class receiver
    {
        int_queue& queue_;

    public:
        receiver(int_queue& queue) : queue_(queue) {}

        void operator()()
        {
            int n;
            do
            {
                n = queue_.pop();
                std::cout << "received: " << n << std::endl;
            } while (n != -1); // -1 indicates end of buffer
        }
    };
Each boost::thread starts a new thread that calls operator()() on the object it was given, and the show is on the road ...
See src/queue for the running example.
Use
A good example of how this threadsafe queue is used can be found in the threadpool: the boss thread puts items in the queue, and the worker threads take items from the queue on a first-come, first-served basis.
If there is no work, workers just wait.
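The boss/worker arrangement can be sketched as follows. This is not the threadpool code itself: work_queue and run_pool are hypothetical names, the queue is a small standard-library stand-in so that the example compiles on its own, and "work" is reduced to adding up integers. As in the sender/receiver example, -1 is assumed to be the stop signal, and the boss pushes one per worker.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Minimal blocking queue of ints (stand-in for the queue on this page).
class work_queue
{
    std::mutex              m_;
    std::condition_variable cv_;
    std::deque<int>         items_;

public:
    void push(int v)
    {
        {
            std::lock_guard<std::mutex> lock(m_);
            items_.push_back(v);
        }
        cv_.notify_one();
    }

    int pop()
    {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !items_.empty(); });
        int v = items_.front();
        items_.pop_front();
        return v;
    }
};

// The boss queues the items 1..n_items followed by one -1 per worker.
// Returns the sum of all items the workers processed.
long run_pool(int n_workers, int n_items)
{
    assert(n_workers > 0);
    work_queue q;
    std::vector<long> partial(n_workers, 0);  // one slot per worker
    std::vector<std::thread> workers;

    for (int w = 0; w < n_workers; ++w)
    {
        workers.emplace_back([&q, &partial, w] {
            for (;;)
            {
                int item = q.pop();   // sleeps while there is no work
                if (item == -1)
                    break;            // stop signal for this worker
                partial[w] += item;   // "process" the item
            }
        });
    }

    for (int i = 1; i <= n_items; ++i)
        q.push(i);
    for (int w = 0; w < n_workers; ++w)
        q.push(-1);

    long total = 0;
    for (std::size_t w = 0; w < workers.size(); ++w)
    {
        workers[w].join();
        total += partial[w];
    }
    return total;
}
```

Because the queue is first-in/first-out, the stop signals are only seen after all real items have been handed out, and each worker consumes exactly one of them before it exits.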