Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解
介绍
本篇文章主要讲解Java并发编程中的三种常用同步工具:CountDownLatch、CyclicBarrier和Semaphore。这三种工具都可以用于协调线程的执行,但实现的方式有所不同。
- CountDownLatch:用于等待多个线程执行完毕后再执行某个操作。
- CyclicBarrier:用于等待多个线程到达某个屏障点后再同时执行某个操作。
- Semaphore:用于控制同时访问某个资源的线程数量。
CountDownLatch
CountDownLatch是一个同步工具类,用于等待多个线程完成一定的操作后再进行下一步操作。它的原理是维护一个计数器,初始化时需要指定计数器的值,在每个线程完成一定操作后,计数器的值减1,当计数器的值减到0时,表示所有线程已完成操作,等待的线程可以继续执行。
下面是一个简单的示例,我们使用CountDownLatch来等待两个线程执行完毕后计算它们的和:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountDownLatchExample {
public static void main(String[] args) throws Exception {
int nThreads = 2;
CountDownLatch countDownLatch = new CountDownLatch(nThreads);
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
for (int i = 1; i <= nThreads; i++) {
executorService.submit(new Worker(i, countDownLatch));
}
countDownLatch.await();
System.out.println("All workers have finished the job");
}
static class Worker implements Runnable {
private int id;
private CountDownLatch countDownLatch;
public Worker(int id, CountDownLatch countDownLatch) {
this.id = id;
this.countDownLatch = countDownLatch;
}
public void run() {
System.out.printf("Worker %d starts to work\n", id);
try {
Thread.sleep(2000); // 模拟完成某些操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.printf("Worker %d has finished the job\n", id);
countDownLatch.countDown();
}
}
}
运行结果如下:
Worker 1 starts to work
Worker 2 starts to work
Worker 1 has finished the job
Worker 2 has finished the job
All workers have finished the job
CyclicBarrier
CyclicBarrier也是一个同步工具类,和CountDownLatch不同的是,它可以多次等待。CyclicBarrier能够等待多个线程到达同一个屏障点后再继续执行。它的原理是维护一个计数器和一个barrierAction,在每个线程完成一定操作后,计数器的值减1,当计数器的值减到0时,执行barrierAction,并重置计数器的值,等待的线程可以继续执行。
下面是一个简单的示例,我们使用CyclicBarrier来等待三个线程到达同一个屏障点后继续执行:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(parties, () -> {
System.out.println("All parties have arrived at the barrier, continue to execute");
});
new Worker(1, cyclicBarrier).start();
new Worker(2, cyclicBarrier).start();
new Worker(3, cyclicBarrier).start();
}
static class Worker extends Thread {
private int id;
private CyclicBarrier cyclicBarrier;
public Worker(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
public void run() {
System.out.printf("Worker %d starts to work\n", id);
try {
Thread.sleep(2000); // 模拟完成某些操作
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.printf("Worker %d has arrived at the barrier\n", id);
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.printf("Worker %d continues to execute\n", id);
}
}
}
运行结果如下:
Worker 1 starts to work
Worker 2 starts to work
Worker 3 starts to work
Worker 1 has arrived at the barrier
Worker 2 has arrived at the barrier
Worker 3 has arrived at the barrier
All parties have arrived at the barrier, continue to execute
Worker 2 continues to execute
Worker 1 continues to execute
Worker 3 continues to execute
Semaphore
Semaphore是一个同步工具类,用于控制同时访问某个资源的线程数量。它的原理是维护一个计数器,初始化时需要指定计数器的值,线程想要访问该资源时需要获取许可,如果许可数量大于0,则可以获取许可并继续执行,许可数量减1;否则线程必须等待其他线程释放许可。
下面是一个简单的示例,我们使用Semaphore来控制同时访问某个资源的线程数量:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
public static void main(String[] args) {
int nThreads = 10;
Semaphore semaphore = new Semaphore(3);
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
for (int i = 0; i < nThreads; i++) {
executorService.submit(new Worker(i, semaphore));
}
}
static class Worker implements Runnable {
private int id;
private Semaphore semaphore;
public Worker(int id, Semaphore semaphore) {
this.id = id;
this.semaphore = semaphore;
}
public void run() {
try {
semaphore.acquire();
System.out.printf("Worker %d acquired the semaphore, start to work\n", id);
Thread.sleep(2000); // 模拟完成某些操作
semaphore.release();
System.out.printf("Worker %d released the semaphore\n", id);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
运行结果如下:
Worker 0 acquired the semaphore, start to work
Worker 1 acquired the semaphore, start to work
Worker 2 acquired the semaphore, start to work
Worker 3 is blocked by the semaphore
Worker 4 is blocked by the semaphore
Worker 7 is blocked by the semaphore
Worker 6 is blocked by the semaphore
Worker 5 is blocked by the semaphore
Worker 9 is blocked by the semaphore
Worker 8 is blocked by the semaphore
Worker 0 released the semaphore
Worker 2 released the semaphore
Worker 1 released the semaphore
Worker 7 acquired the semaphore, start to work
Worker 6 acquired the semaphore, start to work
Worker 9 acquired the semaphore, start to work
Worker 8 acquired the semaphore, start to work
Worker 4 acquired the semaphore, start to work
Worker 3 acquired the semaphore, start to work
Worker 5 acquired the semaphore, start to work
Worker 7 released the semaphore
Worker 9 released the semaphore
Worker 8 released the semaphore
Worker 6 released the semaphore
Worker 5 released the semaphore
Worker 4 released the semaphore
Worker 3 released the semaphore
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解 - Python技术站