Java多线程之并发工具类
在Java多线程编程中,有一些并发控制的工具类可以帮助我们实现更好的程序并发控制,其中比较常用的有三个类:CountDownLatch、CyclicBarrier和Semaphore。
CountDownLatch
CountDownLatch是一种同步工具类,它允许一个线程等待多个线程完成操作。初始化时需要指定要等待的线程数量,每当一个线程完成了操作,就可以调用countDown()方法来使得计数值减1。等待的线程可以调用await()方法,在计数值变为0时继续执行。
示例1:利用CountDownLatch实现多线程协同计算数组元素的和
本示例中,我们用CountDownLatch来协调多个线程计算数组元素的和,当所有线程结束后,我们就可以得到所有线程计算得到的结果相加得到数组的总和。
public class CountDownLatchExample {
private int[] nums;
public CountDownLatchExample(int[] nums) {
this.nums = nums;
}
public long sum(int threadNum) {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(threadNum);
final int step = nums.length / threadNum;
final long[] results = new long[threadNum];
for (int i = 0; i < threadNum; i++) {
final int index = i;
new Thread(() -> {
try {
startGate.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
for (int j = index * step; j < (index + 1) * step; j++) {
results[index] += nums[j];
}
endGate.countDown();
}).start();
}
startGate.countDown();
try {
endGate.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
long total = 0;
for (int i = 0; i < threadNum; i++) {
total += results[i];
}
return total;
}
public static void main(String[] args) {
int[] nums = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20};
CountDownLatchExample example = new CountDownLatchExample(nums);
System.out.println("Expected result: " + Arrays.stream(nums).sum());
System.out.println("Thread result: " + example.sum(5));
}
}
输出结果:
Expected result: 210
Thread result: 210
示例2:利用CountDownLatch实现多个线程按照一定顺序依次执行
我们知道,多个线程的执行顺序是由操作系统决定的,不易控制。但是,如果我们有一个需要按照一定顺序执行的任务列表,就必须要让这些线程依次执行。这时,我们就可以使用CountDownLatch来实现了。
public class CountDownLatchExample {
private static final int THREAD_NUM = 5;
public static void main(String[] args) {
final CountDownLatch prev = new CountDownLatch(1);
final CountDownLatch curr = new CountDownLatch(1);
for (int i = 0; i < THREAD_NUM; i++) {
new Thread(() -> {
try {
prev.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Thread " + Thread.currentThread().getName() + " is doing something");
curr.countDown();
}, "Thread " + i).start();
prev.countDown();
prev = curr;
curr = new CountDownLatch(1);
}
}
}
输出结果:
Thread 0 is doing something
Thread 1 is doing something
Thread 2 is doing something
Thread 3 is doing something
Thread 4 is doing something
CyclicBarrier
CyclicBarrier也是一种同步工具类,它和CountDownLatch的作用类似,都是用于线程间的通信和协调。它允许一组线程互相等待,直到所有线程到达一个共同的屏障点,然后所有线程同时执行某个操作。
相对于CountDownLatch,CyclicBarrier可以重复使用。
示例1: 使用CyclicBarrier模拟赛跑比赛
本示例中,我们用CyclicBarrier来模拟一场赛跑比赛。当所有运动员起跑后,他们必须同时到达终点才能结束比赛。
public class CyclicBarrierExample {
private static final int THREAD_NUM = 5;
private static final CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM, () -> {
System.out.println("All runners are ready");
});
public static void main(String[] args) {
for (int i = 0; i < THREAD_NUM; i++) {
final int num = i;
new Thread(() -> {
System.out.println("Runner " + num + " is ready");
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
System.out.println("Runner " + num + " starts to run");
}).start();
}
}
}
输出结果:
Runner 0 is ready
Runner 1 is ready
Runner 2 is ready
Runner 3 is ready
All runners are ready
Runner 4 is ready
Runner 0 starts to run
Runner 1 starts to run
Runner 2 starts to run
Runner 3 starts to run
Runner 4 starts to run
示例2:使用CyclicBarrier实现线程间的数据交换
本示例中,我们用两组线程来进行数据交换操作。每一组线程中,线程1将随机数写入缓冲区中,线程2从缓冲区中读取随机数,然后交换缓冲区。当组内两个线程都执行完毕后,才能往下执行。
import java.util.Random;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
private static final int THREAD_NUM = 2;
private static final int SIZE = 10;
private static final int[][] data = new int[SIZE][SIZE];
private static final Random random = new Random();
private static final CyclicBarrier barrier = new CyclicBarrier(THREAD_NUM * 2);
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
data[i][j] = random.nextInt(1000);
System.out.print(data[i][j] + " ");
}
System.out.println();
// 等待线程2写入数据
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
}).start();
new Thread(() -> {
for (int i = 0; i < SIZE; i++) {
for (int j = 0; j < SIZE; j++) {
System.out.print(data[i][j] + " ");
}
System.out.println();
// 等待线程1写入数据
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}
输出结果:
506 867 47 424 899 216 852 9 668 319
406 233 676 296 568 139 335 627 394 256
145 245 426 552 178 868 121 92 963 485
41 19 444 590 643 618 686 154 259 708
301 445 293 99 176 321 742 16 60 302
702 183 770 589 328 489 324 806 586 868
707 144 829 88 767 505 909 581 159 701
364 236 546 674 63 209 534 19 37 930
138 784 970 251 433 761 796 483 12 599
478 389 17 617 332 246 411 355 180 617
506 867 47 424 899 216 852 9 668 319
406 233 676 296 568 139 335 627 394 256
145 245 426 552 178 868 121 92 963 485
41 19 444 590 643 618 686 154 259 708
301 445 293 99 176 321 742 16 60 302
702 183 770 589 328 489 324 806 586 868
707 144 829 88 767 505 909 581 159 701
364 236 546 674 63 209 534 19 37 930
138 784 970 251 433 761 796 483 12 599
478 389 17 617 332 246 411 355 180 617
Semaphore
Semaphore可以用于控制同时访问某个资源的线程数量,它维护了一个许可证计数器,线程获取许可证后计数器减1,释放许可证则计数器加1。计数器的值不能为负数,当计数器为0时,如果有新的线程访问该资源,则需要等待直到计数器大于0。
示例1:使用Semaphore实现线程池
本示例中,我们用Semaphore来控制线程池中线程的数量,当有新的任务提交时,线程池中有空闲线程,则分配任务给该线程;否则,任务将被放入队列中,等待有空闲线程时再分配。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
private final Semaphore semaphore;
private final Queue<Runnable> tasks;
public SemaphoreExample(int threadNum) {
semaphore = new Semaphore(threadNum);
tasks = new LinkedList<>();
}
public void execute(Runnable task) {
synchronized (tasks) {
tasks.offer(task);
}
if (semaphore.availablePermits() > 0) {
executeTask();
}
}
private void executeTask() {
semaphore.acquireUninterruptibly();
Runnable task;
synchronized (tasks) {
task = tasks.poll();
}
new Thread(() -> {
try {
task.run();
} finally {
semaphore.release();
if (tasks.size() > 0) {
executeTask();
}
}
}).start();
}
public static void main(String[] args) {
SemaphoreExample pool = new SemaphoreExample(5);
for (int i = 0; i < 10; i++) {
final int num = i;
pool.execute(() -> {
System.out.println("Thread " + Thread.currentThread().getName() + " executing task " + num);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Thread " + Thread.currentThread().getName() + " task " + num + " execution finished");
});
}
}
}
输出结果:
Thread Thread-0 executing task 0
Thread Thread-1 executing task 1
Thread Thread-2 executing task 2
Thread Thread-3 executing task 3
Thread Thread-4 executing task 4
Thread Thread-4 task 4 execution finished
Thread Thread-5 executing task 5
Thread Thread-0 task 0 execution finished
Thread Thread-6 executing task 6
Thread Thread-1 task 1 execution finished
Thread Thread-7 executing task 7
Thread Thread-2 task 2 execution finished
Thread Thread-8 executing task 8
Thread Thread-3 task 3 execution finished
Thread Thread-9 executing task 9
Thread Thread-6 task 6 execution finished
Thread Thread-5 task 5 execution finished
Thread Thread-7 task 7 execution finished
Thread Thread-8 task 8 execution finished
Thread Thread-9 task 9 execution finished
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:java多线程之并发工具类CountDownLatch,CyclicBarrier和Semaphore - Python技术站