C++11 Multi-threaded Programming: Task-Queue Patterns - Part 2
Introduction
In the previous post, we discussed three task-queue patterns using the thread synchronization primitives introduced in C++11. In this post, we will implement these patterns using the C++11 std::async facility.
Here is a recap of the previous post:
- Single Producer and Single Consumer
The basic model: a single consumer thread processes work items sequentially, which is appropriate for lightweight processing.
- Single Producer and Multiple Consumers - Unordered Delivery
Our work items are processed concurrently. The application hook (ReadyForDelivery) needs to be thread-safe, as it gets called by multiple consumer threads. We adopt this model when the application does not require ordering and the application logic benefits from processing work items concurrently.
- Single Producer and Multiple Consumers - Ordered Delivery
Here, too, work items are processed concurrently, but they are additionally delivered to the application sequentially, in the order they were added. The delivery handler need not be thread-safe, as only the delivery thread calls it.
While implementing the aforementioned patterns using C++11 std::async, our definitions of WorkItem and SleepyWorkItem remain the same as in the previous post. A WorkItem represents a task that needs processing, and the consumers represent the processing threads.
#include <chrono>
#include <cstdint>
#include <memory>
#include <thread>
class WorkItem {
public:
    WorkItem() = default;
    virtual ~WorkItem() = default;
    // Performs the actual work; called on a processing thread
    virtual void Process() = 0;
};
typedef std::shared_ptr<WorkItem> WorkItemPtr;
// Simulates work by sleeping for msec_duration milliseconds
class SleepyWorkItem: public WorkItem {
public:
    SleepyWorkItem(uint32_t id, uint32_t msec_duration):
        id_(id), msec_duration_(msec_duration) {}
    ~SleepyWorkItem() = default;
    void Process() override {
        std::this_thread::sleep_for(std::chrono::milliseconds(msec_duration_));
    }
    uint32_t id() const {
        return id_;
    }
    uint32_t msec_duration() const {
        return msec_duration_;
    }
private:
    uint32_t id_;
    uint32_t msec_duration_;
};
1. Single Producer and Multiple Consumers - Ordered Delivery
We will start with the last pattern; here is its implementation:
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
class OrderedWorkerPool2 {
public:
    typedef std::function<void (WorkItemPtr)> AppCallback;
    explicit OrderedWorkerPool2(const AppCallback& callback);
    ~OrderedWorkerPool2();
    void Add(WorkItemPtr work_item);
private:
    void DoDelivery();
    const AppCallback callback_;
    std::queue<std::future<WorkItemPtr>> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
    bool finished_;
    // Members are initialized in declaration order, so thread_ is declared
    // last: the delivery thread must not start before mutex_, cond_ and
    // finished_ are initialized.
    std::thread thread_;
};
// Implementation
OrderedWorkerPool2::OrderedWorkerPool2(const AppCallback& callback):
    callback_(callback), finished_(false), thread_(&OrderedWorkerPool2::DoDelivery, this) {
}
OrderedWorkerPool2::~OrderedWorkerPool2() {
    { // Lock scope
        std::unique_lock<std::mutex> lock(mutex_);
        finished_ = true;
    }
    cond_.notify_one();
    thread_.join();
}
// Producer thread context
void OrderedWorkerPool2::Add(WorkItemPtr work_item) {
    { // Lock scope
        std::unique_lock<std::mutex> lock(mutex_);
        queue_.emplace(std::async(std::launch::async,
            [work_item] {
                // Do the real work of processing the WorkItem asynchronously
                work_item->Process();
                return work_item;
            }));
    }
    cond_.notify_one();
}
// Delivery thread context
void OrderedWorkerPool2::DoDelivery() {
    while (true) {
        std::future<WorkItemPtr> future;
        { // Lock scope
            std::unique_lock<std::mutex> lock(mutex_);
            // Wait until the queue is non-empty or finished_ is set
            cond_.wait(lock, [this] () {
                return !queue_.empty() || finished_;
            });
            if (queue_.empty()) {
                break;  // finished_ is set and every item has been delivered
            }
            future = std::move(queue_.front());
            queue_.pop();
        }
        // We must call "get" outside the lock scope: it blocks while the
        // WorkItem is still being processed (or runs it, if deferred), and
        // holding the lock meanwhile would block the producer thread too.
        WorkItemPtr work_item = future.get();
        if (work_item) {
            callback_(work_item);
        }
    }
}
We still have a queue and a delivery thread, but the implementation is simpler now. The producer thread triggers asynchronous processing of a work item by calling std::async, and we enqueue the std::future object returned by this call. The lambda expression calls Process on the work item asynchronously and returns the completed work item once done.
The delivery thread dequeues this std::future object and calls get on it, which returns the processed work item. The call blocks if the asynchronous processing of that work item is not yet complete. Because futures are dequeued in the order they were enqueued, work items are delivered in order even when a later item finishes first.
It is important to call std::future::get outside the queue lock, because it blocks until the work item is ready; holding the lock during that time would also block the producer thread while it enqueues.
Our delivery thread wakes up from wait whenever a work item is enqueued. Another approach is to wake it up only once the work item at the front of the queue is ready to be delivered. To do this, we notify the delivery thread after Process and use std::future::wait_for with a zero duration in the std::condition_variable::wait predicate to test whether that work item is ready.
...
            [this, work_item] {
                // Do the real work of processing the WorkItem asynchronously
                work_item->Process();
                cond_.notify_one();
                return work_item;
            }));
    }
...
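The readiness test described above relies on std::future::wait_for with a zero timeout acting as a non-blocking status query. The following small sketch shows the three statuses it can report; PollStatus, StatusDemo and RunStatusDemo are illustrative names, not part of the original code, and a std::promise serves as a gate so that the "still running" case does not depend on timing.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// Non-blocking readiness check: the same wait_for(0) test used in the predicate.
std::future_status PollStatus(const std::future<int>& f) {
    return f.wait_for(std::chrono::seconds(0));
}

struct StatusDemo {
    std::future_status while_running;  // async task still blocked on the gate
    std::future_status after_finish;   // async task has returned its value
    std::future_status deferred_task;  // deferred task that has not been started
    int async_value;
    int deferred_value;
};

StatusDemo RunStatusDemo() {
    StatusDemo demo{};
    std::promise<void> gate;
    std::shared_future<void> gate_open = gate.get_future().share();

    // The task cannot finish before the gate opens, so the first poll is deterministic.
    std::future<int> running = std::async(std::launch::async, [gate_open]() -> int {
        gate_open.wait();
        return 42;
    });
    demo.while_running = PollStatus(running);

    gate.set_value();
    running.wait();  // block until the task has stored its result
    demo.after_finish = PollStatus(running);
    demo.async_value = running.get();

    // A deferred task never becomes ready on its own; wait_for reports "deferred".
    std::future<int> lazy = std::async(std::launch::deferred, []() -> int { return 7; });
    demo.deferred_task = PollStatus(lazy);
    demo.deferred_value = lazy.get();  // runs the task synchronously, on this thread
    return demo;
}
```

Note that the deferred task keeps reporting std::future_status::deferred, not ready, until something calls get or wait on it.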
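Note that we also need to test for std::future_status::deferred here: a deferred work item runs only when std::future::get is called, so it never becomes ready on its own and, without this check, the delivery thread would wait forever.
One caveat with this variant: the lambda calls notify_one before it returns (and without holding mutex_), while the future becomes ready only after the lambda returns. The predicate can therefore still see std::future_status::timeout, and the wakeup is lost, leaving the delivery thread asleep until another notification arrives, if one ever does.
...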
            cond_.wait(lock, [this] () {
                if (!queue_.empty()) {
                    std::future_status status = queue_.front().wait_for(std::chrono::seconds(0));
                    if (status == std::future_status::ready ||
                        status == std::future_status::deferred) {
                        return true;
                    }
                    return false;
                }
                return finished_;
            });
...
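The future of a std::async task becomes ready only after its lambda returns, so a notify_one issued from inside the lambda can reach the delivery thread before wait_for reports ready. One way to make the "wake when ready" idea robust is to have each task publish completion under the same mutex the predicate reads. The following is a simplified, hypothetical sketch of that idea: ReadyOrderedPool, its Entry struct and the int-returning jobs are our assumptions rather than the article's WorkItem classes, and jobs are assumed not to throw.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical "wake when ready" ordered pool. Each task publishes completion
// through a flag guarded by mutex_ before notifying, so no wakeup can be lost.
// Assumes jobs do not throw (a throwing job would never set its done flag).
class ReadyOrderedPool {
public:
    explicit ReadyOrderedPool(std::function<void(int)> callback)
        : callback_(std::move(callback)), thread_(&ReadyOrderedPool::DoDelivery, this) {}
    ~ReadyOrderedPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_ = true;
        }
        cond_.notify_one();
        thread_.join();  // returns once every queued item has been delivered
    }
    // Producer thread context.
    void Add(std::function<int()> job) {
        auto entry = std::make_shared<Entry>();
        Entry* raw = entry.get();  // stays alive until DoDelivery has reaped the task
        entry->future = std::async(std::launch::async, [this, raw, job] {
            int result = job();
            {
                std::lock_guard<std::mutex> lock(mutex_);
                raw->result = result;
                raw->done = true;
            }
            cond_.notify_one();
        });
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(entry));
        }
        cond_.notify_one();  // the task may have finished before it was queued
    }
private:
    struct Entry {
        std::future<void> future;
        int result = 0;     // written under mutex_ before done is set
        bool done = false;  // guarded by mutex_
    };
    // Delivery thread context: wakes only when the front item is done.
    void DoDelivery() {
        while (true) {
            std::shared_ptr<Entry> entry;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] {
                    return queue_.empty() ? finished_ : queue_.front()->done;
                });
                if (queue_.empty()) break;  // finished_ and fully drained
                entry = queue_.front();
                queue_.pop_front();
            }
            entry->future.get();  // the task already finished; this reaps it
            callback_(entry->result);
        }
    }
    const std::function<void(int)> callback_;
    std::mutex mutex_;
    std::condition_variable cond_;
    std::deque<std::shared_ptr<Entry>> queue_;
    bool finished_ = false;
    std::thread thread_;  // declared last so it starts after the members above
};
std::vector<int> RunReadyOrderedDemo() {
    std::vector<int> delivered;  // only the delivery thread writes to it
    {
        ReadyOrderedPool pool([&delivered](int value) { delivered.push_back(value); });
        const int delays_ms[] = {30, 5, 20, 1};
        for (int i = 0; i < 4; ++i) {
            const int delay = delays_ms[i];
            pool.Add([i, delay]() -> int {
                std::this_thread::sleep_for(std::chrono::milliseconds(delay));
                return i;
            });
        }
    }  // the destructor drains the queue and joins the delivery thread
    return delivered;
}
```

The future is kept only so that get can reap the finished task; the done flag, read under mutex_, is what drives the wait predicate, and delivery still follows submission order.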
ReadyForDelivery is our application handler, and the following main function represents the producer end of our task queue:
#include <chrono>
#include <iostream>
#include <memory>
#include <vector>
void ReadyForDelivery(WorkItemPtr work_item) {
    std::shared_ptr<SleepyWorkItem> sleepy_work_item =
        std::dynamic_pointer_cast<SleepyWorkItem>(work_item);
    if (!sleepy_work_item) {
        return;  // not a SleepyWorkItem
    }
    std::cout << "Id " << sleepy_work_item->id() << ", " <<
        sleepy_work_item->msec_duration() <<
        " millisec is ready for delivery!" << std::endl;
}
int main() {
    std::vector<WorkItemPtr> items;
    items.emplace_back(std::make_shared<SleepyWorkItem>(1, 100));
    items.emplace_back(std::make_shared<SleepyWorkItem>(2, 700));
    items.emplace_back(std::make_shared<SleepyWorkItem>(3, 500));
    items.emplace_back(std::make_shared<SleepyWorkItem>(4, 200));
    items.emplace_back(std::make_shared<SleepyWorkItem>(5, 300));
    auto start = std::chrono::steady_clock::now();
    { // Object scope: the destructor waits until all items are delivered
        OrderedWorkerPool2 worker_pool(ReadyForDelivery);
        for (const WorkItemPtr& item: items) {
            worker_pool.Add(item);
        }
    }
    auto end = std::chrono::steady_clock::now();
    std::cout << "Completed work items in " <<
        std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() <<
        " milliseconds." << std::endl;
    return 0;
}
It is also important to check the queue size when using std::async and to block the producer once it exceeds some upper limit. With std::launch::async, every enqueued work item may be processed on its own thread, so if processing is CPU intensive and the enqueue rate exceeds the dequeue rate, many work items can be in flight at the same time. We do not want to overwhelm the system with processing threads.
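A minimal sketch of such a bounded queue, assuming we count "outstanding" items (queued, or dequeued but not yet delivered) and block Add on a second condition variable when the count reaches a limit. BoundedOrderedPool, capacity and the int-returning jobs are illustrative assumptions; the structure otherwise mirrors OrderedWorkerPool2.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical ordered pool with back-pressure: Add blocks while `capacity`
// work items are outstanding (queued, or dequeued but not yet delivered), so
// at most `capacity` work items are being processed at any time.
class BoundedOrderedPool {
public:
    BoundedOrderedPool(std::size_t capacity, std::function<void(int)> callback)
        : capacity_(capacity), callback_(std::move(callback)),
          thread_(&BoundedOrderedPool::DoDelivery, this) {
        assert(capacity_ > 0);  // a zero capacity would block Add forever
    }
    ~BoundedOrderedPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            finished_ = true;
        }
        not_empty_.notify_one();
        thread_.join();
    }
    // Producer thread context: blocks while the pool is full.
    void Add(std::function<int()> job) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return outstanding_ < capacity_; });
        ++outstanding_;
        queue_.push(std::async(std::launch::async, std::move(job)));
        lock.unlock();
        not_empty_.notify_one();
    }
private:
    // Delivery thread context.
    void DoDelivery() {
        while (true) {
            std::future<int> future;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                not_empty_.wait(lock, [this] { return !queue_.empty() || finished_; });
                if (queue_.empty()) break;  // finished_ and fully drained
                future = std::move(queue_.front());
                queue_.pop();
            }
            callback_(future.get());  // get() outside the lock, as in the article
            {
                std::lock_guard<std::mutex> lock(mutex_);
                --outstanding_;  // this item's task has finished: free a slot
            }
            not_full_.notify_one();
        }
    }
    const std::size_t capacity_;
    const std::function<void(int)> callback_;
    std::mutex mutex_;
    std::condition_variable not_empty_;
    std::condition_variable not_full_;
    std::queue<std::future<int>> queue_;
    std::size_t outstanding_ = 0;
    bool finished_ = false;
    std::thread thread_;  // declared last so it starts after the members above
};
// Runs six 10 ms jobs through a pool of capacity 2 and records the peak
// number of jobs that were running at the same time.
std::vector<int> RunBoundedDemo(int& max_running_out) {
    std::atomic<int> running(0);
    std::atomic<int> max_running(0);
    std::vector<int> delivered;
    {
        BoundedOrderedPool pool(2, [&delivered](int v) { delivered.push_back(v); });
        for (int i = 0; i < 6; ++i) {
            pool.Add([i, &running, &max_running]() -> int {
                int now = ++running;
                int seen = max_running.load();
                while (now > seen && !max_running.compare_exchange_weak(seen, now)) {}
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                --running;
                return i;
            });
        }
    }
    max_running_out = max_running.load();
    return delivered;
}
```

Counting outstanding items rather than checking queue_.size() also covers the item the delivery thread has already popped but is still waiting on, which a plain queue-length check would miss.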
2. Single Producer and Multiple Consumers - Unordered Delivery
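Instead of calling our application handler ReadyForDelivery from the delivery thread, we can call it in the context of the async task itself, which gives us the desired functionality for this pattern: each work item is delivered as soon as it is processed, in completion order. The handler must therefore be thread-safe, and the delivery thread should no longer call callback_ itself (otherwise each item would be delivered twice); it only needs to call get to wait for each task.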
...
            [this, work_item] {
                // Do the real work of processing the WorkItem asynchronously
                work_item->Process();
                callback_(work_item);
                return work_item;
            }));
    }
    cond_.notify_one();
...
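To see the effect in isolation, here is a stripped-down sketch without the pool class: ProcessUnordered (a hypothetical helper, not part of the original code) launches one std::async task per job, invokes the callback inside each task, and then waits for all of them. Because the callback can run on several threads at once, the demo guards its shared vector with a mutex, just as the application handler has to be thread-safe in this pattern.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: each std::async task processes one job and then calls
// the application callback itself, so results are delivered in completion
// order and possibly from several threads at once.
void ProcessUnordered(const std::vector<std::function<int()>>& jobs,
                      const std::function<void(int)>& callback) {
    std::vector<std::future<void>> pending;
    pending.reserve(jobs.size());
    for (const std::function<int()>& job : jobs) {
        pending.push_back(std::async(std::launch::async, [&job, &callback] {
            callback(job());  // delivery happens on the async thread
        }));
    }
    for (std::future<void>& f : pending) {
        f.get();  // wait for every task; rethrows if a job or callback threw
    }
}

std::vector<int> RunUnorderedDemo() {
    std::mutex mutex;
    std::vector<int> delivered;
    std::vector<std::function<int()>> jobs;
    for (int i = 0; i < 5; ++i) {
        jobs.push_back([i]() -> int {
            // Later jobs sleep less, so they tend to be delivered first.
            std::this_thread::sleep_for(std::chrono::milliseconds(10 * (5 - i)));
            return i;
        });
    }
    ProcessUnordered(jobs, [&mutex, &delivered](int value) {
        std::lock_guard<std::mutex> lock(mutex);  // the handler must be thread-safe
        delivered.push_back(value);
    });
    assert(delivered.size() == jobs.size());
    return delivered;  // completion order, not submission order
}
```

Every job is delivered exactly once, but the order depends on which tasks finish first, so callers should not rely on it.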
3. Single Producer and Single Consumer
To implement this pattern, we just need to change the launch policy of std::async to std::launch::deferred. This ensures that a work item is executed only when the delivery thread calls std::future::get, so the delivery thread becomes the single consumer that processes work items sequentially.
...
queue_.emplace(std::async(std::launch::deferred,
...
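A small sketch of what the deferred policy changes (DeferredDemo and RunDeferredDemo are illustrative names): nothing runs when std::async returns, and each task executes synchronously inside whichever thread calls get. Here a single consumer thread calls get in FIFO order, standing in for the delivery thread, so it ends up being the only thread that executes work items.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <vector>

struct DeferredDemo {
    int started_before_get = 0;            // tasks that ran before any get() call
    std::vector<int> order;                // results, in the order get() returned them
    std::vector<std::thread::id> runners;  // thread that executed each task
    std::thread::id consumer_id;           // the single consumer thread
};

DeferredDemo RunDeferredDemo() {
    DeferredDemo demo;
    std::vector<std::future<int>> queue;  // stands in for the article's queue_
    for (int i = 0; i < 3; ++i) {
        queue.push_back(std::async(std::launch::deferred, [i, &demo]() -> int {
            demo.runners.push_back(std::this_thread::get_id());
            return i;
        }));
    }
    // Nothing has executed yet: deferred tasks are lazy.
    demo.started_before_get = static_cast<int>(demo.runners.size());
    std::thread consumer([&queue, &demo] {
        demo.consumer_id = std::this_thread::get_id();
        for (std::future<int>& f : queue) {
            demo.order.push_back(f.get());  // runs the task here, on the consumer thread
        }
    });
    consumer.join();
    return demo;
}
```

Since the consumer processes one future at a time, the work items run strictly one after another, in submission order.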
Conclusion
Lots of implementation details are hidden from us when using std::async and std::future. The standard library manages the inter-thread synchronization needed to retrieve the result of an asynchronous operation, and it decides how tasks are run: depending on the implementation, std::launch::async tasks may each get a new thread or be served from an internal thread pool. The concept is really fascinating: with so much ease, we can launch a task in the background, pass its handle (std::future) wherever we want, and call std::future::get when we want to retrieve the result.
On the downside, we have no control over the number of threads created, and we lose predictability about when and on which thread our work items execute.
For high-performance applications that need configuration and monitoring of worker threads, I would personally prefer an implementation that uses std::thread directly and manages its own threads.
std::async is especially beneficial when processing involves I/O operations. Consider an application whose Process makes synchronous remote procedure calls: each call spends most of its time waiting, so running the work items asynchronously overlaps those waits. The delivery thread accumulates the results from all these work items and takes further action.
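A sketch of that fan-out, where FakeRemoteCall is a hypothetical stand-in that just sleeps to simulate a blocking RPC and GatherReplies is an illustrative helper: one std::async task is launched per request so the waits overlap, and the calling thread accumulates the replies in request order.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for a blocking remote procedure call: it sleeps to
// simulate network latency and then returns a reply string.
std::string FakeRemoteCall(int request_id) {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    return "reply-" + std::to_string(request_id);
}

// Launches one std::async task per request so the blocking calls overlap,
// then accumulates the replies in request order on the calling thread.
std::vector<std::string> GatherReplies(int request_count) {
    std::vector<std::future<std::string>> pending;
    for (int id = 0; id < request_count; ++id) {
        pending.push_back(std::async(std::launch::async, FakeRemoteCall, id));
    }
    std::vector<std::string> replies;
    for (std::future<std::string>& f : pending) {
        replies.push_back(f.get());  // blocks only until this reply is available
    }
    return replies;
}
```

With std::launch::async each call runs as if on its own thread, so the total wall time is typically close to one call's latency rather than the sum of all of them.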
The asynchronous computation support that is now part of the standard library is a really powerful feature set for writing concurrent code that is portable and exploits hardware parallelism. Being part of the standard library, it also encourages the async paradigm to be adopted in mainstream programming. I hope to see more migration to C++11 and later standards.
Please feel free to leave your feedback and thanks for reading!
