C++11 Multi-threaded Programming: Task-Queue Patterns - Part 2

Introduction

In the previous post, we discussed three task-queue patterns built on the thread synchronization primitives introduced in C++11. In this post, we will implement the same patterns using C++11 std::async.

Here is a recap of the previous post:

  • Single Producer and Single Consumer

A basic model with one consumer thread, appropriate for lightweight, sequential processing of work items.

  • Single Producer and Multiple Consumers - Unordered Delivery

Our work items are processed concurrently. The application hook (ReadyForDelivery) needs to be thread-safe because it is called by multiple consumer threads. We adopt this model when the application does not need ordering and wants the delivered items to be handled concurrently as well.

  • Single Producer and Multiple Consumers - Ordered Delivery

Here too, our work items are processed concurrently, but they are delivered to the application sequentially. The delivery handler need not be thread-safe because only the delivery thread calls it.

While implementing these patterns using C++11 std::async, our definitions of WorkItem and SleepyWorkItem remain the same as in the previous post. A WorkItem represents a task that needs processing, and consumers represent the processing threads.

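// Headers needed by the code in this post
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class WorkItem {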
public:
  WorkItem() = default;
  virtual ~WorkItem() = default;

  virtual void Process() = 0;
};
typedef std::shared_ptr<WorkItem> WorkItemPtr;

class SleepyWorkItem: public WorkItem {
public:
  SleepyWorkItem(uint32_t id, uint32_t msec_duration):
    id_(id), msec_duration_(msec_duration) {}
  ~SleepyWorkItem() = default;

  void Process() override {
    std::this_thread::sleep_for(std::chrono::milliseconds(msec_duration_));
  }

  uint32_t id() const {
    return id_;
  }

  uint32_t msec_duration() const {
    return msec_duration_;
  }

private:
  uint32_t id_;
  uint32_t msec_duration_;
};

1. Single Producer and Multiple Consumers - Ordered Delivery


We will start with the last pattern. Here is its implementation:

class OrderedWorkerPool2 {
public:
  typedef std::function<void (WorkItemPtr)> AppCallback;

  OrderedWorkerPool2(const AppCallback& callback);
  ~OrderedWorkerPool2();

  void Add(WorkItemPtr work_item);
private:
  void DoDelivery();

  const AppCallback callback_;
  std::queue<std::future<WorkItemPtr>> queue_;
  std::mutex mutex_;
  std::condition_variable cond_;
  bool finished_;
  // Declared last: members are initialized in declaration order, and the
  // delivery thread must not start before the members it uses exist.
  std::thread thread_;
};

// Implementation
OrderedWorkerPool2::OrderedWorkerPool2(const AppCallback& callback):
  callback_(callback), finished_(false), thread_(&OrderedWorkerPool2::DoDelivery, this) {
}

OrderedWorkerPool2::~OrderedWorkerPool2() {
  { // Lock scope
    std::unique_lock<std::mutex> lock(mutex_);
    finished_ = true;
  }
  cond_.notify_one();
  thread_.join();
}

// Producer thread context
void OrderedWorkerPool2::Add(WorkItemPtr work_item) {
  { // Lock scope
    std::unique_lock<std::mutex> lock(mutex_);
    queue_.emplace(std::async(std::launch::async,
    [work_item] {
      // Do the real work of processing the WorkItem asynchronously
      work_item->Process();
      return work_item;
    }));
  }
  cond_.notify_one();
}

// Delivery thread context
void OrderedWorkerPool2::DoDelivery() {
  while (true) {
    std::future<WorkItemPtr> future;
    { // Lock scope
      std::unique_lock<std::mutex> lock(mutex_);
      // Wait if queue is empty and finished_ is false
      cond_.wait(lock, [this] () { 
        return !queue_.empty() || finished_;
      });
      if (!queue_.empty()) {
        future = std::move(queue_.front());
        queue_.pop();
      } else if (finished_) {
        break;
      } else {
        continue;
      }
    }
    // We must call "get" outside the lock scope: it blocks while the
    // WorkItem is still being processed (or, for a deferred WorkItem,
    // while it runs inline), and holding the lock for that long would
    // also block the producer thread in Add.
    // The "get" call also ensures that a deferred WorkItem is processed.
    WorkItemPtr work_item = future.get();
    if (work_item) {
      callback_(work_item);
    }
  }
}


We still have a queue and a delivery thread, but the implementation is now simpler. The producer thread triggers asynchronous processing of a work item by calling std::async, and we enqueue the std::future object returned by this call. The lambda expression calls Process on the work item asynchronously and returns the work item once processing is complete.

The delivery thread dequeues this std::future object and calls get on it. This call returns the processed work item, and it blocks if the asynchronous processing of the work item is not yet complete.

It is important to call std::future::get outside the queue lock because it may block if the work item is not ready. If the lock were held during that call, the producer thread would also block while enqueuing.


Our delivery thread wakes up from wait when a work item is enqueued. Another approach is to wake it up once the work item is ready to be delivered. For this, we notify the delivery thread after Process, and we call std::future::wait_for with a zero duration in the std::condition_variable::wait predicate to test whether the work item at the front of the queue is ready.

...
    [this, work_item] {
        // Do the real work of processing the WorkItem asynchronously
        work_item->Process();
        cond_.notify_one();
        return work_item;
      }));
    }
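...

Note that we also need to test for std::future_status::deferred here. If the work item is deferred, it will be executed once std::future::get is called. Keep in mind that the future becomes ready only after the lambda returns, so the notification sent from inside the lambda can arrive before the status changes to ready. A delivery thread that wakes up in that window sees a timeout status and goes back to waiting until the next notification.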

...
      cond_.wait(lock, [this] () {
        if (!queue_.empty()) {
          std::future_status status = queue_.front().wait_for(std::chrono::seconds(0));
          if (status == std::future_status::ready ||
              status == std::future_status::deferred) {
            return true;
          }
          return false;
        }
        return finished_;
      });
...
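
To make the three possible outcomes of a zero-duration wait_for concrete, here is a minimal sketch of the readiness test as a standalone helper. The name IsDeliverable is hypothetical and is not part of OrderedWorkerPool2; it only isolates the check used in the predicate above.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// Hypothetical helper (not part of the original class): non-blocking test of
// whether get() can be called without waiting on another thread.
//   ready    -> the result is already available
//   deferred -> the task has not started; get() will run it inline
//   timeout  -> the task is still running on another thread
template <typename T>
bool IsDeliverable(const std::future<T>& future) {
  const std::future_status status = future.wait_for(std::chrono::seconds(0));
  return status == std::future_status::ready ||
         status == std::future_status::deferred;
}
```

A future obtained from std::launch::deferred reports deferred immediately, while a future whose value has not been produced yet reports timeout, so the helper returns false for it.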

ReadyForDelivery is our application handler, and the following main function represents the producer end of our task queue:

void ReadyForDelivery(WorkItemPtr work_item) {
  std::shared_ptr<SleepyWorkItem> sleepy_work_item =
      std::dynamic_pointer_cast<SleepyWorkItem>(work_item);
  if (!sleepy_work_item) {
    return;  // Not a SleepyWorkItem; nothing to report
  }
  std::cout << "Id " << sleepy_work_item->id() << ", " <<
      sleepy_work_item->msec_duration() <<
      " millisec is ready for delivery!" << std::endl;
}

int main(int argc, char** argv) {
  std::vector<WorkItemPtr> items;
  items.emplace_back(std::make_shared<SleepyWorkItem>(1, 100));
  items.emplace_back(std::make_shared<SleepyWorkItem>(2, 700));
  items.emplace_back(std::make_shared<SleepyWorkItem>(3, 500));
  items.emplace_back(std::make_shared<SleepyWorkItem>(4, 200));
  items.emplace_back(std::make_shared<SleepyWorkItem>(5, 300));

  auto start = std::chrono::high_resolution_clock::now();
  { // Object scope
    OrderedWorkerPool2 worker_pool(std::bind(&ReadyForDelivery, std::placeholders::_1));
    for (WorkItemPtr item: items) {
      worker_pool.Add(item);
    }
  }
  auto end = std::chrono::high_resolution_clock::now();
  std::cout << "Completed work items in " <<
    std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << " milliseconds." << std::endl;

  return 0;
}

It is also important to check the queue size when using std::async and to block the producer if the size exceeds some upper limit. This matters when the processing of work items is CPU intensive: if the enqueue rate becomes higher than the dequeue rate, a large number of work items can be in flight at once, and we do not want to overwhelm the system with too many processing threads.
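
One possible way to add such a limit is sketched below. This is not the code from the post: BoundedOrderedPool, max_outstanding and the second condition variable are assumptions made for illustration, and tasks return a plain int so that the sketch does not depend on WorkItem. A queue slot is released only after the task at the front has finished, so at most max_outstanding tasks run at any time (max_outstanding is assumed to be at least 1).

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical bounded variant of the ordered pool (illustration only).
class BoundedOrderedPool {
public:
  typedef std::function<int()> Task;
  typedef std::function<void(int)> AppCallback;

  BoundedOrderedPool(std::size_t max_outstanding, const AppCallback& callback)
      : max_outstanding_(max_outstanding), callback_(callback),
        finished_(false), thread_(&BoundedOrderedPool::DoDelivery, this) {}

  ~BoundedOrderedPool() {
    { // Lock scope
      std::unique_lock<std::mutex> lock(mutex_);
      finished_ = true;
    }
    not_empty_.notify_one();
    thread_.join();
  }

  // Producer thread context: blocks while the queue is at its limit.
  void Add(Task task) {
    { // Lock scope
      std::unique_lock<std::mutex> lock(mutex_);
      not_full_.wait(lock, [this] { return queue_.size() < max_outstanding_; });
      queue_.emplace(std::async(std::launch::async, std::move(task)));
    }
    not_empty_.notify_one();
  }

private:
  // Delivery thread context
  void DoDelivery() {
    while (true) {
      std::future<int> future;
      { // Lock scope
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !queue_.empty() || finished_; });
        if (queue_.empty()) break;  // finished_ is set and nothing is left
        // Take the future but keep its slot occupied until the task is done.
        future = std::move(queue_.front());
      }
      const int result = future.get();  // may block; the lock is not held
      { // Lock scope
        std::unique_lock<std::mutex> lock(mutex_);
        queue_.pop();  // release the slot
      }
      not_full_.notify_one();
      callback_(result);
    }
  }

  const std::size_t max_outstanding_;
  const AppCallback callback_;
  std::queue<std::future<int>> queue_;
  std::mutex mutex_;
  std::condition_variable not_empty_;
  std::condition_variable not_full_;
  bool finished_;
  std::thread thread_;  // declared last so it starts after the other members
};
```

With a limit of 2, for example, the third call to Add waits until the delivery thread has retrieved the result of the first task. Delivery order is still the order in which the tasks were added.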


2. Single Producer and Multiple Consumers - Unordered Delivery

If we call our application handler ReadyForDelivery in the context of the async thread instead of the delivery thread, we achieve the desired functionality for this pattern.

...
    [this, work_item] {
      // Do the real work of processing the WorkItem asynchronously
      work_item->Process();
      callback_(work_item);
      return work_item;
    }));
  }
  cond_.notify_one();
...
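
Because the handler now runs on several async threads at once, it has to protect any state it shares. The following is a minimal sketch of that idea, assuming a hypothetical ThreadSafeSink class and ProcessUnordered function that are not part of the post. Integer ids stand in for work items to keep the example self-contained.

```cpp
#include <cassert>
#include <future>
#include <mutex>
#include <vector>

// Hypothetical thread-safe delivery sink: a mutex serializes concurrent calls.
class ThreadSafeSink {
public:
  void Deliver(int id) {
    std::lock_guard<std::mutex> lock(mutex_);
    delivered_.push_back(id);
  }

  std::vector<int> Snapshot() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return delivered_;
  }

private:
  mutable std::mutex mutex_;
  std::vector<int> delivered_;
};

// Launches one async task per id; each task delivers from its own thread,
// so the delivery order is not guaranteed.
inline void ProcessUnordered(const std::vector<int>& ids, ThreadSafeSink& sink) {
  std::vector<std::future<void>> futures;
  for (int id : ids) {
    futures.emplace_back(std::async(std::launch::async, [id, &sink] {
      // Process() would run here; delivery happens on the async thread.
      sink.Deliver(id);
    }));
  }
  for (auto& future : futures) {
    future.get();  // wait until every delivery has finished
  }
}
```

All ids reach the sink exactly once, but their order in the snapshot can differ from run to run.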

3. Single Producer and Single Consumer

To implement this pattern, we just need to change the launch policy of std::async to std::launch::deferred. This makes sure that a work item is executed only when the delivery thread calls std::future::get, so the work items are processed one at a time on the delivery thread.

...
    queue_.emplace(std::async(std::launch::deferred,
...
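
The effect of the launch policy can be observed by comparing thread ids. The helper below is a small sketch written for this purpose (ExecutingThread is a hypothetical name): it reports which thread actually runs a task launched with the given policy.

```cpp
#include <cassert>
#include <future>
#include <thread>

// Returns the id of the thread that executes a task launched with the given
// policy. get() is called on the current thread in both cases.
inline std::thread::id ExecutingThread(std::launch policy) {
  std::future<std::thread::id> future =
      std::async(policy, [] { return std::this_thread::get_id(); });
  return future.get();
}
```

With std::launch::deferred the returned id equals std::this_thread::get_id() of the caller, because the task runs inside get. With std::launch::async the id belongs to a different thread.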


Conclusion

Many implementation details are hidden from us when using std::async and std::future. The standard library manages the inter-thread synchronization needed to retrieve the results of an asynchronous operation, and it decides how the async threads are created (some implementations use an internal thread pool, but the standard does not require one). The concept is appealing: with very little effort we can launch a task in the background, pass its handle (std::future) wherever we want, and call std::future::get when we want to retrieve the result.

On the downside, we do not have control over the number of threads created, and we also lose the execution predictability of our work items.

For high-performance applications that need configuration and monitoring of worker threads, I personally would prefer an implementation that uses the std::thread primitive and creates its own threads.


The asynchronous usage of std::async is really beneficial when processing involves I/O operations. Consider an application where Process makes synchronous remote procedure calls. The delivery thread accumulates the results from all these work items and takes further action.
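
A rough sketch of this I/O-bound scenario follows. FakeRemoteCall is a made-up stand-in that sleeps to simulate network latency, and FanOutAndSum is a hypothetical accumulator. Neither comes from the post or from any RPC library.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>
#include <vector>

// Hypothetical stand-in for a synchronous remote call: sleeps to simulate
// network latency, then returns a value derived from the request.
inline int FakeRemoteCall(int request) {
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  return request * 2;
}

// Issues all calls concurrently, then accumulates results in request order.
inline int FanOutAndSum(const std::vector<int>& requests) {
  std::vector<std::future<int>> pending;
  for (int request : requests) {
    pending.emplace_back(
        std::async(std::launch::async, FakeRemoteCall, request));
  }
  int total = 0;
  for (auto& future : pending) {
    total += future.get();  // blocks only until this particular call returns
  }
  return total;
}
```

Since the calls overlap, the total wait should be close to the latency of the slowest call rather than the sum of all latencies, assuming the implementation really runs the tasks on separate threads.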


The asynchronous computation support that is now part of the standard library is a really powerful feature set for writing concurrent code that is portable and exploits hardware parallelism. Being part of the standard library, it also encourages the async paradigm to be adopted in mainstream programming. I hope to see more migration to C++11 and future standards.

Please feel free to leave your feedback and thanks for reading!






