Java并发编程之栅栏(CyclicBarrier)实例介绍
什么是栅栏(CyclicBarrier)?
栅栏(CyclicBarrier)是Java并发编程中的一种工具类,它可以在多个线程中实现同步。当栅栏的计数器(CyclicBarrier(int parties)构造函数中的参数)被减到0时,所有由该栅栏等待的线程才能继续执行。
栅栏的使用方法
在使用栅栏之前,需要先创建一个栅栏。
CyclicBarrier barrier = new CyclicBarrier(int parties);
参数parties
是栅栏需要等待的线程数量。
然后在多个线程中调用栅栏的await
方法。
try {
barrier.await();
} catch (InterruptedException ex) {
// handle exception
} catch (BrokenBarrierException ex) {
// handle exception
}
当线程调用await
方法时,它会被阻塞,直到所有栅栏等待的线程(即parties
数量)都调用了await
方法。然后所有等待的线程都会被同时唤醒,并继续执行。
栅栏示例说明一
在下面的示例中,我们创建了3个线程(即parties
的数量为3),每个线程都会打印出一个数字。等到所有线程都打印完数字后,再打印一条语句。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class BarrierExample1 {
public static void main(String[] args) {
int parties = 3;
CyclicBarrier barrier = new CyclicBarrier(parties, new Runnable() {
@Override
public void run() {
System.out.println("All threads have finished printing!");
}
});
for (int i = 0; i < parties; i++) {
new Thread(new PrintNumberTask(i, barrier)).start();
}
}
static class PrintNumberTask implements Runnable {
private int number;
private CyclicBarrier barrier;
public PrintNumberTask(int number, CyclicBarrier barrier) {
this.number = number;
this.barrier = barrier;
}
@Override
public void run() {
try {
// 打印数字
System.out.println("Thread " + number + " prints " + number);
// 等待其他线程打印数字
barrier.await();
} catch (InterruptedException ex) {
// handle exception
} catch (BrokenBarrierException ex) {
// handle exception
}
}
}
}
运行结果:
Thread 0 prints 0
Thread 2 prints 2
Thread 1 prints 1
All threads have finished printing!
可以看到,三个线程的打印顺序不一定是按照线程号顺序的,但是最终都会在所有线程都打印完数字后,打印出一条语句。
栅栏示例说明二
下面我们再来看一个更实际的例子。假设我们有一个大文件需要处理,我们可以把这个文件分割成多个小文件,每个线程处理一个小文件,处理完毕后再把结果合并起来。使用栅栏可以很方便地实现这个过程。
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class BarrierExample2 {
public static void main(String[] args) {
int nThreads = 3;
String filename = "big_file.txt";
// 将文件分割为n个子文件
List<File> files = splitFile(new File(filename), nThreads);
CyclicBarrier barrier = new CyclicBarrier(nThreads, new Runnable() {
@Override
public void run() {
System.out.println("All threads have finished processing files!");
}
});
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < nThreads; i++) {
File file = files.get(i);
Thread thread = new Thread(new FileProcessingTask(file, barrier));
threads.add(thread);
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException ex) {
// handle exception
}
}
// 合并处理结果
// mergeResult(files);
}
static List<File> splitFile(File file, int n) {
List<File> files = new ArrayList<>();
// calculate the size of each chunk
long chunkSize = file.length() / n;
try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
String line;
long bytesRead = 0;
int fileNumber = 0;
File outputFile = new File(file.getParent(), file.getName() + "." + fileNumber);
files.add(outputFile);
while ((line = reader.readLine()) != null) {
if (bytesRead + line.getBytes().length > chunkSize) {
// start a new file
outputFile = new File(file.getParent(), file.getName() + "." + (++fileNumber));
files.add(outputFile);
bytesRead = 0;
}
// write the line to the current file
// ...
bytesRead += line.getBytes().length;
}
} catch (IOException ex) {
// handle exception
}
return files;
}
static void mergeResult(List<File> files) {
// merge the files
// ...
}
static class FileProcessingTask implements Runnable {
private File file;
private CyclicBarrier barrier;
public FileProcessingTask(File file, CyclicBarrier barrier) {
this.file = file;
this.barrier = barrier;
}
@Override
public void run() {
// process the file
// ...
try {
// 等待其他线程处理完毕
barrier.await();
} catch (InterruptedException ex) {
// handle exception
} catch (BrokenBarrierException ex) {
// handle exception
}
}
}
}
在上面的代码中,我们先将大文件big_file.txt
分割成3个小文件,然后创建了3个线程,每个线程负责处理一个小文件。当每个线程处理完毕后,都会调用栅栏的await
方法,等待其他线程也处理完毕。当所有线程都处理完毕后,栅栏的Runnable
会被调用。
这里的示例还省略了文件处理和合并的具体代码,读者可以自行实现。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java并发编程之栅栏(CyclicBarrier)实例介绍 - Python技术站