wiki:QueueLib

Queue

back


Rationale

A queue is a well known data structure and so the  STL provides one; well, in fact it is a 'container adapter', meaning that it is implemented on top of some underlying container type.

Other variants exists as well:

But all these queues know nothing about threads ... And sometimes you kind a need them ... A more traditional name for this is the 'producer/consumer' problem, you can solve this with semaphores/events/buffers etc etc - but all the code I saw was rather large ...

So how about a 'threadsafe' queue, where threads just wait when there is nothing todo?


Design

Threadsafe usually means locking. The queue presented uses the lock variables from boost::thread for synchronizing access.

The queue is very simple, so I'll present the whole source here:

/**
 * 
 */
template <class T, class Container = std::deque< T > >
class queue
{
  private:
    boost::mutex queue_mutex;
    boost::condition buffer_empty;
  
  protected:
    Container c;  // container for the elements

  public:
    // number of elements
    typename Container::size_type size() //const 
    {
        boost::mutex::scoped_lock lock(queue_mutex);
        return c.size();
    }

    // is queue empty?
    bool empty() // const 
    {
        boost::mutex::scoped_lock lock(queue_mutex);
        return c.empty();
    }

    // insert element into the queue
    void push (const T& elem) 
    {
        boost::mutex::scoped_lock lock(queue_mutex);
        c.push_back(elem);
        buffer_empty.notify_all();
    }

    // read element from the queue and return its value
    T pop () 
    {
        boost::mutex::scoped_lock lock(queue_mutex);
        while(c.empty()) 
          buffer_empty.wait(lock);
        T elem(c.front());
        c.pop_front();
        return elem;
    }

    // return value of next element
    T& front () 
    {
      boost::mutex::scoped_lock lock(queue_mutex);
      while(c.empty()) 
         buffer_empty.wait(lock);
      return c.front();
    }
    
    // 
    T& top () 
    {
      return front();
    }
};

As you can see, it uses a lock variable to synchronize access and a condition to signal the pop/front function (used by the consumer threads) that the producer thread pushed something.


Dependencies

None, only STL, but this is a standard library, so I do not consider this as a dependency.


Example

Start with a queue:

 typedef queue< int > int_queue;

You also need a 'main':

  int main()
  {
    int_queue q;
    
    sender s(q);
    receiver r(q);

    boost::thread thrd1(s);
    boost::thread thrd2(r);

    thrd1.join();
    thrd2.join();
    
    return 0;
  }

The output will be something like:

Running 1 test case...
received: sent: 0
sent: 0
received: 1
sent: 1
received: 2
sent: 2
received: 3

So how is this magic achieved? The example uses 2 classes accessing both ends of the queue:

class sender
{
  int_queue& queue_;
public:
  sender(int_queue& queue) : queue_(queue) {}

  void operator()()
  {
    int n = 0;
    while (n < 100) 
    {
      queue_.push(n);
      std::cout << "sent: " << n << std::endl;
      ++n;
    }
    queue_.push(-1);
  }
};

class receiver
{
  int_queue& queue_;
public:
  receiver(int_queue& queue) : queue_(queue) {}

  void operator()()
  {
    int n;
    do 
    {
      n = queue_.pop();
      std::cout << "received: " << n << std::endl;
    } while (n != -1); // -1 indicates end of buffer
  }
};

The boost::tread bit calls operator()() and the show is on the road ...

See src/queue for the running example.


Use

A good example of how this threadsafe queue is used can found in the threadpool: the boss-thread puts items in the queue, worker-threads take items from the queue based on first-come/first-served.

If there is no work, workers just wait.