浅谈c++如何实现并发中的Barrier

浅谈 C++ 如何实现并发中的 Barrier(屏障)。

什么是 Barrier

Barrier 是一种线程间的同步机制,用于在多个线程执行过程中,所有线程都执行到某一点后,才允许线程继续往下走。这样可以保证线程的执行顺序和结果的正确性。

常见的应用场景包括:并行计算(等待所有线程都计算完毕后合并结果)、多线程写入(等待所有线程都写入结束再合并文件)、游戏引擎中限制所有玩家必须等待约定的时间才开始游戏、高性能计算中同多个算法进行通信等。

具体实现上,当一个线程到达 Barrier 时,它将被阻塞,直至所有其它线程都到达 Barrier,然后同时被释放,继续执行后续任务。

C++ 如何实现 Barrier

C++ 中的屏障 Barrier 是由 C++11 标准引入的。它定义在头文件 <barrier> 中。在类 std::barrier 中,我们可以设置需要等待的屏障的总数,以及每个屏障的执行操作。该类的定义如下:

class barrier {
public:
    explicit barrier(std::size_t num_threads) noexcept(noexcept(std::size_t(std::declval<num_threads>())));

    template<class F> // F: void()
    explicit barrier(std::size_t num_threads, F);  // 指定每次屏障要执行的回调函数

    barrier(const barrier &) = delete;
    barrier(barrier &&) = delete;

    barrier & operator=(const barrier &) = delete;
    barrier & operator=(barrier &&) = delete;

    ~barrier();

    void wait(); // 每个线程调用 wait() 进入屏障等待
};

其中:
- num_threads 表示需要等待的屏障的总数;
- F 表示每个屏障要执行的回调函数,是一个无参无返回值的函数对象。

在这个类中,我们可以使用 std::barrier 实现 C++ 中的 Barrier 。

下面是一个示例:在 C++20 中多线程排序中使用屏障。实现多个线程同时排序不同片段,等待排序结束后再合并结果。其中,屏障的大小与线程数量相等。

#include <algorithm>
#include <barrier>
#include <chrono>
#include <iostream>
#include <vector>

template<typename RandomIt>
void parallel_sort(RandomIt first, RandomIt last,
                   std::size_t num_threads = std::thread::hardware_concurrency()) {
    auto sz = std::distance(first, last);
    if (sz <= 1) return;

    std::vector<std::thread> threads(num_threads - 1);

    std::vector<typename RandomIt::value_type> buffers[num_threads];
    std::vector<typename RandomIt::value_type> *buf[2] = {&buffers[0], &buffers[1]};
    std::size_t front = 0, back = 1;

    auto per_thread = sz / num_threads;
    auto left_over = sz % num_threads;

    std::vector<std::barrier> barriers(num_threads, num_threads);

    auto thread_func = [&](std::size_t id) {
        auto begin = first + id * per_thread;
        auto end = begin + per_thread;
        if (id == num_threads - 1) end = last;

        std::sort(begin, end);

        auto local_size = std::distance(begin, end);
        buf[front] = &buffers[id];
        buf[back] = &buffers[(id + 1) % num_threads];

        buf[front]->resize(local_size);

        std::move(begin, end, buf[front]->begin());

        std::size_t level = 0;
        std::size_t level_size = 2;

        for (std::size_t i = 0; i < barriers.size() - 1; ++i) {
            level_size <<= 1;
            if ((id & level) == 0 && ((id | level_size) < barriers.size())) {
                barriers[id].wait();
                std::inplace_merge(buf[front]->begin(), buf[front]->end(),
                                   buf[back]->begin(), buf[back]->end());
                std::swap(buf[front], buf[back]);
            }

            level_size -= (level += 1);
        }

        if (id != 0) barriers[id].wait();

        std::move(buf[front]->begin(), buf[front]->end(), begin);
    };

    for (std::size_t i = 0; i < num_threads - 1; ++i) {
        threads[i] = std::thread(thread_func, i);
    }

    thread_func(num_threads - 1);

    for (std::size_t i = 0; i < num_threads - 1; ++i) {
        threads[i].join();
    }

    buf[front] = &buffers[0];
    buf[back] = &buffers[1];

    for (std::size_t i = 1; i < num_threads; ++i) {
        std::inplace_merge(first, first + per_thread * i,
                           first + per_thread * (i + 1), last);
        buf[front]->resize(per_thread * (i + 1));
        std::move(first, first + per_thread * (i + 1), buf[front]->begin());
        std::swap(buf[front], buf[back]);
    }

    std::inplace_merge(first, first + per_thread * left_over,
                       last, last);
}

还可以实现一个简单的例子:在 C++17 中使用两个线程实现一个简单的屏障。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

class Barrier {
public:
    explicit Barrier(std::size_t num_threads) : num_threads_{num_threads} {}

    void wait() {
        std::unique_lock<std::mutex> lock(mu_);
        ++num_arrived_;
        if (num_arrived_ == num_threads_) {
            num_arrived_ = 0;
            cv_.notify_all();
        } else {
            cv_.wait(lock, [this] { return num_arrived_ == 0; });
        }
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::size_t num_threads_;
    std::size_t num_arrived_ = 0;
};

int main() {
    const std::size_t num_threads = 2;
    Barrier barrier(num_threads);

    std::thread t1([&] {
        std::cout << "Thread 1 arrived." << std::endl;
        barrier.wait();
        std::cout << "Thread 1 continues." << std::endl;
    });

    std::thread t2([&] {
        std::cout << "Thread 2 arrived." << std::endl;
        barrier.wait();
        std::cout << "Thread 2 continues." << std::endl;
    });

    t1.join();
    t2.join();

    return 0;
}

以上两个示例可以在 C++17 中执行。一个示例使用 C++20 中的屏障实现多线程排序,另一个使用 C++17 编写的一个简单的屏障,其中有两个线程。

总结

以上就是关于 C++ 中如何实现并发中的 Barrier 的详细讲解,包含了 std::barrier 的使用方法和两个示例。建议在实际项目中使用 std::barrier 实现并发 Barrier,这将提高并发程序的效率和稳定性。

阅读剩余 74%

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:浅谈c++如何实现并发中的Barrier - Python技术站

(0)
上一篇 2023年5月16日
下一篇 2023年5月16日

相关文章

  • Python实现的HTTP并发测试完整示例

    这里是关于 “Python实现的HTTP并发测试完整示例” 的完整攻略。 前言 在对一个Web服务器进行压力测试时,一个重要的方面是能够模拟多个并发请求以测试其性能。在Python中,我们可以使用多种库来实现HTTP并发测试。本文将涵盖使用concurrent.futures和asyncio库实现HTTP并发测试的两个示例。 易于使用的concurrent.…

    多线程 2023年5月16日
    00
  • JAVA多线程中join()方法的使用方法

    JAVA多线程中join()方法的使用方法 什么是join()方法 在Java中,通过继承Thread类或实现Runnable接口来创建线程。当主线程想等待某个子线程执行完毕后再进行下一步动作时,可以使用join()方法。 join()方法的作用是:让当前线程等待调用join()方法的线程执行完毕。 join()方法的基本用法 join()方法的基本语法如下…

    多线程 2023年5月16日
    00
  • Linux多线程编程(一)

    Linux多线程编程(一) 前言 Linux是一个多线程的操作系统,可以支持多个并发执行的程序。多线程编程可以充分利用多核CPU,在并发执行的情况下提高程序的性能,同时也可以编写出体验更加流畅、响应更快的应用程序。 本文将介绍Linux多线程编程,并提供两个示例说明,分别演示线程的创建和同步。 线程创建 在Linux中,线程的创建依赖于pthread库,因此…

    多线程 2023年5月17日
    00
  • Java并发编程总结——慎用CAS详解

    Java并发编程总结——慎用CAS详解 什么是CAS? CAS是英文单词“Compare and Swap”的缩写,中文意思是比较并交换。是一种常见的并发编程技术,在Java并发编程中也得到了广泛的应用。 CAS技术主要通过保证内存操作的原子性以避免多线程之间的竞争和冲突。CAS操作的主要思路是先比较内存中的值是否与期望值相同,如果相同,则将新值写入内存;否…

    多线程 2023年5月17日
    00
  • DB2和 Oracle的并发控制(锁)的比较

    DB2和Oracle的并发控制(锁)的比较 什么是并发控制(锁)? 并发控制是指在多个用户同时对数据库进行读写操作时,确保这些操作能够顺利执行而不产生冲突的一种技术。一般来说,当多个用户同时对数据库进行读写时,会产生资源竞争和数据一致性问题,而锁技术可以帮助解决这些问题。 DB2与Oracle的并发控制锁机制 DB2的并发控制锁机制 DB2支持多种类型的锁,…

    多线程 2023年5月17日
    00
  • C# List 并发丢数据问题原因及解决方案

    C# List 并发丢数据问题原因及解决方案 问题描述 在多线程环境下,使用C#的List时,会存在添加元素丢失、重复、越界等问题,导致程序出现异常或不可预料的结果。这是由于List本身并不是线程安全的集合类,多个线程同时对其进行写操作时,会导致竞争条件,从而出现数据异常。 原因分析 List是一个基于数组的集合类型,当多个线程同时对其进行写操作时,可能会导…

    多线程 2023年5月17日
    00
  • Java多线程之Interrupt中断线程详解

    Java多线程之Interrupt中断线程详解 在使用Java进行多线程编程时,经常需要控制线程的执行行为,比如暂停、终止、恢复线程等。这时我们就需要一个中断机制来实现我们的控制需求。Java中,通过Interrupt中断机制来实现对线程的中断控制。 中断线程的基本使用方法: 要中断一个Java线程,可以使用线程对象的interrupt()方法,其语法为: …

    多线程 2023年5月17日
    00
  • Java使用5个线程计算数组之和

    针对“Java使用5个线程计算数组之和”这一需求,我可以提供如下的完整攻略: 1. 准备工作 首先,需要准备一个长整型类型的数组,用来保存需要进行求和计算的数据。可以使用如下代码来创建一个长度为1000的数组: long[] data = new long[1000]; // TODO:在这里添加数据到数组中 接着,可以创建5个线程来并行计算数组的求和。线程…

    多线程 2023年5月16日
    00
合作推广
合作推广
分享本页
返回顶部