A std::condition_variable in C++ is a synchronization primitive that allows one thread to wait until another thread sends a notification. A waiting thread releases its mutex and sleeps efficiently until notify_one() or notify_all() is called by another thread. Condition variables always work together with a std::unique_lock<std::mutex>: the mutex protects the shared state that the condition is based on, and the condition variable provides the efficient “sleep and wake” mechanism to avoid busy-waiting.
Introduction
Mutexes solve one fundamental concurrency problem: preventing multiple threads from accessing shared data simultaneously. But there is a second equally important problem that mutexes alone cannot solve efficiently: coordination — how does one thread tell another that work is ready, that a condition has been met, or that it should wake up and proceed?
Consider the classic producer-consumer scenario. A producer thread generates data and places it in a shared queue. A consumer thread needs to process items from the queue. The consumer must wait when the queue is empty — but how? It could continuously check the queue in a tight loop (“busy-waiting” or “spinning”), but that wastes CPU cycles and degrades system performance. It could sleep_for a fixed duration, but that adds unnecessary latency when items do arrive.
The correct solution is std::condition_variable — a primitive that lets the consumer thread sleep efficiently while waiting, and lets the producer wake it up immediately when an item is available. The sleeping thread consumes no CPU while waiting and is awakened precisely when the condition it was waiting for becomes true.
Condition variables are used in countless real-world patterns: producer-consumer queues, thread pools, task schedulers, work pipelines, barrier synchronization, and event-driven systems. Understanding them is essential for writing efficient, responsive multithreaded software.
This article teaches condition variables from first principles. You will understand why they are needed, learn the correct usage pattern, implement a production-quality producer-consumer queue, explore the subtleties of spurious wakeups, understand notify_one vs notify_all, and see how C++20’s std::stop_token integrates with condition variables for graceful shutdown.
The Problem: Polling vs. Waiting
Before diving into condition variables, let’s see exactly why the naive approaches to inter-thread coordination are inadequate.
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <chrono>
using namespace std;
mutex mtx;
queue<int> workQueue;
bool done = false;
// BAD APPROACH 1: Busy-waiting (spinning)
void consumer_spinning() {
int processed = 0;
while (true) {
{
lock_guard<mutex> lock(mtx);
if (!workQueue.empty()) {
int item = workQueue.front();
workQueue.pop();
processed++;
cout << "Consumed: " << item << endl;
} else if (done) {
break;
}
}
// No sleep — tight loop burns 100% CPU checking repeatedly
// This is wasteful even on a multi-core machine
}
cout << "Consumer done. Processed: " << processed << endl;
}
// BAD APPROACH 2: Sleeping for a fixed interval
void consumer_sleeping() {
int processed = 0;
while (true) {
{
lock_guard<mutex> lock(mtx);
if (!workQueue.empty()) {
int item = workQueue.front();
workQueue.pop();
processed++;
cout << "Consumed: " << item << endl;
} else if (done) {
break;
}
}
this_thread::sleep_for(chrono::milliseconds(10)); // Wastes latency
// Items may have been available for 9ms before we checked again
}
cout << "Consumer done. Processed: " << processed << endl;
}
// We'll implement the GOOD approach (condition variable) in the next section
Problems with spinning:
- Burns 100% of one CPU core continuously, even when there is nothing to process
- In systems with many threads, degrades performance for everyone by starving other threads
- Not acceptable in production code except in very specific low-latency scenarios where the wait is expected to be nanoseconds
Problems with fixed-interval sleeping:
- Adds unnecessary latency: if the producer puts an item at time T, the consumer may not see it until T + sleep_interval
- Choosing the right interval is impossible: too short wastes CPU, too long adds latency
- Neither approach scales well as the number of threads grows
The correct solution is a condition variable — the consumer sleeps with zero CPU consumption and is woken immediately when an item arrives.
How Condition Variables Work
A condition variable operates on a simple model:
- A thread checks a condition (e.g., “is the queue non-empty?”)
- If the condition is false, it calls wait(), which atomically releases the mutex and puts the thread to sleep
- Another thread changes the condition (adds an item to the queue), then calls notify_one() or notify_all()
- The sleeping thread wakes up, re-acquires the mutex, and re-checks the condition
The critical word in step 2 is atomically — the mutex is released and the thread is put to sleep in a single atomic operation. This prevents the “lost wakeup” problem: if the release and sleep were separate steps, a producer could sneak in between them, call notify() before the consumer is asleep, and the consumer would sleep forever waiting for a notification that already happened.
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>
using namespace std;
// The three required ingredients for condition variable usage:
mutex mtx; // 1. Protects the shared state
condition_variable cv; // 2. The notification mechanism
queue<int> workQueue; // 3. The shared state being protected
bool producerDone = false; // Signals that no more items will arrive
void producer(int numItems) {
for (int i = 1; i <= numItems; i++) {
this_thread::sleep_for(chrono::milliseconds(100)); // Simulate work
{
lock_guard<mutex> lock(mtx);
workQueue.push(i);
cout << "Produced: " << i << endl;
}
cv.notify_one(); // Wake one waiting consumer
}
{
lock_guard<mutex> lock(mtx);
producerDone = true;
}
cv.notify_all(); // Wake all consumers to check the done flag
}
void consumer(int id) {
int processed = 0;
while (true) {
unique_lock<mutex> lock(mtx); // unique_lock required by wait()
// wait() atomically: releases lock + sleeps until notified
// The predicate lambda re-checks the condition after wakeup
cv.wait(lock, []() {
return !workQueue.empty() || producerDone;
});
// At this point: lock is re-acquired, condition is true
if (!workQueue.empty()) {
int item = workQueue.front();
workQueue.pop();
lock.unlock(); // Release lock before processing (good practice)
cout << "Consumer " << id << " processed: " << item << endl;
processed++;
} else {
// Queue is empty AND producerDone is true — we're done
break;
}
}
cout << "Consumer " << id << " done. Total processed: " << processed << endl;
}
int main() {
thread prod(producer, 5);
thread cons(consumer, 1);
prod.join();
cons.join();
return 0;
}
Output:
Produced: 1
Consumer 1 processed: 1
Produced: 2
Consumer 1 processed: 2
Produced: 3
Consumer 1 processed: 3
Produced: 4
Consumer 1 processed: 4
Produced: 5
Consumer 1 processed: 5
Consumer 1 done. Total processed: 5
Step-by-step explanation:
- cv.wait(lock, predicate) is the two-argument overload that takes a predicate lambda. It is equivalent to: while (!predicate()) { cv.wait(lock); }. It atomically releases lock and suspends the thread. When notified, it re-acquires lock and evaluates the predicate. If the predicate returns false (spurious wakeup or notification for another reason), it releases the lock and sleeps again. If true, it returns with the lock held.
- cv.notify_one() after each workQueue.push() wakes one waiting consumer. Using lock_guard (not unique_lock) for the producer is fine — it doesn't need wait(), only the consumer does.
- cv.notify_all() when producerDone = true wakes all consumers so they can see the flag and exit their loops. Using notify_one() here would risk leaving a consumer asleep if there are multiple consumers.
- lock.unlock() before processing the item is an optimization: we hold the lock only long enough to pop from the queue, then release it so other threads can proceed while we process. Processing often takes longer than queue manipulation, so this reduces lock contention.
- unique_lock (not lock_guard) is required because wait() needs to unlock and re-lock the mutex. lock_guard does not expose an unlock() method, so it cannot be used with wait().
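The equivalence above can be spelled out as a hand-rolled wait loop. This is a minimal sketch (the names mtx, cv, ready, and wait_desugared are illustrative, not from the article's example):

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

// Hand-rolled equivalent of cv.wait(lock, []{ return ready; })
int wait_desugared() {
    std::unique_lock<std::mutex> lock(mtx);
    while (!ready) {       // re-check guards against spurious wakeups
        cv.wait(lock);     // atomically unlocks mtx and sleeps; relocks on wake
    }
    return 42;             // lock is held here and ready == true
}
```

The while loop is exactly what the predicate overload performs internally, which is why the two forms behave identically.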
Spurious Wakeups: Why the Predicate Matters
A spurious wakeup is when a thread returns from wait() even though no notify was called. This sounds impossible but is a real phenomenon — the POSIX threading standard explicitly allows it, and implementations use it for efficiency. A thread may wake up due to OS signals, hardware interrupts, or implementation-specific reasons.
This is precisely why wait() must always use a predicate:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono> // for chrono::milliseconds
using namespace std;
mutex mtx;
condition_variable cv;
bool dataReady = false;
int payload = 0; // renamed from 'data': with 'using namespace std', a global named 'data' is ambiguous with std::data in C++17
// WRONG: no predicate — spurious wakeups will cause incorrect behavior
void consumer_wrong() {
unique_lock<mutex> lock(mtx);
cv.wait(lock); // Returns on BOTH real notifications AND spurious wakeups
// If this is a spurious wakeup, dataReady is still false
// but we proceed as if data is available — BUG
cout << "Wrong consumer got: " << payload << endl;
}
// CORRECT: always use a predicate
void consumer_correct() {
unique_lock<mutex> lock(mtx);
cv.wait(lock, []() { return dataReady; }); // Re-checks after every wakeup
// Only proceeds when dataReady is actually true
cout << "Correct consumer got: " << payload << endl;
}
void producer_send() {
this_thread::sleep_for(chrono::milliseconds(100));
{
lock_guard<mutex> lock(mtx);
payload = 42;
dataReady = true;
}
cv.notify_one();
}
int main() {
cout << "--- Correct usage ---" << endl;
dataReady = false;
payload = 0;
thread prod(producer_send);
thread cons(consumer_correct);
prod.join();
cons.join();
return 0;
}
Output:
--- Correct usage ---
Correct consumer got: 42
Step-by-step explanation:
- cv.wait(lock) (no predicate) returns every time the thread is woken — including spurious wakeups. If there is no re-check, the thread proceeds with incorrect state.
- cv.wait(lock, predicate) with a lambda correctly handles spurious wakeups: after every wakeup (whether real or spurious), the predicate is evaluated. If false, the thread goes back to sleep. If true, it proceeds.
- The predicate captures the condition on the shared state that the thread is waiting for. Always use a predicate that checks the actual state, never just return true or return notifiedOnce (which would be incorrect).
- The shared state variable (dataReady) must be protected by the same mutex used with wait(). Checking dataReady outside the lock would be a data race.
- Rule: Always use the predicate form of wait(). There is never a good reason to use wait(lock) without a predicate — it is always a bug waiting to happen.
Producer-Consumer Queue: A Production-Quality Implementation
Let’s build a complete, reusable thread-safe queue with condition variable support — the fundamental building block for many concurrent systems.
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <optional>
#include <utility> // for std::move
#include <cstdint> // for SIZE_MAX
#include <chrono>
#include <vector>
using namespace std;
template<typename T>
class BlockingQueue {
public:
explicit BlockingQueue(size_t maxSize = SIZE_MAX)
: maxSize_(maxSize), closed_(false) {}
// Push an item — blocks if the queue is full
// Returns false if the queue has been closed
bool push(T item) {
unique_lock<mutex> lock(mtx_);
// Wait while queue is full and not closed
notFull_.wait(lock, [this]() {
return queue_.size() < maxSize_ || closed_;
});
if (closed_) return false;
queue_.push(move(item));
lock.unlock();
notEmpty_.notify_one(); // Wake a waiting consumer
return true;
}
// Pop an item — blocks until an item is available
// Returns empty optional if queue is closed and empty
optional<T> pop() {
unique_lock<mutex> lock(mtx_);
// Wait while queue is empty and not closed
notEmpty_.wait(lock, [this]() {
return !queue_.empty() || closed_;
});
if (queue_.empty()) return nullopt; // Closed and empty
T item = move(queue_.front());
queue_.pop();
lock.unlock();
notFull_.notify_one(); // Wake a waiting producer (bounded queue)
return item;
}
// Try to pop without blocking
// Returns empty optional if queue is empty
optional<T> tryPop() {
lock_guard<mutex> lock(mtx_);
if (queue_.empty()) return nullopt;
T item = move(queue_.front());
queue_.pop();
notFull_.notify_one();
return item;
}
// Signal that no more items will be pushed
void close() {
{
lock_guard<mutex> lock(mtx_);
closed_ = true;
}
notEmpty_.notify_all(); // Wake all consumers to check closed_
notFull_.notify_all(); // Wake all blocked producers
}
size_t size() const {
lock_guard<mutex> lock(mtx_);
return queue_.size();
}
bool isClosed() const {
lock_guard<mutex> lock(mtx_);
return closed_;
}
private:
mutable mutex mtx_;
condition_variable notEmpty_; // Signaled when queue goes from empty to non-empty
condition_variable notFull_; // Signaled when queue goes from full to non-full
queue<T> queue_;
size_t maxSize_;
bool closed_;
};
// ---- Demo: multi-producer, multi-consumer pipeline ----
int main() {
const int NUM_PRODUCERS = 2;
const int NUM_CONSUMERS = 3;
const int ITEMS_PER_PRODUCER = 5;
const int MAX_QUEUE_SIZE = 4; // Bounded queue to demonstrate backpressure
BlockingQueue<int> queue(MAX_QUEUE_SIZE);
// Producers
vector<thread> producers;
for (int p = 0; p < NUM_PRODUCERS; p++) {
producers.emplace_back([p, &queue]() {
for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
int item = p * 100 + i;
if (queue.push(item)) {
cout << "Producer " << p << " pushed: " << item << endl;
} else {
cout << "Producer " << p << ": queue closed, stopping" << endl;
break;
}
this_thread::sleep_for(chrono::milliseconds(30));
}
cout << "Producer " << p << " done" << endl;
});
}
// Consumers
vector<thread> consumers;
for (int c = 0; c < NUM_CONSUMERS; c++) {
consumers.emplace_back([c, &queue]() {
int count = 0;
while (true) {
auto item = queue.pop();
if (!item.has_value()) {
cout << "Consumer " << c << " done. Processed: " << count << endl;
break;
}
cout << "Consumer " << c << " processed: " << *item << endl;
count++;
this_thread::sleep_for(chrono::milliseconds(50));
}
});
}
// Wait for all producers to finish
for (auto& p : producers) p.join();
cout << "\nAll producers done — closing queue" << endl;
// Signal consumers that no more items are coming
queue.close();
// Wait for all consumers to drain the queue and exit
for (auto& c : consumers) c.join();
cout << "\nAll done. Remaining items: " << queue.size() << endl;
return 0;
}
Output (interleaving varies):
Producer 0 pushed: 0
Producer 1 pushed: 100
Consumer 0 processed: 0
Consumer 1 processed: 100
Producer 0 pushed: 1
Producer 1 pushed: 101
Consumer 2 processed: 1
Consumer 0 processed: 101
Producer 0 pushed: 2
Producer 1 pushed: 102
Consumer 1 processed: 2
Consumer 2 processed: 102
Producer 0 pushed: 3
Producer 1 pushed: 103
Consumer 0 processed: 3
Consumer 1 processed: 103
Producer 0 pushed: 4
Producer 1 pushed: 104
Consumer 2 processed: 4
Consumer 0 processed: 104
Producer 0 done
Producer 1 done
All producers done — closing queue
Consumer 1 done. Processed: 4
Consumer 2 done. Processed: 3
Consumer 0 done. Processed: 3
All done. Remaining items: 0
Step-by-step explanation:
- BlockingQueue uses two condition variables: notEmpty_ (signaled when an item is added, wakes consumers) and notFull_ (signaled when an item is removed, wakes producers blocked on a full queue). This is the standard bounded blocking queue pattern.
- push() waits on notFull_ when the queue is at capacity — the producer blocks until a consumer makes room. After pushing, it signals notEmpty_ to wake a waiting consumer.
- pop() waits on notEmpty_ when the queue is empty. After popping, it signals notFull_ to wake a waiting producer (important for bounded queues).
- close() sets closed_ = true and notifies all waiters on both condition variables. This is the shutdown signal. Producers see it in push() and return false. Consumers see it in pop() when the queue is empty and return nullopt.
- The optional<T> return from pop() elegantly distinguishes “got an item” from “queue is closed and empty” — no exceptions needed, no sentinel values.
- Every predicate checks both the primary condition and closed_. This prevents threads from being stuck waiting indefinitely when the queue is closed.
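The optional<T> return also enables a compact consumer loop, because an empty optional is falsy in a loop condition. A minimal single-threaded sketch of the idiom (pop_one and drain_sum are illustrative stand-ins for the queue's pop(), not part of the class above):

```cpp
#include <optional>
#include <queue>

std::queue<int> backing;

// Stand-in for BlockingQueue::pop(): empty optional means "closed and empty"
std::optional<int> pop_one() {
    if (backing.empty()) return std::nullopt;
    int v = backing.front();
    backing.pop();
    return v;
}

int drain_sum() {
    for (int v : {1, 2, 3}) backing.push(v);
    int sum = 0;
    // the optional itself is the loop condition: empty optional ends the loop
    while (auto item = pop_one()) sum += *item;
    return sum;
}
```

In the multithreaded demo this becomes `while (auto item = queue.pop()) { ... }`, which is equivalent to the explicit has_value() check.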
notify_one vs. notify_all
Choosing between notify_one() and notify_all() is important for both correctness and performance.
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
<vector>
#include <chrono> // for chrono::milliseconds
using namespace std;
mutex mtx;
condition_variable cv;
bool eventFired = false;
int waitingCount = 0;
void waiterThread(int id, bool expectMultiple) {
unique_lock<mutex> lock(mtx);
waitingCount++;
cout << "Thread " << id << " waiting..." << endl;
cv.wait(lock, []() { return eventFired; });
cout << "Thread " << id << " woke up!" << endl;
waitingCount--;
}
int main() {
// --- notify_one: wake exactly one waiter ---
cout << "=== notify_one: one waiter wakes ===" << endl;
{
eventFired = false;
waitingCount = 0;
thread t1(waiterThread, 1, false);
thread t2(waiterThread, 2, false);
thread t3(waiterThread, 3, false);
// Wait until all are sleeping
this_thread::sleep_for(chrono::milliseconds(50));
{
lock_guard<mutex> lock(mtx);
eventFired = true;
}
cv.notify_one(); // Wakes exactly ONE of the three
this_thread::sleep_for(chrono::milliseconds(100));
cout << "After notify_one: " << waitingCount << " still waiting" << endl;
// Wake the remaining two
cv.notify_all();
t1.join(); t2.join(); t3.join();
}
cout << "\n=== notify_all: all waiters wake ===" << endl;
{
eventFired = false;
waitingCount = 0;
thread t4(waiterThread, 4, true);
thread t5(waiterThread, 5, true);
thread t6(waiterThread, 6, true);
this_thread::sleep_for(chrono::milliseconds(50));
{
lock_guard<mutex> lock(mtx);
eventFired = true;
}
cv.notify_all(); // Wakes ALL three
t4.join(); t5.join(); t6.join();
cout << "All three woke up" << endl;
}
return 0;
}
Output (which thread wakes first may vary):
=== notify_one: one waiter wakes ===
Thread 1 waiting...
Thread 2 waiting...
Thread 3 waiting...
Thread 1 woke up!
After notify_one: 2 still waiting
Thread 2 woke up!
Thread 3 woke up!
=== notify_all: all waiters wake ===
Thread 4 waiting...
Thread 5 waiting...
Thread 6 waiting...
Thread 4 woke up!
Thread 5 woke up!
Thread 6 woke up!
All three woke up
When to use each:
notify_one() — Use when exactly one thread can make progress (e.g., consuming one item from a queue). Waking all threads when only one can do work causes a “thundering herd”: all threads wake, compete for the mutex, all but one discover the condition is false, and go back to sleep — wasted context switches and contention.
notify_all() — Use when multiple threads can make progress (e.g., all waiting threads should check a changed flag), when the shared state change could satisfy multiple waiters (e.g., adding multiple items at once), or when signaling shutdown (all threads must wake and exit).
Key rules:
- If all waiting threads wait on the same condition variable but only one can proceed at a time (single-item queue), use notify_one()
- If a state change could allow multiple threads to proceed (batch insert, shutdown flag), use notify_all()
- When in doubt, notify_all() is safer but less efficient; notify_one() is more efficient but must only be used when at most one thread can make progress
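The batch-insert rule can be sketched as follows. This is a minimal illustration (the names m, q, and push_batch are assumptions for this sketch, not the article's queue class):

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>

std::mutex m;
std::condition_variable cv;
std::queue<int> q;

// Push a batch under the lock, then notify according to how many
// waiters could now make progress.
void push_batch(const std::vector<int>& items) {
    {
        std::lock_guard<std::mutex> lock(m);
        for (int v : items) q.push(v);
    }
    if (items.size() == 1)
        cv.notify_one();   // exactly one consumer can proceed
    else
        cv.notify_all();   // several consumers may now proceed
}
```

An equally valid alternative is calling notify_one() once per pushed item, which wakes at most as many consumers as there are new items.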
wait_for and wait_until: Timed Waiting
Sometimes a thread should wait for a condition but not forever — it should time out and take action if the condition is not met within a deadline.
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
using namespace std;
mutex mtx;
condition_variable cv;
bool taskComplete = false;
void longRunningTask() {
cout << "Task starting (will take 2 seconds)..." << endl;
this_thread::sleep_for(chrono::seconds(2));
{
lock_guard<mutex> lock(mtx);
taskComplete = true;
}
cv.notify_one();
cout << "Task complete!" << endl;
}
int main() {
thread worker(longRunningTask);
// Wait with a timeout
{
unique_lock<mutex> lock(mtx);
// Wait up to 500ms
auto status = cv.wait_for(lock,
chrono::milliseconds(500),
[]() { return taskComplete; });
if (status) {
cout << "Completed within timeout!" << endl;
} else {
cout << "Timed out after 500ms — task still running" << endl;
}
}
// Wait again with a longer timeout or absolute deadline
{
unique_lock<mutex> lock(mtx);
// wait_until: wait until a specific clock time
auto deadline = chrono::steady_clock::now() + chrono::seconds(3);
auto status = cv.wait_until(lock, deadline,
[]() { return taskComplete; });
if (status) {
cout << "Task completed before deadline!" << endl;
} else {
cout << "Deadline exceeded" << endl;
}
}
worker.join();
return 0;
}
Output:
Task starting (will take 2 seconds)...
Timed out after 500ms — task still running
Task complete!
Task completed before deadline!
Step-by-step explanation:
- cv.wait_for(lock, duration, predicate) waits up to duration for the predicate to become true. It returns true if the predicate became true before the timeout, false if it timed out.
- cv.wait_until(lock, time_point, predicate) waits until a specific std::chrono::time_point. Useful when you have an absolute deadline computed from a wall clock or when you need to check at a specific time.
- Both forms return cv_status::timeout or cv_status::no_timeout when used without a predicate, or a bool when used with a predicate — true means the predicate became true, false means timeout.
- Timed waits are important for resilient systems: network operations that should time out, health checks that should not block indefinitely, and real-time systems that require bounded response times.
- Always combine timed waits with predicates to handle spurious wakeups correctly, just like regular wait().
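If you do use the predicate-less form, loop against an absolute deadline with wait_until so a spurious wakeup does not restart the timer. A sketch of that pattern (the names m, flag, and wait_with_deadline are illustrative):

```cpp
#include <condition_variable>
#include <mutex>
#include <chrono>

std::mutex m;
std::condition_variable cv;
bool flag = false;

// Hand-rolled timed wait without the predicate overload.
bool wait_with_deadline(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(m);
    auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!flag) {
        // The deadline is absolute, so wait_until reports timeout correctly
        // even if spurious wakeups occur along the way.
        if (cv.wait_until(lock, deadline) == std::cv_status::timeout)
            return flag;   // deadline passed: report the final state
    }
    return true;
}
```

This is exactly what the predicate overload of wait_for does internally, which is another reason to prefer it.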
Real-World Pattern: Thread Pool with Work Queue
A thread pool is one of the most common and important concurrent patterns. It creates a fixed number of worker threads that pick up tasks from a shared work queue — avoiding the overhead of creating and destroying threads for each task.
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <vector>
#include <future>
#include <atomic>
#include <algorithm> // for std::reverse
#include <string> // for std::string
#include <type_traits> // for invoke_result_t
#include <chrono> // for chrono::milliseconds
using namespace std;
class ThreadPool {
public:
explicit ThreadPool(size_t numThreads) : stopped_(false) {
for (size_t i = 0; i < numThreads; i++) {
workers_.emplace_back([this, i]() {
workerLoop(i);
});
}
cout << "ThreadPool started with " << numThreads << " workers" << endl;
}
~ThreadPool() {
shutdown();
}
// Submit a task — returns a future for the result
template<typename F, typename... Args>
auto submit(F&& func, Args&&... args)
-> future<invoke_result_t<F, Args...>>
{
using ReturnType = invoke_result_t<F, Args...>;
// Package the task as a promise/future pair
auto task = make_shared<packaged_task<ReturnType()>>(
bind(forward<F>(func), forward<Args>(args)...)
);
future<ReturnType> result = task->get_future();
{
lock_guard<mutex> lock(mtx_);
if (stopped_) throw runtime_error("ThreadPool is stopped");
taskQueue_.push([task]() { (*task)(); });
}
cv_.notify_one(); // Wake one worker
return result;
}
void shutdown() {
{
lock_guard<mutex> lock(mtx_);
if (stopped_) return;
stopped_ = true;
}
cv_.notify_all(); // Wake all workers so they can exit
for (thread& w : workers_) {
if (w.joinable()) w.join();
}
cout << "ThreadPool shut down" << endl;
}
size_t pendingTasks() const {
lock_guard<mutex> lock(mtx_);
return taskQueue_.size();
}
private:
void workerLoop(size_t id) {
while (true) {
function<void()> task;
{
unique_lock<mutex> lock(mtx_);
cv_.wait(lock, [this]() {
return !taskQueue_.empty() || stopped_;
});
if (stopped_ && taskQueue_.empty()) {
cout << "Worker " << id << " exiting" << endl;
return;
}
task = move(taskQueue_.front());
taskQueue_.pop();
}
// Execute task outside the lock
task();
}
}
mutable mutex mtx_;
condition_variable cv_;
queue<function<void()>> taskQueue_;
vector<thread> workers_;
bool stopped_;
};
// Example task functions
int computeSquare(int n) {
this_thread::sleep_for(chrono::milliseconds(50)); // Simulate work
return n * n;
}
string reverseString(string s) {
this_thread::sleep_for(chrono::milliseconds(30));
reverse(s.begin(), s.end());
return s;
}
int main() {
ThreadPool pool(3); // 3 worker threads
cout << "\n--- Submitting tasks ---" << endl;
// Submit tasks and get futures
vector<future<int>> intResults;
for (int i = 1; i <= 6; i++) {
intResults.push_back(pool.submit(computeSquare, i));
}
vector<future<string>> strResults;
strResults.push_back(pool.submit(reverseString, string("hello")));
strResults.push_back(pool.submit(reverseString, string("world")));
cout << "\n--- Collecting results ---" << endl;
for (int i = 0; i < 6; i++) {
cout << (i+1) << "^2 = " << intResults[i].get() << endl;
}
for (auto& f : strResults) {
cout << "Reversed: " << f.get() << endl;
}
cout << "\n--- Shutting down ---" << endl;
return 0; // Destructor calls shutdown()
}
Output:
ThreadPool started with 3 workers
--- Submitting tasks ---
--- Collecting results ---
1^2 = 1
2^2 = 4
3^2 = 9
4^2 = 16
5^2 = 25
6^2 = 36
Reversed: olleh
Reversed: dlrow
--- Shutting down ---
Worker 0 exiting
Worker 1 exiting
Worker 2 exiting
ThreadPool shut down
Step-by-step explanation:
- The thread pool creates numThreads worker threads in its constructor. Each runs workerLoop(), which waits on cv_ for tasks to appear in taskQueue_.
- submit() wraps the callable in a std::packaged_task, which connects a callable to a std::future for result retrieval. The task is pushed to the queue and cv_.notify_one() wakes one worker.
- Workers execute tasks outside the mutex lock — the lock is only held briefly to dequeue the task. This is critical: if tasks were executed while holding the lock, no other worker could dequeue tasks concurrently.
- shutdown() sets stopped_ = true and calls cv_.notify_all(). Workers check stopped_ && taskQueue_.empty() — if true, they exit. Tasks already in the queue are still processed (note: stopped_ being true doesn't mean the queue is empty).
- future.get() in the caller blocks until the result is available. Multiple futures can be awaited concurrently — each worker independently produces its result, and get() retrieves them in any order you choose.
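The packaged_task/future pairing that submit() relies on can be seen in isolation. A minimal sketch (the names add and run_packaged are illustrative):

```cpp
#include <future>
#include <functional>

int add(int a, int b) { return a + b; }

int run_packaged() {
    // packaged_task wraps a callable and exposes a future for its result
    std::packaged_task<int()> task(std::bind(add, 2, 3));
    std::future<int> f = task.get_future();
    task();           // in the pool, a worker thread invokes the task
    return f.get();   // blocks until the task has produced its result
}
```

In the pool, the only difference is that task() runs on a worker thread while get() runs on the caller's thread; the future carries the result across that thread boundary.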
Condition Variables with std::stop_token (C++20)
C++20’s std::condition_variable_any works with std::stop_token to enable clean cancellation of waiting threads without polling or shared flags.
#include <iostream>
#include <thread>
#include <stop_token>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono> // for chrono::milliseconds
using namespace std;
mutex mtx;
condition_variable_any cv; // condition_variable_any works with stop_token
queue<int> workQueue;
void worker(stop_token stopToken, int id) {
while (true) {
unique_lock<mutex> lock(mtx);
// wait() overload that checks stop_token — exits immediately on stop request
bool hasWork = cv.wait(lock, stopToken, []() {
return !workQueue.empty();
});
if (!hasWork) {
// stop was requested while the predicate was still false (queue empty)
cout << "Worker " << id << ": stop requested, exiting" << endl;
return;
}
int item = workQueue.front();
workQueue.pop();
lock.unlock();
cout << "Worker " << id << " processed: " << item << endl;
this_thread::sleep_for(chrono::milliseconds(50));
}
}
int main() {
jthread w1(worker, 1); // jthread provides automatic stop_token
jthread w2(worker, 2);
// Push some work
for (int i = 1; i <= 6; i++) {
{
lock_guard<mutex> lock(mtx);
workQueue.push(i);
}
cv.notify_one();
this_thread::sleep_for(chrono::milliseconds(30));
}
// Give workers time to process
this_thread::sleep_for(chrono::milliseconds(400));
cout << "\nRequesting stop..." << endl;
w1.request_stop(); // Signals stop_token — cv.wait returns false immediately
w2.request_stop();
// jthread destructor auto-joins
return 0;
}
Output:
Worker 1 processed: 1
Worker 2 processed: 2
Worker 1 processed: 3
Worker 2 processed: 4
Worker 1 processed: 5
Worker 2 processed: 6
Requesting stop...
Worker 1: stop requested, exiting
Worker 2: stop requested, exiting
Step-by-step explanation:
- condition_variable_any (vs. condition_variable) can work with any lockable type, not just unique_lock<mutex>, and it adds the wait(lock, stop_token, predicate) overload; plain std::condition_variable has no stop_token support.
- When stop is requested (via request_stop() or the jthread destructor), the wait(lock, stopToken, predicate) overload wakes the thread even if the predicate is false, and returns false. The worker checks the return value and exits cleanly.
- This eliminates the need for a separate closed_ flag, making the shutdown mechanism cleaner and integrating directly with C++20's cooperative cancellation model.
- jthread automatically passes the stop token as the first argument to the thread function (when the callable accepts stop_token as the first parameter) and calls request_stop() in its destructor before joining.
Common Mistakes
Mistake 1: Using wait() without a predicate.
cv.wait(lock); // WRONG — spurious wakeups will cause bugs
cv.wait(lock, []() { return conditionIsTrue; }); // CORRECT
Mistake 2: Notifying before the waiter is sleeping.
// WRONG order: notify before the waiter starts waiting — lost wakeup
cv.notify_one(); // No one is waiting yet
// ... then elsewhere: cv.wait(lock, pred); // Might miss the notification
// Fix: always ensure the predicate captures state, not just a signal
// The predicate will be checked immediately before waiting — if already true, no sleep
Mistake 3: Using lock_guard with wait().
lock_guard<mutex> lock(mtx);
cv.wait(lock, pred); // COMPILE ERROR: lock_guard has no unlock()
unique_lock<mutex> lock(mtx); // CORRECT
cv.wait(lock, pred);
Mistake 4: Calling notify while holding the lock (minor issue).
// Technically correct but may cause unnecessary context switches:
{
lock_guard<mutex> lock(mtx);
sharedData = newValue;
cv.notify_one(); // Notifies while holding lock — woken thread immediately blocks
}
// Slightly better:
{
lock_guard<mutex> lock(mtx);
sharedData = newValue;
}
cv.notify_one(); // Notifies after releasing lock — woken thread can proceed immediately
Mistake 5: Using notify_one when multiple threads can proceed.
// Added 10 items to queue but only notify_one — 9 consumers stay asleep
for (int i = 0; i < 10; i++) queue.push(i);
cv.notify_one(); // WRONG: only one consumer wakes, 9 items wait
cv.notify_all(); // CORRECT: all consumers can wake and process
// Or notify once per item if items are pushed in a loop
Condition Variable Quick Reference
| Method | Description |
|---|---|
| cv.wait(lock, pred) | Sleep until pred() returns true; re-checks on every wakeup |
| cv.wait(lock) | Sleep until notified; spurious-wakeup prone — always prefer the predicate form |
| cv.wait_for(lock, dur, pred) | Sleep until pred true or duration expires; returns bool |
| cv.wait_until(lock, tp, pred) | Sleep until pred true or time_point reached; returns bool |
| cv.notify_one() | Wake one waiting thread |
| cv.notify_all() | Wake all waiting threads |
| condition_variable_any | Like condition_variable but works with any lockable, plus stop_token overloads |
Conclusion
Condition variables are the bridge between mutex-based data protection and inter-thread coordination. They solve the fundamental problem of efficient waiting: instead of burning CPU cycles polling for a condition or sleeping for fixed intervals with unpredictable latency, threads sleep with zero CPU overhead and are woken precisely when the condition they need becomes true.
The three-part pattern — shared state protected by a mutex, a condition variable for efficient waiting, and a predicate that captures the exact condition being waited for — is the foundation of every coordination pattern in multithreaded C++: producer-consumer queues, thread pools, pipeline stages, barrier synchronization, and event-driven systems.
The rules are simple but absolute: always use unique_lock with wait(), always use a predicate to guard against spurious wakeups, choose notify_one() when exactly one waiter can proceed and notify_all() when multiple can, and always update the shared state before calling notify.
C++20 extends this model with condition_variable_any and stop_token integration, making cooperative cancellation clean and idiomatic. Combined with jthread‘s automatic stop management, modern C++ makes thread coordination both powerful and safe.
With mutexes (from the previous article) and condition variables (from this one), you have the complete toolkit for building any concurrent C++ system — from simple protected counters to sophisticated work-stealing thread pools.