Java多线程之同步工具类CyclicBarrier
什么是CyclicBarrier
CyclicBarrier是java.util.concurrent包下的一个同步工具类。它能够使线程等待至指定数量的线程都达到某个状态后再一起执行。
CyclicBarrier就像一个障碍物,当每个线程到达这个障碍物时,就必须停下来等待其他线程也到达障碍物,当所有线程都到达后,障碍物才会打开,让所有线程继续执行下去。
CyclicBarrier的构造方法
CyclicBarrier有多个构造方法,其中最常用的一个有两个参数:
public CyclicBarrier(int parties, Runnable barrierAction) { ... }
parties
: 表示参与线程的数量barrierAction
: 当线程到达障碍物时,优先执行这个线程任务,可以为null
如果不传入barrierAction
参数,线程到达障碍物后就直接继续执行。
CyclicBarrier的使用
使用CyclicBarrier主要分为两个步骤:初始化和等待。
初始化CyclicBarrier
在使用CyclicBarrier时,我们需要先初始化一个CyclicBarrier对象。以下是初始化CyclicBarrier的示例:
int count = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(count);
以上代码创建了一个CyclicBarrier实例,该实例要求所有线程都到达3个障碍物,才能继续往下执行。
等待线程到达CyclicBarrier
在初始化完成后,所有线程都需要等待其他线程到达CyclicBarrier,当所有线程都到达时,CyclicBarrier将被打开,所有线程都可以继续往下执行。
Thread threadA = new Thread(() -> {
System.out.println("Thread A is waiting at the barrier.");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Thread A has passed the barrier.");
});
Thread threadB = new Thread(() -> {
System.out.println("Thread B is waiting at the barrier.");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Thread B has passed the barrier.");
});
Thread threadC = new Thread(() -> {
System.out.println("Thread C is waiting at the barrier.");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("Thread C has passed the barrier.");
});
threadA.start();
threadB.start();
threadC.start();
以上代码创建了3个线程,并同时将它们启动,CyclicBarrier将阻塞所有线程,直到所有线程都到达障碍物。当所有线程都到达障碍物后,CyclicBarrier将打开,所有线程都可以继续执行下面的代码。
示例说明
示例1:多线程计算数组和
让多个线程同时计算一个大数组的和,最后将所有结果相加。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CalculateArraySumDemo {
private static final int THREAD_COUNT = 5; // 线程数
private static final int ARRAY_LENGTH = 100; // 数组长度
public static void main(String[] args) {
int[] array = new int[ARRAY_LENGTH];
for (int i = 0; i < ARRAY_LENGTH; i++) {
array[i] = i + 1; // 初始化数组
}
CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_COUNT, () -> {
int sum = 0;
for (int i = 0; i < THREAD_COUNT; i++) {
sum += result[i]; // 将每个线程的结果相加
}
System.out.println("Array sum: " + sum);
});
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < THREAD_COUNT; i++) {
executor.execute(new CalculateTask(array, cyclicBarrier));
}
executor.shutdown();
}
static int[] result = new int[THREAD_COUNT];
static class CalculateTask implements Runnable {
private final int[] array;
private final CyclicBarrier cyclicBarrier;
public CalculateTask(int[] array, CyclicBarrier cyclicBarrier) {
this.array = array;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
int startIndex = result.length * (int) Thread.currentThread().getId();
int endIndex = startIndex + result.length;
for (int i = startIndex; i < endIndex; i++) {
result[(int) Thread.currentThread().getId()] += array[i];
}
// 等待所有计算子任务完成
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
以上代码中,我们使用一个CyclicBarrier来实现线程间的同步。当每个线程计算完它们要求计算的数据后,会调用cyclicBarrier.await()方法,表示当前线程已经完成了它的任务,并且要等待其他线程也完成任务。
当所有子线程都计算完后,CyclicBarrier将触发barrierAction回调,在其回调中,将所有线程的计算结果相加,得到数组和,并打印到控制台上。
示例2:模拟赛跑
模拟N个选手比赛,最后公布排名。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class RaceDemo {
private static final int THREAD_COUNT = 5; // 选手数量
private static final int ROUND_COUNT = 3; // 跑三圈
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(new Player(cyclicBarrier)).start();
}
}
static class Player implements Runnable {
private final CyclicBarrier cyclicBarrier;
public Player(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
int round = 0;
while (round < ROUND_COUNT) {
try {
System.out.println("Player " + Thread.currentThread().getId() + " has run " + (round + 1) + " round.");
Thread.sleep((long) (Math.random() * 5000)); // 跑步随机时间
cyclicBarrier.await(); // 到达终点,开始等待
round++;
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
System.out.println("Player " + Thread.currentThread().getId() + " has arrived at the finish line.");
}
}
}
以上代码中,我们创建了5个Player线程模拟赛跑,这些线程共享一个CyclicBarrier对象。当一名选手到达终点,他将调用CyclicBarrier
的await()
方法,表明自己已经到达终点,并等待其他选手也到达终点,当所有选手都到达终点后,CyclicBarrier将调用CyclicBarrier的barrierAction回调,公布比赛排名。
总结
CyclicBarrier是一种多线程同步工具,它可以使一组线程到达某个同步点后,等待所有线程都到达同步点后再一起继续执行。我们可以通过CyclicBarrier来编写高效的并发程序,利用多核计算机的计算能力来提高程序的性能和吞吐量。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java多线程之同步工具类CyclicBarrier - Python技术站