详解Java8 CompletableFuture的并行处理用法
前言
CompletableFuture
是 Java 8 中新增的一个非常强大的异步编程工具。它提供了非常完善的异步编程配套方案,让 Java 开发人员能够在不使用传统的回调编程方式的前提下,编写出高效、可读、可维护的异步代码。
CompletableFuture
的强大体现在它不仅仅支持异步编程,而且还提供了非常强大的并行处理能力。在本文中,我们将详细讲解 CompletableFuture
的并行处理用法,包括如何构建、运行并行处理任务,如何处理任务结果,以及如何编写优雅的失败处理机制。
构建并行任务
要使用 CompletableFuture
进行并行处理,我们需要先创建一个 CompletableFuture
实例,然后将任务提交到这个实例中。我们可以使用 CompletableFuture.supplyAsync
、CompletableFuture.runAsync
、CompletableFuture.completedFuture
等静态工厂方法来创建一个 CompletableFuture
实例,其中 supplyAsync
和 runAsync
方法都支持并行处理,而 completedFuture
方法则是创建一个已经完成的 CompletableFuture
实例,通常用于构建常量结果的 CompletableFuture
。
我们可以使用下面的代码创建一个 CompletableFuture
实例,并将一个简单的计算任务提交到这个实例中:
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
// 计算 1 到 10 之间所有数的平均值
double sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
return sum / 10;
});
在这个代码中,我们使用 supplyAsync
静态工厂方法创建了一个 CompletableFuture<Double>
实例,并将一个计算平均值的任务提交到这个实例中。由于这个任务是 CPU 密集型任务,我们可以使用 supplyAsync
方法将其提交到线程池中进行并行计算。
与传统的线程池不同的是,CompletableFuture
内置了一个默认的线程池,这个线程池的线程数量取决于 CPU 核心数。如果我们需要自定义线程池,可以使用 supplyAsync
或 runAsync
方法的重载版本,将自定义线程池对象作为参数传递进去。
运行并行任务
一旦我们创建了一个 CompletableFuture
实例,并将任务提交到这个实例中,我们就需要运行这个任务,并等待任务的完成。在 CompletableFuture
中,运行任务通常有两种方式:阻塞等待和回调函数。
阻塞等待
阻塞等待是最简单的一种运行任务的方式。我们可以使用 CompletableFuture
本身提供的 get
方法来阻塞等待任务的完成,返回任务的结果。如果任务在等待期间被取消或者出现异常,get
方法会抛出异常。代码如下:
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
// 计算 1 到 10 之间所有数的平均值
double sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
return sum / 10;
});
try {
Double result = future.get();
System.out.println("计算结果为:" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
在这个代码中,我们使用 get
方法等待任务的完成,并在任务完成后打印任务的计算结果。请注意,get
方法会阻塞当前线程,直到任务完成。
回调函数
回调函数是 CompletableFuture
中更为常用的一种运行任务的方式。通过回调函数,我们可以在任务完成后异步地处理任务的结果,而不是阻塞等待任务完成。在 CompletableFuture
中,回调函数有两种方式:thenAccept
和 thenApply
。
thenAccept
方法接受一个 ConsumerthenAccept
方法,在任务完成后打印任务的结果:
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
// 计算 1 到 10 之间所有数的平均值
double sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
return sum / 10;
});
future.thenAccept(result -> System.out.println("计算结果为:" + result));
thenApply
方法接受一个 FunctionthenApply
方法,在任务完成后计算任务的结果的平方值:
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
// 计算 1 到 10 之间所有数的平均值
double sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
return sum / 10;
});
future.thenApply(result -> result * result)
.thenAccept(square -> System.out.println("计算结果的平方为:" + square));
处理任务结果
在 CompletableFuture
中,我们可以使用 CompletableFuture.thenApply
、CompletableFuture.thenAccept
、CompletableFuture.exceptionally
等方法来处理任务的结果。前面我们已经介绍了 thenApply
和 thenAccept
方法,这里我们主要讲解一下 exceptionally
方法。
exceptionally
方法用于在任务出现异常时处理异常,返回一个默认值。我们可以使用下面的代码注册一个 exceptionally
方法,在任务出现除数为 0 的异常时返回一个默认值:
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
// 计算 1 到 10 之间所有数的平均值
double sum = 0;
for (int i = 1; i <= 10; i++) {
sum += i;
}
// 注意:这里除数为 0,会抛出异常
return sum / 0;
});
future.exceptionally(e -> {
System.out.println("出现异常:" + e.getMessage());
return -1.0;
}).thenAccept(result -> System.out.println("计算结果为:" + result));
在这个代码中,我们故意将计算平均值的代码设置为除数为 0 的计算,这样会抛出异常。我们使用 exceptionally
方法对异常进行处理,并返回一个默认值。这里的默认值是 -1.0。
示例说明
我们通过两个示例来说明 CompletableFuture
的并行处理用法。
示例一:同时计算多个任务的结果
在这个示例中,我们需要同时计算多个任务的结果,并将所有结果进行合并。每个任务都是一个 CPU 密集型的计算任务,可以使用并行处理来提高计算效率。
我们可以使用 CompletableFuture.allOf
静态方法来并行处理多个任务。CompletableFuture.allOf
方法接受一个 CompletableFuture<?>...
类型的参数,用于同时执行多个 CompletableFuture
实例,并返回一个 CompletableFuture<Void>
类型的实例,表示所有任务都完成后的状态。我们可以通过循环获取每个任务的结果,并将结果进行合并。示例代码如下:
List<CompletableFuture<Double>> futures = new ArrayList<>(10);
for (int i = 1; i <= 10; i++) {
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> {
// 计算 1 到 10 之间的每个数的平方
return Math.pow(i, 2);
});
futures.add(future);
}
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allFutures.thenRun(() -> {
double sum = 0;
for (CompletableFuture<Double> future : futures) {
try {
sum += future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("计算结果为:" + sum);
});
在这个代码中,我们构建了 10 个计算平方的任务,将每个任务的结果添加到一个列表中。然后,我们使用 CompletableFuture.allOf
方法并行处理这些任务,并在所有任务完成后计算所有结果的总和,并将计算结果打印出来。
示例二:同时访问多个 Web 服务
在这个示例中,我们需要同时访问多个 Web 服务,将所有服务返回的结果进行合并。每个服务的响应时间都不同,但我们需要在所有服务都返回结果后才能开始合并。
我们可以使用 CompletableFuture.anyOf
静态方法来并行处理多个 Web 服务。CompletableFuture.anyOf
方法接受一个 CompletableFuture<?>...
类型的参数,用于同时执行多个操作,并返回一个 CompletableFuture<Object>
类型的实例,表示其中任何一个任务完成后的状态。我们可以通过 CompletableFuture.join
方法等待所有任务完成,并将任务结果进行合并。示例代码如下:
List<CompletableFuture<String>> futures = new ArrayList<>(10);
futures.add(sendRequest("https://www.example.com/1"));
futures.add(sendRequest("https://www.example.com/2"));
futures.add(sendRequest("https://www.example.com/3"));
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0]));
Object result = anyFuture.join();
System.out.println("最先返回的结果为:" + result);
在这个代码中,我们并行访问了三个 Web 服务,然后使用 CompletableFuture.anyOf
方法并行处理这三个任务,直到其中一个任务返回才会返回。我们使用 CompletableFuture.join
方法等待所有任务都完成,并将任务结果进行合并。在这个示例中,我们只需要取其中一个结果进行处理,因此我们只打印了最先返回的结果。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:详解Java8 CompletableFuture的并行处理用法 - Python技术站