Table of Contents

In this post, we’ll have fun using C++20’s spans to process data on multiple threads. What’s more, we’ll be equipped with the latest concurrency features from C++20.

This text was motivated by the following comment under my recent article on std::span:

But why does this article… not show the major use case? A span is non-owning, so passing it to a function it can work on a subset of data owned by someone else! You can have a vector of data, and let two threads process two halves of it! Brilliant! By VictorEijkhout

Starting slow with std::thread  

Our task is relatively simple but can be extended to various multi-phase computations.

We need to initialize a container with numbers, filter them, then build a histogram.

Here’s the overview of the process:

Here’s a slow start with std::thread

constexpr int DATA_SIZE = 1000;
constexpr int MAX_NUM = 256;
std::vector<int> filteredNumbers;
std::mutex filteredNumbersMutex;

void initializeNumbers(std::span<int> numbers) {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> distrib(0, MAX_NUM);

    for (auto& num : numbers)
        num = distrib(gen);
}

void filterNumbers(std::span<int> numbers) {
    std::vector<int> local;
    std::ranges::copy_if(numbers, std::back_inserter(local),
        [](int v) {return v % 2 == 0; });

    std::lock_guard<std::mutex> lock(filteredNumbersMutex);
    filteredNumbers.insert(filteredNumbers.end(), local.begin(), local.end());
}

void analyzeNumbers(std::span<int> numbers, std::map<int, int>& outHistogram) {
    for (int num : numbers)
        outHistogram[num]++;
}

Let’s break down the code:

  1. The code initiates the process by generating a series of random numbers. This is done in the initializeNumbers function, which takes a std::span<int> as its argument. This span points to a portion of the main data vector, allowing the function to work independently on either the first or second half of the vector. Inside the function, a random number generator is used to populate the elements of the span with random integers. This demonstrates how std::span enables efficient and safe operations on container slices without explicitly copying data or managing pointers.
  2. After initializing the numbers, the next phase is to filter them based on a specific criterion, which, in this case, is selecting even numbers. The filterNumbers function again operates on a span of the main vector. It uses the std::ranges::copy_if algorithm to select numbers that meet the criterion (even numbers) and copies them into a local vector. This local vector is then merged into a shared filteredNumbers vector, with thread safety ensured by a mutex. This step illustrates how spans can be used in conjunction with standard algorithms and thread-safe practices to process and manipulate subsets of data in parallel.
  3. Finally, the code builds a histogram from the filtered numbers. This is accomplished in the analyzeNumbers function, which takes a span of the filtered numbers and a reference to a histogram map. Each thread processes a portion of the filtered numbers, counting the occurrences of each number and updating its local histogram map. After processing, these local histograms are merged into a final histogram in the main thread. This step highlights how spans can facilitate parallel data processing, with each thread working on its own segment and efficiently combining results for the final output.

Here’s the final main that also performs the coordination of the threads:

int main() {
    std::vector<int> numbers(DATA_SIZE);

    std::span<int> firstHalf(numbers.data(), numbers.size()/2);
    std::span<int> secondHalf(numbers.data() + numbers.size()/2, numbers.size()/2);

    // Initialization phase
    std::thread initThread1(initializeNumbers, firstHalf);
    std::thread initThread2(initializeNumbers, secondHalf);
    initThread1.join();
    initThread2.join();

    // Filtering phase
    std::thread filterThread1(filterNumbers, firstHalf);
    std::thread filterThread2(filterNumbers, secondHalf);
    filterThread1.join();
    filterThread2.join();

    // Analysis phase
    std::array<std::map<int, int>, 2> localHistograms;
    std::span<int> filteredHalf(filteredNumbers.data(), filteredNumbers.size() / 2);
    std::span<int> filteredSecondHalf(filteredNumbers.data() + filteredNumbers.size()/2, filteredNumbers.size()/2);
    std::thread analyzeThread1(analyzeNumbers, filteredHalf, std::ref(localHistograms[0]));
    std::thread analyzeThread2(analyzeNumbers, filteredSecondHalf, std::ref(localHistograms[1]));
    analyzeThread1.join();
    analyzeThread2.join();

    std::map<int, int> finalHistogram;
    for (auto&& hist : localHistograms)
        for (const auto& [num, count] : hist)
            finalHistogram[num] += count;

    // Output the final histogram
    for (const auto& [num, count] : finalHistogram)
        std::cout << std::format("{:>3}: {}\n", num, std::string(count, '*'));
}

Run it at Compiler Explorer

Spans simplify data handling in concurrent programming. Instead of making smaller data copies, managing full container access, or using iterators—which can be inefficient, complex, or verbose—std::span offers a lightweight, non-owning view of the data. This approach allows for clear and efficient segment passing of a container to different threads, enhancing both the readability and maintainability of the code."

Adding std::jthread  

One of the simplest ways to add some C++20 is to use std::jthread. This new type of thread object ensures that a thread is joined when the owner object goes out of the scope. This automatic resource management feature of std::jthread streamlines code and reduces the risk of concurrency-related bugs. Beyond this, std::jthread offers additional functionalities, such as accepting stop tokens for more controlled thread interruption, which we’ll delve into in a future discussion.

Our example already calls .join() in well-defined places, so we have to recreate this behavior using jthreads:

Writing

    // Initialization phase
    std::jthread initThread1(initializeNumbers, allSpan.first(DATA_SIZE/2));
    std::jthread initThread2(initializeNumbers, allSpan.last(DATA_SIZE/2));

    // Filtering phase
    std::jthread filterThread1(filterNumbers, allSpan.first(DATA_SIZE / 2));
    std::jthread filterThread2(filterNumbers, allSpan.last(DATA_SIZE / 2));

It is not the best idea… as the initThread1 won’t finish before filterThread1 might start! So we have to add braces:

// Initialization phase
{
    std::jthread initThread1(initializeNumbers, allSpan.first(DATA_SIZE/2));
    std::jthread initThread2(initializeNumbers, allSpan.last(DATA_SIZE/2));
}

// Filtering phase
{
    std::jthread filterThread1(filterNumbers, allSpan.first(DATA_SIZE / 2));
    std::jthread filterThread2(filterNumbers, allSpan.last(DATA_SIZE / 2));
}

Now the braces create separate scopes for each thread and when the scope ends all created threads will join and complete their jobs.

Experiment here @Compiler Explorer

Barriers  

We can now try a different approach. Instead of creating six threads, each responsible for a separate phase of the data processing workflow, why not streamline the process using fewer threads?

This is where we introduce the concept of worker threads that process data in steps, using a powerful C++20 feature called std::barrier. By adopting this strategy, we can effectively manage the workflow with just two threads, reducing the overhead associated with thread management and context switching.

std::barrier is a synchronization primitive that enables multiple threads to wait for each other to reach a certain point in their execution before any of them can proceed. This feature is handy in scenarios where different phases of a task need to be executed in a coordinated manner across multiple threads. In our case, we use std::barrier to synchronize the transition between different data processing phases.

Here’s how it works: After completing each phase (e.g., initialization, filtering, and histogram analysis), each worker thread calls phaseBarrier.arrive_and_wait(). This function call indicates that the thread has arrived at the barrier and must wait until all other participating threads have also reached this point. Once all threads have arrived, the barrier is ‘broken’, and all threads are allowed to proceed to the next phase of processing. This ensures that each phase of processing is completed by all threads before any of them move on to the next phase.

This mechanism significantly simplifies the coordination between threads. By using std::barrier, we avoid the complexity of manually managing multiple threads for each phase and eliminate the need for intricate signaling or locking mechanisms. It leads to cleaner, more maintainable, and efficient multi-threaded code, making it an ideal choice for our multi-phase data processing task."

Here’s the code for the processing function:

void processNumbers(std::span<int> numbers, std::barrier<>& phaseBarrier, std::map<int, int>& outHistogram) {
    // Phase 1: Initialize Numbers
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> distrib(0, MAX_NUM);

    for (auto& num : numbers) {
        num = distrib(gen);
    }
    // Signal completion of phase 1 and wait
    phaseBarrier.arrive_and_wait();

    // Phase 2: Additional processing...
    std::vector<int> localFiltered;
    std::ranges::copy_if(numbers, std::back_inserter(localFiltered),
        [](int v) {return v % 2 == 0; });

    for (int num : localFiltered)
        outHistogram[num]++;
}

And the main function:

int main() {
    std::vector<int> numbers(DATA_SIZE);

    std::barrier phaseBarrier(2);

    std::span<int> firstHalf(numbers.data(), numbers.size()/2);
    std::span<int> secondHalf(numbers.data() + numbers.size()/2, numbers.size()/2);

    std::array<std::map<int, int>, 2> localHistograms;
    {
    std::jthread workerThread1(processNumbers, firstHalf, std::ref(phaseBarrier), 
                               std::ref(localHistograms[0]));
    std::jthread workerThread2(processNumbers, secondHalf, std::ref(phaseBarrier), 
                               std::ref(localHistograms[1]));
    }

    std::map<int, int> finalHistogram;
    for (auto&& hist : localHistograms)
        for (const auto& [num, count] : hist)
            finalHistogram[num] += count;

    // Output the final histogram
    for (const auto& [num, count] : finalHistogram)
        std::cout << std::format("{:>3}: {}\n", num, std::string(count, '*'));
}

Run @Compiler Explorer

We use barrier once: it’s initialized with the count of the worker threads, and then after we initialize the numbers, we wait for other workers’ threads. Once it’s done, we can move to the next phase of filtering and generating local histograms.

Latches  

While we’ve focused on std::barrier for our multi-phase data processing, another concurrency feature introduced in C++20 is std::latch. A latch is a synchronization primitive but functions differently from a barrier. It acts as a single-use barrier with a countdown mechanism. A std::latch is initialized with a counter of type std::ptrdiff_t, representing the number of ‘arrivals’ needed to release all waiting threads.

Threads can signal their arrival at a latch by calling count_down(), which decrements the latch’s counter. When the counter reaches zero, all threads waiting on the latch (using the wait() method) are released and can continue execution. Unlike std::barrier, which can be reused after all threads have arrived, a latch is a one-time-use object. Once its counter reaches zero, it can’t be reset or reused.

In our case std::latch might be even more suitable and simpler than barrier.

void processNumbers(std::span<int> numbers, std::latch& phaseLatch, std::map<int, int>& outHistogram) {
    // Phase 1: Initialize Numbers
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> distrib(0, MAX_NUM);

    for (auto& num : numbers) {
        num = distrib(gen);
    }
    // Signal completion of phase 1 and wait
    phaseLatch.arrive_and_wait();

    // Phase 2: Additional processing...
    std::vector<int> localFiltered;
    std::ranges::copy_if(numbers, std::back_inserter(localFiltered),
        [](int v) {return v % 2 == 0; });

    for (int num : localFiltered)
        outHistogram[num]++;
}

int main() {
    std::vector<int> numbers(DATA_SIZE);

    std::latch phaseLatch(2);

    std::span<int> firstHalf(numbers.data(), numbers.size()/2);
    std::span<int> secondHalf(numbers.data() + numbers.size()/2, numbers.size()/2);

    std::array<std::map<int, int>, 2> localHistograms;
    {
    std::jthread workerThread1(processNumbers, firstHalf, std::ref(phaseLatch), 
                               std::ref(localHistograms[0]));
    std::jthread workerThread2(processNumbers, secondHalf, std::ref(phaseLatch), 
                               std::ref(localHistograms[1]));
    }

    std::map<int, int> finalHistogram;
    for (auto&& hist : localHistograms)
        for (const auto& [num, count] : hist)
            finalHistogram[num] += count;

    // Output the final histogram
    for (const auto& [num, count] : finalHistogram)
        std::cout << std::format("{:>3}: {}\n", num, std::string(count, '*'));
}

Run @Compiler Explorer

It’s just a start  

Ok, it was fun!

We went through a series of steps and created a simple processing application: it initializes numbers, and later, we used threads to process that data. We explored jthreads that automatically join, and then we throw some barriers and latches to coordinate execution phases between threads.

But that’s just a start. The examples used only two threads, but why now check std::thread::hardware_concurrency() and create as many worker threads as your system has CPUs available? Then, we had only random numbers, but we could read data from files or networks. I’ll leave that to you to experiment.

What’s more, C++20 doesn’t just add jthread, barrier and latch, here’s a list of more cool changes related to concurrency in that C++ version:

  • std::atomic_ref: Allows atomic operations on non-atomic objects.
  • std::atomic<std::shared_ptr<T>> and std::atomic<std::weak_ptr<T>>: Specializations for atomic operations on shared and weak pointers.
  • std::counting_semaphore and std::binary_semaphore: New semaphore types for controlling access to shared resources.
  • std::stop_token, std::stop_source, and std::stop_callback: Mechanisms for cooperative thread interruption.

See more @CppReference

Back to you

  • Have you used C++20 multithreading/concurrency additions?
  • What do you use to process data using worker threads?
  • What else would you mprove in my code?

Join the discussion below in the comments.