一文详解Java闭锁和栅栏的实现
1. 什么是闭锁和栅栏
在并发编程中,有时需要等待某个操作的完成,或者协调多个线程的执行。Java提供了闭锁(Latch)和栅栏(Barrier)两个机制来实现这种协调。
闭锁是一种同步工具,可以使线程等待一个或多个线程的操作完成。闭锁一般会在某个线程等待另一个线程完成任务时使用。
栅栏是一种同步工具,它允许一组线程在某个点上进行相互等待,直到所有线程都到达此点,然后再一起继续执行。栅栏通常用于协调多个线程相互等待,等到所有线程都准备好后再统一执行。
2. 闭锁的实现
Java中闭锁的实现主要有两种方式:CountDownLatch和FutureTask。
2.1 CountDownLatch
CountDownLatch是一个计数器,初始值可以指定,当一个线程完成自己的任务后,会调用countDown()方法,计数器的值会减一。其他线程在等待该操作完成时,通过await()方法来等待,当计数器为0时,所有的线程就会被唤醒,然后继续执行。
下面是一个简单的CountDownLatch示例:
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(3);
new Worker("Worker1", latch).start();
new Worker("Worker2", latch).start();
new Worker("Worker3", latch).start();
latch.await();
System.out.println("All workers have finished.");
}
static class Worker extends Thread {
private CountDownLatch latch;
public Worker(String name, CountDownLatch latch) {
super(name);
this.latch = latch;
}
@Override
public void run() {
System.out.println(getName() + " is working");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(getName() + " has finished");
latch.countDown();
}
}
}
运行结果:
Worker1 is working
Worker2 is working
Worker3 is working
Worker1 has finished
Worker2 has finished
Worker3 has finished
All workers have finished.
2.2 FutureTask
FutureTask也可以用来实现闭锁的功能。它可以安排一个Callable任务在FutureTask构造时执行,然后返回一个Future对象。可以在程序的任何时候调用Future的get()方法获取结果,获取结果时如果任务还没有执行完成,当前线程就会被阻塞。
下面是一个简单的FutureTask示例:
import java.util.concurrent.*;
public class FutureTaskExample {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(1);
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("Task is running");
Thread.sleep(1000);
System.out.println("Task has finished");
return "Done";
}
});
executor.execute(futureTask);
System.out.println("Waiting for result");
String result = futureTask.get();
System.out.println("Got result: " + result);
executor.shutdown();
}
}
运行结果:
Task is running
Waiting for result
Task has finished
Got result: Done
3. 栅栏的实现
Java中栅栏的实现有两种方式:CyclicBarrier和Exchanger。
3.1 CyclicBarrier
CyclicBarrier也是一个计数器,但它与CountDownLatch不同的是,CyclicBarrier要等所有线程都完成工作后才会继续执行。CyclicBarrier的计数器可以重复使用,也就是说,当所有线程都到达栅栏后,计数器又会重新恢复初始值。
下面是一个简单的CyclicBarrier示例:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample {
public static void main(String[] args) {
final int count = 3;
CyclicBarrier barrier = new CyclicBarrier(count, new Runnable() {
@Override
public void run() {
System.out.println("All threads have reached the barrier.");
}
});
new Worker("Worker1", barrier).start();
new Worker("Worker2", barrier).start();
new Worker("Worker3", barrier).start();
}
static class Worker extends Thread {
private CyclicBarrier barrier;
public Worker(String name, CyclicBarrier barrier) {
super(name);
this.barrier = barrier;
}
@Override
public void run() {
System.out.println(getName() + " is working");
try {
Thread.sleep((int) (Math.random() * 3000));
System.out.println(getName() + " has reached the barrier");
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
运行结果:
Worker1 is working
Worker2 is working
Worker3 is working
Worker1 has reached the barrier
Worker3 has reached the barrier
Worker2 has reached the barrier
All threads have reached the barrier.
3.2 Exchanger
Exchanger也可以用来实现栅栏的功能。Exchanger的主要作用是使两个线程可以互相交换数据。当线程A调用Exchanger的exchange()方法时,它会阻塞等待另一个线程B调用exchange()方法,然后交换数据。
下面是一个简单的Exchanger示例:
import java.util.concurrent.Exchanger;
public class ExchangerExample {
public static void main(String[] args) throws InterruptedException {
final Exchanger<String> exchanger = new Exchanger<>();
new Worker("Worker1", exchanger).start();
new Worker("Worker2", exchanger).start();
}
static class Worker extends Thread {
private Exchanger<String> exchanger;
public Worker(String name, Exchanger<String> exchanger) {
super(name);
this.exchanger = exchanger;
}
@Override
public void run() {
System.out.println(getName() + " is working");
try {
Thread.sleep((int) (Math.random() * 3000));
System.out.println(getName() + " is exchanging data with " + Thread.currentThread().getName());
String data = exchanger.exchange("Hello from " + getName());
System.out.println(getName() + " has received data: " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行结果:
Worker1 is working
Worker2 is working
Worker1 is exchanging data with Worker2
Worker2 is exchanging data with Worker1
Worker1 has received data: Hello from Worker2
Worker2 has received data: Hello from Worker1
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:一文详解Java闭锁和栅栏的实现 - Python技术站