浅谈 C++ 如何实现并发中的 Barrier(屏障)。
什么是 Barrier
Barrier 是一种线程间的同步机制,用于在多个线程执行过程中,所有线程都执行到某一点后,才允许线程继续往下走。这样可以保证线程的执行顺序和结果的正确性。
常见的应用场景包括:并行计算(等待所有线程都计算完毕后合并结果)、多线程写入(等待所有线程都写入结束再合并文件)、游戏引擎中限制所有玩家必须等待约定的时间才开始游戏、高性能计算中同多个算法进行通信等。
具体实现上,当一个线程到达 Barrier 时,它将被阻塞,直至所有其它线程都到达 Barrier,然后同时被释放,继续执行后续任务。
C++ 如何实现 Barrier
C++ 中的屏障 Barrier 是由 C++11 标准引入的。它定义在头文件 <barrier>
中。在类 std::barrier
中,我们可以设置需要等待的屏障的总数,以及每个屏障的执行操作。该类的定义如下:
class barrier {
public:
explicit barrier(std::size_t num_threads) noexcept(noexcept(std::size_t(std::declval<num_threads>())));
template<class F> // F: void()
explicit barrier(std::size_t num_threads, F); // 指定每次屏障要执行的回调函数
barrier(const barrier &) = delete;
barrier(barrier &&) = delete;
barrier & operator=(const barrier &) = delete;
barrier & operator=(barrier &&) = delete;
~barrier();
void wait(); // 每个线程调用 wait() 进入屏障等待
};
其中:
- num_threads
表示需要等待的屏障的总数;
- F
表示每个屏障要执行的回调函数,是一个无参无返回值的函数对象。
在这个类中,我们可以使用 std::barrier
实现 C++ 中的 Barrier 。
下面是一个示例:在 C++20 中多线程排序中使用屏障。实现多个线程同时排序不同片段,等待排序结束后再合并结果。其中,屏障的大小与线程数量相等。
#include <algorithm>
#include <barrier>
#include <chrono>
#include <iostream>
#include <vector>
template<typename RandomIt>
void parallel_sort(RandomIt first, RandomIt last,
std::size_t num_threads = std::thread::hardware_concurrency()) {
auto sz = std::distance(first, last);
if (sz <= 1) return;
std::vector<std::thread> threads(num_threads - 1);
std::vector<typename RandomIt::value_type> buffers[num_threads];
std::vector<typename RandomIt::value_type> *buf[2] = {&buffers[0], &buffers[1]};
std::size_t front = 0, back = 1;
auto per_thread = sz / num_threads;
auto left_over = sz % num_threads;
std::vector<std::barrier> barriers(num_threads, num_threads);
auto thread_func = [&](std::size_t id) {
auto begin = first + id * per_thread;
auto end = begin + per_thread;
if (id == num_threads - 1) end = last;
std::sort(begin, end);
auto local_size = std::distance(begin, end);
buf[front] = &buffers[id];
buf[back] = &buffers[(id + 1) % num_threads];
buf[front]->resize(local_size);
std::move(begin, end, buf[front]->begin());
std::size_t level = 0;
std::size_t level_size = 2;
for (std::size_t i = 0; i < barriers.size() - 1; ++i) {
level_size <<= 1;
if ((id & level) == 0 && ((id | level_size) < barriers.size())) {
barriers[id].wait();
std::inplace_merge(buf[front]->begin(), buf[front]->end(),
buf[back]->begin(), buf[back]->end());
std::swap(buf[front], buf[back]);
}
level_size -= (level += 1);
}
if (id != 0) barriers[id].wait();
std::move(buf[front]->begin(), buf[front]->end(), begin);
};
for (std::size_t i = 0; i < num_threads - 1; ++i) {
threads[i] = std::thread(thread_func, i);
}
thread_func(num_threads - 1);
for (std::size_t i = 0; i < num_threads - 1; ++i) {
threads[i].join();
}
buf[front] = &buffers[0];
buf[back] = &buffers[1];
for (std::size_t i = 1; i < num_threads; ++i) {
std::inplace_merge(first, first + per_thread * i,
first + per_thread * (i + 1), last);
buf[front]->resize(per_thread * (i + 1));
std::move(first, first + per_thread * (i + 1), buf[front]->begin());
std::swap(buf[front], buf[back]);
}
std::inplace_merge(first, first + per_thread * left_over,
last, last);
}
还可以实现一个简单的例子:在 C++17 中使用两个线程实现一个简单的屏障。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
class Barrier {
public:
explicit Barrier(std::size_t num_threads) : num_threads_{num_threads} {}
void wait() {
std::unique_lock<std::mutex> lock(mu_);
++num_arrived_;
if (num_arrived_ == num_threads_) {
num_arrived_ = 0;
cv_.notify_all();
} else {
cv_.wait(lock, [this] { return num_arrived_ == 0; });
}
}
private:
std::mutex mu_;
std::condition_variable cv_;
std::size_t num_threads_;
std::size_t num_arrived_ = 0;
};
int main() {
const std::size_t num_threads = 2;
Barrier barrier(num_threads);
std::thread t1([&] {
std::cout << "Thread 1 arrived." << std::endl;
barrier.wait();
std::cout << "Thread 1 continues." << std::endl;
});
std::thread t2([&] {
std::cout << "Thread 2 arrived." << std::endl;
barrier.wait();
std::cout << "Thread 2 continues." << std::endl;
});
t1.join();
t2.join();
return 0;
}
以上两个示例可以在 C++17 中执行。一个示例使用 C++20 中的屏障实现多线程排序,另一个使用 C++17 编写的一个简单的屏障,其中有两个线程。
总结
以上就是关于 C++ 中如何实现并发中的 Barrier 的详细讲解,包含了 std::barrier
的使用方法和两个示例。建议在实际项目中使用 std::barrier
实现并发 Barrier,这将提高并发程序的效率和稳定性。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:浅谈c++如何实现并发中的Barrier - Python技术站