Condition Variables: Thread Communication

Master C++ condition variables — learn how threads wait and notify each other, implement producer-consumer queues, avoid spurious wakeups, and use condition_variable_any.

Condition Variables: Thread Communication

A std::condition_variable in C++ is a synchronization primitive that allows one thread to wait until another thread sends a notification. A waiting thread releases its mutex and sleeps efficiently until notify_one() or notify_all() is called by another thread. Condition variables always work together with a std::unique_lock<std::mutex>: the mutex protects the shared state that the condition is based on, and the condition variable provides the efficient “sleep and wake” mechanism to avoid busy-waiting.

Introduction

Mutexes solve one fundamental concurrency problem: preventing multiple threads from accessing shared data simultaneously. But there is a second equally important problem that mutexes alone cannot solve efficiently: coordination — how does one thread tell another that work is ready, that a condition has been met, or that it should wake up and proceed?

Consider the classic producer-consumer scenario. A producer thread generates data and places it in a shared queue. A consumer thread needs to process items from the queue. The consumer must wait when the queue is empty — but how? It could continuously check the queue in a tight loop (“busy-waiting” or “spinning”), but that wastes CPU cycles and degrades system performance. It could sleep_for a fixed duration, but that adds unnecessary latency when items do arrive.

The correct solution is std::condition_variable — a primitive that lets the consumer thread sleep efficiently while waiting, and lets the producer wake it up immediately when an item is available. The sleeping thread consumes no CPU while waiting and is awakened precisely when the condition it was waiting for becomes true.

Condition variables are used in countless real-world patterns: producer-consumer queues, thread pools, task schedulers, work pipelines, barrier synchronization, and event-driven systems. Understanding them is essential for writing efficient, responsive multithreaded software.

This article teaches condition variables from first principles. You will understand why they are needed, learn the correct usage pattern, implement a production-quality producer-consumer queue, explore the subtleties of spurious wakeups, understand notify_one vs notify_all, and see how C++20’s std::stop_token integrates with condition variables for graceful shutdown.

The Problem: Polling vs. Waiting

Before diving into condition variables, let’s see exactly why the naive approaches to inter-thread coordination are inadequate.

C++
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <chrono>
using namespace std;

mutex mtx;
queue<int> workQueue;
bool done = false;

// BAD APPROACH 1: Busy-waiting (spinning)
void consumer_spinning() {
    int processed = 0;
    while (true) {
        {
            lock_guard<mutex> lock(mtx);
            if (!workQueue.empty()) {
                int item = workQueue.front();
                workQueue.pop();
                processed++;
                cout << "Consumed: " << item << endl;
            } else if (done) {
                break;
            }
        }
        // No sleep — tight loop burns 100% CPU checking repeatedly
        // This is wasteful even on a multi-core machine
    }
    cout << "Consumer done. Processed: " << processed << endl;
}

// BAD APPROACH 2: Sleeping for a fixed interval
void consumer_sleeping() {
    int processed = 0;
    while (true) {
        {
            lock_guard<mutex> lock(mtx);
            if (!workQueue.empty()) {
                int item = workQueue.front();
                workQueue.pop();
                processed++;
                cout << "Consumed: " << item << endl;
            } else if (done) {
                break;
            }
        }
        this_thread::sleep_for(chrono::milliseconds(10)); // Wastes latency
        // Items may have been available for 9ms before we checked again
    }
    cout << "Consumer done. Processed: " << processed << endl;
}

// We'll implement the GOOD approach (condition variable) in the next section

Problems with spinning:

  • Burns 100% of one CPU core continuously, even when there is nothing to process
  • In systems with many threads, degrades performance for everyone by starving other threads
  • Not acceptable in production code except in very specific low-latency scenarios where the wait is expected to be nanoseconds

Problems with fixed-interval sleeping:

  • Adds unnecessary latency: if the producer puts an item at time T, the consumer may not see it until T + sleep_interval
  • Choosing the right interval is impossible: too short wastes CPU, too long adds latency
  • Neither approach scales well as the number of threads grows

The correct solution is a condition variable — the consumer sleeps with zero CPU consumption and is woken immediately when an item arrives.

How Condition Variables Work

A condition variable operates on a simple model:

  1. A thread checks a condition (e.g., “is the queue non-empty?”)
  2. If the condition is false, it calls wait(), which atomically releases the mutex and puts the thread to sleep
  3. Another thread changes the condition (adds an item to the queue), then calls notify_one() or notify_all()
  4. The sleeping thread wakes up, re-acquires the mutex, and re-checks the condition

The critical word in step 2 is atomically — the mutex is released and the thread is put to sleep in a single atomic operation. This prevents the “lost wakeup” problem: if the release and sleep were separate steps, a producer could sneak in between them, call notify() before the consumer is asleep, and the consumer would sleep forever waiting for a notification that already happened.

C++
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <chrono>
using namespace std;

// The three required ingredients for condition variable usage:
mutex mtx;                    // 1. Protects the shared state
condition_variable cv;        // 2. The notification mechanism
queue<int> workQueue;         // 3. The shared state being protected

bool producerDone = false;    // Signals that no more items will arrive

void producer(int numItems) {
    for (int i = 1; i <= numItems; i++) {
        this_thread::sleep_for(chrono::milliseconds(100)); // Simulate work

        {
            lock_guard<mutex> lock(mtx);
            workQueue.push(i);
            cout << "Produced: " << i << endl;
        }
        cv.notify_one(); // Wake one waiting consumer
    }

    {
        lock_guard<mutex> lock(mtx);
        producerDone = true;
    }
    cv.notify_all(); // Wake all consumers to check the done flag
}

void consumer(int id) {
    int processed = 0;

    while (true) {
        unique_lock<mutex> lock(mtx); // unique_lock required by wait()

        // wait() atomically: releases lock + sleeps until notified
        // The predicate lambda re-checks the condition after wakeup
        cv.wait(lock, []() {
            return !workQueue.empty() || producerDone;
        });

        // At this point: lock is re-acquired, condition is true
        if (!workQueue.empty()) {
            int item = workQueue.front();
            workQueue.pop();
            lock.unlock(); // Release lock before processing (good practice)

            cout << "Consumer " << id << " processed: " << item << endl;
            processed++;
        } else {
            // Queue is empty AND producerDone is true — we're done
            break;
        }
    }

    cout << "Consumer " << id << " done. Total processed: " << processed << endl;
}

int main() {
    thread prod(producer, 5);
    thread cons(consumer, 1);

    prod.join();
    cons.join();

    return 0;
}

Output:

Plaintext
Produced: 1
Consumer 1 processed: 1
Produced: 2
Consumer 1 processed: 2
Produced: 3
Consumer 1 processed: 3
Produced: 4
Consumer 1 processed: 4
Produced: 5
Consumer 1 processed: 5
Consumer 1 done. Total processed: 5

Step-by-step explanation:

  1. cv.wait(lock, predicate) is the two-argument overload that takes a predicate lambda. It is equivalent to: while (!predicate()) { cv.wait(lock); } It atomically releases lock and suspends the thread. When notified, it re-acquires lock and evaluates the predicate. If the predicate returns false (spurious wakeup or notification for another reason), it releases the lock and sleeps again. If true, it returns with the lock held.
  2. cv.notify_one() after each workQueue.push() wakes one waiting consumer. Using lock_guard (not unique_lock) for the producer is fine — it doesn’t need wait(), only the consumer does.
  3. cv.notify_all() when producerDone = true wakes all consumers so they can see the flag and exit their loops. Using notify_one() here would risk leaving a consumer asleep if there are multiple consumers.
  4. lock.unlock() before processing the item is an optimization: we hold the lock only long enough to pop from the queue, then release it so other threads can proceed while we process. Processing often takes longer than queue manipulation, so this reduces lock contention.
  5. unique_lock (not lock_guard) is required because wait() needs to unlock and re-lock the mutex. lock_guard does not expose an unlock() method, so it cannot be used with wait().

Spurious Wakeups: Why the Predicate Matters

A spurious wakeup is when a thread returns from wait() even though no notify was called. This sounds impossible but is a real phenomenon — the POSIX threading standard explicitly allows it, and implementations use it for efficiency. A thread may wake up due to OS signals, hardware interrupts, or implementation-specific reasons.

This is precisely why wait() must always use a predicate:

C++
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;

mutex mtx;
condition_variable cv;
bool dataReady = false;
int  data      = 0;

// WRONG: no predicate — spurious wakeups will cause incorrect behavior
void consumer_wrong() {
    unique_lock<mutex> lock(mtx);
    cv.wait(lock);          // Returns on BOTH real notifications AND spurious wakeups
    // If this is a spurious wakeup, dataReady is still false
    // but we proceed as if data is available — BUG
    cout << "Wrong consumer got: " << data << endl;
}

// CORRECT: always use a predicate
void consumer_correct() {
    unique_lock<mutex> lock(mtx);
    cv.wait(lock, []() { return dataReady; }); // Re-checks after every wakeup
    // Only proceeds when dataReady is actually true
    cout << "Correct consumer got: " << data << endl;
}

void producer_send() {
    this_thread::sleep_for(chrono::milliseconds(100));
    {
        lock_guard<mutex> lock(mtx);
        data      = 42;
        dataReady = true;
    }
    cv.notify_one();
}

int main() {
    cout << "--- Correct usage ---" << endl;
    dataReady = false;
    data      = 0;

    thread prod(producer_send);
    thread cons(consumer_correct);

    prod.join();
    cons.join();

    return 0;
}

Output:

Plaintext
--- Correct usage ---
Correct consumer got: 42

Step-by-step explanation:

  1. cv.wait(lock) (no predicate) returns every time the thread is woken — including spurious wakeups. If there is no re-check, the thread proceeds with incorrect state.
  2. cv.wait(lock, predicate) with a lambda correctly handles spurious wakeups: after every wakeup (whether real or spurious), the predicate is evaluated. If false, the thread goes back to sleep. If true, it proceeds.
  3. The predicate captures the condition on the shared state that the thread is waiting for. Always use a predicate that checks the actual state, never just return true or return notifiedOnce (which would be incorrect).
  4. The shared state variable (dataReady) must be protected by the same mutex used with wait(). Checking dataReady outside the lock would be a data race.
  5. Rule: Always use the predicate form of wait(). There is never a good reason to use wait(lock) without a predicate — it is always a bug waiting to happen.

Producer-Consumer Queue: A Production-Quality Implementation

Let’s build a complete, reusable thread-safe queue with condition variable support — the fundamental building block for many concurrent systems.

C++
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <optional>
#include <chrono>
#include <vector>
using namespace std;

template<typename T>
class BlockingQueue {
public:
    explicit BlockingQueue(size_t maxSize = SIZE_MAX)
        : maxSize_(maxSize), closed_(false) {}

    // Push an item — blocks if the queue is full
    // Returns false if the queue has been closed
    bool push(T item) {
        unique_lock<mutex> lock(mtx_);

        // Wait while queue is full and not closed
        notFull_.wait(lock, [this]() {
            return queue_.size() < maxSize_ || closed_;
        });

        if (closed_) return false;

        queue_.push(move(item));
        lock.unlock();
        notEmpty_.notify_one(); // Wake a waiting consumer
        return true;
    }

    // Pop an item — blocks until an item is available
    // Returns empty optional if queue is closed and empty
    optional<T> pop() {
        unique_lock<mutex> lock(mtx_);

        // Wait while queue is empty and not closed
        notEmpty_.wait(lock, [this]() {
            return !queue_.empty() || closed_;
        });

        if (queue_.empty()) return nullopt; // Closed and empty

        T item = move(queue_.front());
        queue_.pop();
        lock.unlock();
        notFull_.notify_one(); // Wake a waiting producer (bounded queue)
        return item;
    }

    // Try to pop without blocking
    // Returns empty optional if queue is empty
    optional<T> tryPop() {
        lock_guard<mutex> lock(mtx_);
        if (queue_.empty()) return nullopt;
        T item = move(queue_.front());
        queue_.pop();
        notFull_.notify_one();
        return item;
    }

    // Signal that no more items will be pushed
    void close() {
        {
            lock_guard<mutex> lock(mtx_);
            closed_ = true;
        }
        notEmpty_.notify_all(); // Wake all consumers to check closed_
        notFull_.notify_all();  // Wake all blocked producers
    }

    size_t size() const {
        lock_guard<mutex> lock(mtx_);
        return queue_.size();
    }

    bool isClosed() const {
        lock_guard<mutex> lock(mtx_);
        return closed_;
    }

private:
    mutable mutex       mtx_;
    condition_variable  notEmpty_; // Signaled when queue goes from empty to non-empty
    condition_variable  notFull_;  // Signaled when queue goes from full to non-full
    queue<T>            queue_;
    size_t              maxSize_;
    bool                closed_;
};

// ---- Demo: multi-producer, multi-consumer pipeline ----

int main() {
    const int NUM_PRODUCERS = 2;
    const int NUM_CONSUMERS = 3;
    const int ITEMS_PER_PRODUCER = 5;
    const int MAX_QUEUE_SIZE = 4; // Bounded queue to demonstrate backpressure

    BlockingQueue<int> queue(MAX_QUEUE_SIZE);

    // Producers
    vector<thread> producers;
    for (int p = 0; p < NUM_PRODUCERS; p++) {
        producers.emplace_back([p, &queue]() {
            for (int i = 0; i < ITEMS_PER_PRODUCER; i++) {
                int item = p * 100 + i;
                if (queue.push(item)) {
                    cout << "Producer " << p << " pushed: " << item << endl;
                } else {
                    cout << "Producer " << p << ": queue closed, stopping" << endl;
                    break;
                }
                this_thread::sleep_for(chrono::milliseconds(30));
            }
            cout << "Producer " << p << " done" << endl;
        });
    }

    // Consumers
    vector<thread> consumers;
    for (int c = 0; c < NUM_CONSUMERS; c++) {
        consumers.emplace_back([c, &queue]() {
            int count = 0;
            while (true) {
                auto item = queue.pop();
                if (!item.has_value()) {
                    cout << "Consumer " << c << " done. Processed: " << count << endl;
                    break;
                }
                cout << "Consumer " << c << " processed: " << *item << endl;
                count++;
                this_thread::sleep_for(chrono::milliseconds(50));
            }
        });
    }

    // Wait for all producers to finish
    for (auto& p : producers) p.join();
    cout << "\nAll producers done — closing queue" << endl;

    // Signal consumers that no more items are coming
    queue.close();

    // Wait for all consumers to drain the queue and exit
    for (auto& c : consumers) c.join();

    cout << "\nAll done. Remaining items: " << queue.size() << endl;

    return 0;
}

Output (interleaving varies):

Plaintext
Producer 0 pushed: 0
Producer 1 pushed: 100
Consumer 0 processed: 0
Consumer 1 processed: 100
Producer 0 pushed: 1
Producer 1 pushed: 101
Consumer 2 processed: 1
Consumer 0 processed: 101
Producer 0 pushed: 2
Producer 1 pushed: 102
Consumer 1 processed: 2
Consumer 2 processed: 102
Producer 0 pushed: 3
Producer 1 pushed: 103
Consumer 0 processed: 3
Consumer 1 processed: 103
Producer 0 pushed: 4
Producer 1 pushed: 104
Consumer 2 processed: 4
Consumer 0 processed: 104
Producer 0 done
Producer 1 done

All producers done — closing queue
Consumer 1 done. Processed: 4
Consumer 2 done. Processed: 3
Consumer 0 done. Processed: 3

All done. Remaining items: 0

Step-by-step explanation:

  1. BlockingQueue uses two condition variables: notEmpty_ (signaled when an item is added, wakes consumers) and notFull_ (signaled when an item is removed, wakes producers blocked on a full queue). This is the standard bounded blocking queue pattern.
  2. push() waits on notFull_ when the queue is at capacity — the producer blocks until a consumer makes room. After pushing, it signals notEmpty_ to wake a waiting consumer.
  3. pop() waits on notEmpty_ when the queue is empty. After popping, it signals notFull_ to wake a waiting producer (important for bounded queues).
  4. close() sets closed_ = true and notifies all waiters on both condition variables. This is the shutdown signal. Producers see it in push() and return false. Consumers see it in pop() when the queue is empty and return nullopt.
  5. The optional<T> return from pop() elegantly distinguishes “got an item” from “queue is closed and empty” — no exceptions needed, no sentinel values.
  6. Every predicate checks both the primary condition and closed_. This prevents threads from being stuck waiting indefinitely when the queue is closed.

notify_one vs. notify_all

Choosing between notify_one() and notify_all() is important for both correctness and performance.

C++
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
using namespace std;

mutex mtx;
condition_variable cv;
bool eventFired = false;
int  waitingCount = 0;

void waiterThread(int id, bool expectMultiple) {
    unique_lock<mutex> lock(mtx);
    waitingCount++;
    cout << "Thread " << id << " waiting..." << endl;

    cv.wait(lock, []() { return eventFired; });

    cout << "Thread " << id << " woke up!" << endl;
    waitingCount--;
}

int main() {
    // --- notify_one: wake exactly one waiter ---
    cout << "=== notify_one: one waiter wakes ===" << endl;
    {
        eventFired = false;
        waitingCount = 0;

        thread t1(waiterThread, 1, false);
        thread t2(waiterThread, 2, false);
        thread t3(waiterThread, 3, false);

        // Wait until all are sleeping
        this_thread::sleep_for(chrono::milliseconds(50));

        {
            lock_guard<mutex> lock(mtx);
            eventFired = true;
        }
        cv.notify_one(); // Wakes exactly ONE of the three

        this_thread::sleep_for(chrono::milliseconds(100));
        cout << "After notify_one: " << waitingCount << " still waiting" << endl;

        // Wake the remaining two
        cv.notify_all();
        t1.join(); t2.join(); t3.join();
    }

    cout << "\n=== notify_all: all waiters wake ===" << endl;
    {
        eventFired = false;
        waitingCount = 0;

        thread t4(waiterThread, 4, true);
        thread t5(waiterThread, 5, true);
        thread t6(waiterThread, 6, true);

        this_thread::sleep_for(chrono::milliseconds(50));

        {
            lock_guard<mutex> lock(mtx);
            eventFired = true;
        }
        cv.notify_all(); // Wakes ALL three

        t4.join(); t5.join(); t6.join();
        cout << "All three woke up" << endl;
    }

    return 0;
}

Output:

Plaintext
=== notify_one: one waiter wakes ===
Thread 1 waiting...
Thread 2 waiting...
Thread 3 waiting...
Thread 1 woke up!
After notify_one: 2 still waiting
Thread 2 woke up!
Thread 3 woke up!

=== notify_all: all waiters wake ===
Thread 4 waiting...
Thread 5 waiting...
Thread 6 waiting...
Thread 4 woke up!
Thread 5 woke up!
Thread 6 woke up!
All three woke up

When to use each:

notify_one() — Use when exactly one thread can make progress (e.g., consuming one item from a queue). Waking all threads when only one can do work causes a “thundering herd”: all threads wake, compete for the mutex, all but one discover the condition is false, and go back to sleep — wasted context switches and contention.

notify_all() — Use when multiple threads can make progress (e.g., all waiting threads should check a changed flag), when the shared state change could satisfy multiple waiters (e.g., adding multiple items at once), or when signaling shutdown (all threads must wake and exit).

Key rules:

  • If all waiting threads wait on the same condition variable but only one can proceed at a time (single-item queue), use notify_one()
  • If a state change could allow multiple threads to proceed (batch insert, shutdown flag), use notify_all()
  • When in doubt, notify_all() is safer but less efficient; notify_one() is more efficient but must only be used when at most one thread can make progress

wait_for and wait_until: Timed Waiting

Sometimes a thread should wait for a condition but not forever — it should time out and take action if the condition is not met within a deadline.

C++
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
using namespace std;

mutex mtx;
condition_variable cv;
bool taskComplete = false;

void longRunningTask() {
    cout << "Task starting (will take 2 seconds)..." << endl;
    this_thread::sleep_for(chrono::seconds(2));
    {
        lock_guard<mutex> lock(mtx);
        taskComplete = true;
    }
    cv.notify_one();
    cout << "Task complete!" << endl;
}

int main() {
    thread worker(longRunningTask);

    // Wait with a timeout
    {
        unique_lock<mutex> lock(mtx);

        // Wait up to 500ms
        auto status = cv.wait_for(lock,
                                  chrono::milliseconds(500),
                                  []() { return taskComplete; });

        if (status) {
            cout << "Completed within timeout!" << endl;
        } else {
            cout << "Timed out after 500ms — task still running" << endl;
        }
    }

    // Wait again with a longer timeout or absolute deadline
    {
        unique_lock<mutex> lock(mtx);

        // wait_until: wait until a specific clock time
        auto deadline = chrono::steady_clock::now() + chrono::seconds(3);
        auto status = cv.wait_until(lock, deadline,
                                    []() { return taskComplete; });

        if (status) {
            cout << "Task completed before deadline!" << endl;
        } else {
            cout << "Deadline exceeded" << endl;
        }
    }

    worker.join();

    return 0;
}

Output:

Plaintext
Task starting (will take 2 seconds)...
Timed out after 500ms — task still running
Task complete!
Task completed before deadline!

Step-by-step explanation:

  1. cv.wait_for(lock, duration, predicate) waits up to duration for the predicate to become true. It returns true if the predicate became true before the timeout, false if it timed out.
  2. cv.wait_until(lock, time_point, predicate) waits until a specific std::chrono::time_point. Useful when you have an absolute deadline computed from a wall clock or when you need to check at a specific time.
  3. Both forms return cv_status::timeout or cv_status::no_timeout when used without a predicate, or a bool when used with a predicate — true means the predicate became true, false means timeout.
  4. Timed waits are important for resilient systems: network operations that should time out, health checks that should not block indefinitely, and real-time systems that require bounded response times.
  5. Always combine timed waits with predicates to handle spurious wakeups correctly, just like regular wait().

Real-World Pattern: Thread Pool with Work Queue

A thread pool is one of the most common and important concurrent patterns. It creates a fixed number of worker threads that pick up tasks from a shared work queue — avoiding the overhead of creating and destroying threads for each task.

C++
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <vector>
#include <future>
#include <atomic>
using namespace std;

class ThreadPool {
public:
    explicit ThreadPool(size_t numThreads) : stopped_(false) {
        for (size_t i = 0; i < numThreads; i++) {
            workers_.emplace_back([this, i]() {
                workerLoop(i);
            });
        }
        cout << "ThreadPool started with " << numThreads << " workers" << endl;
    }

    ~ThreadPool() {
        shutdown();
    }

    // Submit a task — returns a future for the result
    template<typename F, typename... Args>
    auto submit(F&& func, Args&&... args)
        -> future<invoke_result_t<F, Args...>>
    {
        using ReturnType = invoke_result_t<F, Args...>;

        // Package the task as a promise/future pair
        auto task = make_shared<packaged_task<ReturnType()>>(
            bind(forward<F>(func), forward<Args>(args)...)
        );

        future<ReturnType> result = task->get_future();

        {
            lock_guard<mutex> lock(mtx_);
            if (stopped_) throw runtime_error("ThreadPool is stopped");
            taskQueue_.push([task]() { (*task)(); });
        }
        cv_.notify_one(); // Wake one worker

        return result;
    }

    void shutdown() {
        {
            lock_guard<mutex> lock(mtx_);
            if (stopped_) return;
            stopped_ = true;
        }
        cv_.notify_all(); // Wake all workers so they can exit

        for (thread& w : workers_) {
            if (w.joinable()) w.join();
        }
        cout << "ThreadPool shut down" << endl;
    }

    size_t pendingTasks() const {
        lock_guard<mutex> lock(mtx_);
        return taskQueue_.size();
    }

private:
    void workerLoop(size_t id) {
        while (true) {
            function<void()> task;

            {
                unique_lock<mutex> lock(mtx_);
                cv_.wait(lock, [this]() {
                    return !taskQueue_.empty() || stopped_;
                });

                if (stopped_ && taskQueue_.empty()) {
                    cout << "Worker " << id << " exiting" << endl;
                    return;
                }

                task = move(taskQueue_.front());
                taskQueue_.pop();
            }

            // Execute task outside the lock
            task();
        }
    }

    mutable mutex             mtx_;
    condition_variable        cv_;
    queue<function<void()>>   taskQueue_;
    vector<thread>            workers_;
    bool                      stopped_;
};

// Example task functions
int computeSquare(int n) {
    this_thread::sleep_for(chrono::milliseconds(50)); // Simulate work
    return n * n;
}

string reverseString(string s) {
    this_thread::sleep_for(chrono::milliseconds(30));
    reverse(s.begin(), s.end());
    return s;
}

int main() {
    ThreadPool pool(3); // 3 worker threads

    cout << "\n--- Submitting tasks ---" << endl;

    // Submit tasks and get futures
    vector<future<int>> intResults;
    for (int i = 1; i <= 6; i++) {
        intResults.push_back(pool.submit(computeSquare, i));
    }

    vector<future<string>> strResults;
    strResults.push_back(pool.submit(reverseString, string("hello")));
    strResults.push_back(pool.submit(reverseString, string("world")));

    cout << "\n--- Collecting results ---" << endl;
    for (int i = 0; i < 6; i++) {
        cout << (i+1) << "^2 = " << intResults[i].get() << endl;
    }
    for (auto& f : strResults) {
        cout << "Reversed: " << f.get() << endl;
    }

    cout << "\n--- Shutting down ---" << endl;

    return 0; // Destructor calls shutdown()
}

Output:

Plaintext
ThreadPool started with 3 workers

--- Submitting tasks ---

--- Collecting results ---
1^2 = 1
2^2 = 4
3^2 = 9
4^2 = 16
5^2 = 25
6^2 = 36
Reversed: olleh
Reversed: dlrow

--- Shutting down ---
Worker 0 exiting
Worker 1 exiting
Worker 2 exiting
ThreadPool shut down

Step-by-step explanation:

  1. The thread pool creates numThreads worker threads in its constructor. Each runs workerLoop(), which waits on cv_ for tasks to appear in taskQueue_.
  2. submit() wraps the callable in a std::packaged_task, which connects a callable to a std::future for result retrieval. The task is pushed to the queue and cv_.notify_one() wakes one worker.
  3. Workers execute tasks outside the mutex lock — the lock is only held briefly to dequeue the task. This is critical: if tasks were executed while holding the lock, no other worker could dequeue tasks concurrently.
  4. shutdown() sets stopped_ = true and calls cv_.notify_all(). Workers check stopped_ && taskQueue_.empty() — if true, they exit. Tasks already in the queue are still processed (note: stopped_ being true doesn’t mean the queue is empty).
  5. future.get() in the caller blocks until the result is available. Multiple futures can be awaited concurrently — each worker independently produces its result, and get() retrieves them in any order you choose.

Condition Variables with std::stop_token (C++20)

C++20’s std::condition_variable_any works with std::stop_token to enable clean cancellation of waiting threads without polling or shared flags.

C++
#include <iostream>
#include <thread>
#include <stop_token>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;

mutex mtx;
condition_variable_any cv;  // condition_variable_any works with stop_token
queue<int> workQueue;

void worker(stop_token stopToken, int id) {
    while (true) {
        unique_lock<mutex> lock(mtx);

        // wait() overload that checks stop_token — exits immediately on stop request
        bool hasWork = cv.wait(lock, stopToken, []() {
            return !workQueue.empty();
        });

        if (!hasWork) {
            // stop was requested and queue is empty (or just stop was requested)
            cout << "Worker " << id << ": stop requested, exiting" << endl;
            return;
        }

        int item = workQueue.front();
        workQueue.pop();
        lock.unlock();

        cout << "Worker " << id << " processed: " << item << endl;
        this_thread::sleep_for(chrono::milliseconds(50));
    }
}

int main() {
    jthread w1(worker, 1);  // jthread provides automatic stop_token
    jthread w2(worker, 2);

    // Push some work
    for (int i = 1; i <= 6; i++) {
        {
            lock_guard<mutex> lock(mtx);
            workQueue.push(i);
        }
        cv.notify_one();
        this_thread::sleep_for(chrono::milliseconds(30));
    }

    // Give workers time to process
    this_thread::sleep_for(chrono::milliseconds(400));

    cout << "\nRequesting stop..." << endl;
    w1.request_stop();  // Signals stop_token — cv.wait returns false immediately
    w2.request_stop();

    // jthread destructor auto-joins
    return 0;
}

Output:

Plaintext
Worker 1 processed: 1
Worker 2 processed: 2
Worker 1 processed: 3
Worker 2 processed: 4
Worker 1 processed: 5
Worker 2 processed: 6

Requesting stop...
Worker 1: stop requested, exiting
Worker 2: stop requested, exiting

Step-by-step explanation:

  1. condition_variable_any (vs. condition_variable) can work with any lockable type, including jthread‘s internal stop state. It adds a wait(lock, stop_token, predicate) overload.
  2. When stop is requested (via request_stop() or jthread destructor), the wait(lock, stopToken, predicate) overload wakes the thread even if the predicate is false, and returns false. The worker checks the return value and exits cleanly.
  3. This eliminates the need for a separate closed_ flag, making the shutdown mechanism cleaner and integrating directly with C++20’s cooperative cancellation model.
  4. jthread automatically passes the stop token as the first argument to the thread function (when the callable accepts stop_token as the first parameter) and calls request_stop() in its destructor before joining.

Common Mistakes

Mistake 1: Using wait() without a predicate.

C++
cv.wait(lock);  // WRONG — spurious wakeups will cause bugs
cv.wait(lock, []() { return conditionIsTrue; }); // CORRECT

Mistake 2: Notifying before the waiter is sleeping.

C++
// WRONG order: notify before the waiter starts waiting — lost wakeup
cv.notify_one();   // No one is waiting yet
// ... then elsewhere: cv.wait(lock, pred); // Might miss the notification
// Fix: always ensure the predicate captures state, not just a signal
// The predicate will be checked immediately before waiting — if already true, no sleep

Mistake 3: Using lock_guard with wait().

C++
lock_guard<mutex> lock(mtx);
cv.wait(lock, pred);  // COMPILE ERROR: lock_guard has no unlock()
unique_lock<mutex> lock(mtx);  // CORRECT
cv.wait(lock, pred);

Mistake 4: Calling notify while holding the lock (minor issue).

C++
// Technically correct but may cause unnecessary context switches:
{
    lock_guard<mutex> lock(mtx);
    sharedData = newValue;
    cv.notify_one(); // Notifies while holding lock — woken thread immediately blocks
}
// Slightly better:
{
    lock_guard<mutex> lock(mtx);
    sharedData = newValue;
}
cv.notify_one(); // Notifies after releasing lock — woken thread can proceed immediately

Mistake 5: Using notify_one when multiple threads can proceed.

C++
// Added 10 items to queue but only notify_one — 9 consumers stay asleep
for (int i = 0; i < 10; i++) queue.push(i);
cv.notify_one();   // WRONG: only one consumer wakes, 9 items wait
cv.notify_all();   // CORRECT: all consumers can wake and process
// Or notify once per item if items are pushed in a loop

Condition Variable Quick Reference

MethodDescription
cv.wait(lock, pred)Sleep until pred() returns true; re-checks on every wakeup
cv.wait(lock)Sleep until notified; always spurious-wakeup prone — use pred
cv.wait_for(lock, dur, pred)Sleep until pred true or duration expires; returns bool
cv.wait_until(lock, tp, pred)Sleep until pred true or time_point reached; returns bool
cv.notify_one()Wake one waiting thread
cv.notify_all()Wake all waiting threads
condition_variable_anyLike cv but works with any lockable + stop_token

Conclusion

Condition variables are the bridge between mutex-based data protection and inter-thread coordination. They solve the fundamental problem of efficient waiting: instead of burning CPU cycles polling for a condition or sleeping for fixed intervals with unpredictable latency, threads sleep with zero CPU overhead and are woken precisely when the condition they need becomes true.

The three-part pattern — shared state protected by a mutex, a condition variable for efficient waiting, and a predicate that captures the exact condition being waited for — is the foundation of every coordination pattern in multithreaded C++: producer-consumer queues, thread pools, pipeline stages, barrier synchronization, and event-driven systems.

The rules are simple but absolute: always use unique_lock with wait(), always use a predicate to guard against spurious wakeups, choose notify_one() when exactly one waiter can proceed and notify_all() when multiple can, and always update the shared state before calling notify.

C++20 extends this model with condition_variable_any and stop_token integration, making cooperative cancellation clean and idiomatic. Combined with jthread‘s automatic stop management, modern C++ makes thread coordination both powerful and safe.

With mutexes (from the previous article) and condition variables (from this one), you have the complete toolkit for building any concurrent C++ system — from simple protected counters to sophisticated work-stealing thread pools.

Share:
Subscribe
Notify of
0 Comments
Inline Feedbacks
View all comments

Discover More

Introduction to Flutter Widgets: Stateless and Stateful Widgets

Learn about Flutter’s Stateless and Stateful widgets, animations, and custom widget creation to build dynamic,…

Freelancing as a Data Scientist: Getting Started

Freelancing as a Data Scientist: Getting Started

Start your data science freelancing career with this complete guide. Learn how to find clients,…

Introduction to Machine Learning

Learn the fundamentals of machine learning from essential algorithms to evaluation metrics and workflow optimization.…

Skild AI Secures Record $1.4 Billion Funding Round

Pittsburgh robotics startup Skild AI secures $1.4 billion led by SoftBank, tripling valuation to $14…

Cable Management in Robotics: Preventing Tangles and Breaks

Cable Management in Robotics: Preventing Tangles and Breaks

Master cable management in robotics—learn routing, strain relief, connector selection, and wire labeling techniques to…

Data Science Page is Live

Discover the Power of Data: Introducing Data Science Category!

Click For More
0
Would love your thoughts, please comment.x
()
x