源码分析Java中ThreadPoolExecutor的底层原理
1. 简介
ThreadPoolExecutor
是Java提供的一个线程池的实现类,利用它可以实现线程池的管理、控制和优化。该类实现了ExecutorService
和AbstractExecutorService
接口,是实现线程池的关键。
本篇文章将对ThreadPoolExecutor
进行源码分析,深入了解线程池的底层原理。
2. 使用示例
首先,我们来看一下ThreadPoolExecutor
的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
// ...
}
在使用ThreadPoolExecutor
时,我们需要指定以下七个参数:
corePoolSize
: 线程池中核心线程数的最大值。maximumPoolSize
: 线程池中最大线程数的最大值。keepAliveTime
: 非核心线程闲置时超时的时间。unit
: 上述时间的时间单位。workQueue
: 用于存储任务队列的BlockingQueue。threadFactory
: 用于创建线程的ThreadFactory。handler
: 表示当BlockingQueue中的任务数量超出线程池处理能力时的处理程序。
下面是一个使用示例,我们创建一个ThreadPoolExecutor
对象,它的核心线程数为5,最大线程数为10,任务队列的容量为20,超时时间为60秒:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(20),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
3. 底层实现原理
3.1 线程池工作原理
线程池的工作流程如下:
- 当有任务提交到线程池时,线程池会判断当前核心线程是否已满。如果未满,则创建一个新线程来处理这个任务。
- 当核心线程池已满时,任务会被存储在工作队列中。
- 当工作队列已满时,线程池会创建新线程来处理任务,直到线程池中线程数量达到最大线程数。
- 如果线程池中线程数量已达到最大值,而队列中的任务又无法处理,则线程池会执行拒绝策略。
3.2 线程池核心类分析
ThreadPoolExecutor
包含了线程池执行的核心代码,它实现了Executor
和ExecutorService
接口。主要包括以下类:
Worker
类:线程池的执行线程,是线程池的真正执行者。Worker
对象:维护着ThreadPoolExecutor的线程池、任务队列、线程池状态等信息。ThreadPoolExecutor
核心方法:主要包括提交任务、线程池操作等方法。
下面,我们来逐一解读这些类和方法。
3.3 Worker
类
Worker
类是线程池的执行线程,它会不断地获取队列中的任务进行执行。
在ThreadPoolExecutor
中,Worker
的定义如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// ...
}
Worker
继承自AbstractQueuedSynchronizer
类,同时实现了Runnable
接口,意味着它可以在一个锁中执行任务。
Worker
的核心方法是runWorker
,它是ThreadPoolExecutor
中的最重要方法之一。以下是runWorker
的代码:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
其中,getTask
方法是用于从任务队列中获取任务的方法,它工作原理如下:
- 如果线程池状态大于等于
STOP
,则线程需要退出处理循环。在退出前,需要调用rsignal
方法唤醒任何可能在getTask
方法中等待的线程。 - 尝试从队列中poll任务。
- 如果一直从队列中取任务失败,返回null。
3.4 ThreadPoolExecutor
核心方法
ThreadPoolExecutor
类主要包括以下核心方法:
execute
方法:用于向线程池提交任务的方法。addWorker
方法:用于创建Worker线程。getTask
方法:用于从工作队列中获取任务的方法。runWorker
方法:用于执行任务的方法。tearDown
方法:用于终止线程池。
我们简述几个重要方法的工作原理。
3.4.1 execute
方法
以下是execute
方法的代码实现:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
它的工作流程如下:
- 接收一个Runnable类型的任务并检查该任务是否为null。
- 通过
ctl
变量检查线程池状态,核心线程数是否更小与核心线程数最大值,若满足,就添加一个Thread。 - 若步骤2中未成功添加线程,则尝试将任务加入队列中。
- 若步骤3中队列满了,则创建一个线程进行任务处理。
- 如果所有尝试添加任务到队列或启动新的线程(步骤3和步骤4)都失败,则执行拒绝策略。
3.4.2 addWorker
方法
以下是addWorker
方法的代码实现:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // 添加新的Worker到线程池中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker
方法是向线程池中添加一个新线程的方法。如果仍未达到线程池最大值,则可以继续添加新的线程。
3.5 示例说明
下面我们通过两个示例来说明线程池ThreadPoolExecutor
的使用和原理。
示例1
在该示例中,我们使用ThreadPoolExecutor
来实现一段计算密集型的任务。该任务将检查给定范围内的所有数字,计算是否为素数,并打印出所有素数。
import java.util.concurrent.*;
public class PrimeFinder implements Runnable {
private final int start;
private final int end;
private final BlockingQueue<Integer> primes;
private final ExecutorService executor;
public PrimeFinder(int start, int end) {
this.start = start;
this.end = end;
primes = new ArrayBlockingQueue<Integer>(10);
executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());
}
public void run() {
for (int i = start; i <= end; i++) {
if (isPrime(i)) {
primes.add(i);
}
}
}
public void primeNumbers() throws InterruptedException {
int numThreads = 4;
for (int i = 0; i < numThreads; i++) {
executor.execute(this);
}
int numPrimes = 0;
while (numPrimes < (end - start + 1)) {
int prime = primes.take();
System.out.println(prime);
numPrimes++;
}
executor.shutdown();
}
private boolean isPrime(int n) {
if (n <= 1)
return false;
else if (n == 2)
return true;
else if (n % 2 == 0)
return false;
else {
for (int i = 3; i <= Math.sqrt(n); i += 2) {
if (n % i == 0) {
return false;
}
}
return true;
}
}
public static void main(String[] args) throws InterruptedException {
PrimeFinder primeFinder = new PrimeFinder(1, 1000);
primeFinder.primeNumbers();
}
}
在上述代码中,我们使用ThreadPoolExecutor
和Thread
来实现多线程执行,将任务分为4个并行线程来计算,加快计算的速度。
示例2
在该示例中,我们使用ThreadPoolExecutor
来下载一批网络资源。代码如下:
import java.io.*;
import java.net.URL;
import java.util.concurrent.*;
public class Download implements Runnable {
private final URL url;
private final String filename;
private final ExecutorService executor;
public Download(URL url, String filename) {
this.url = url;
this.filename = filename;
executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());
}
public void run() {
try(InputStream stream = url.openStream()) {
int read;
try(FileOutputStream fos = new FileOutputStream(new File(filename))) {
while ((read = stream.read()) != -1) {
fos.write(read);
}
fos.flush();
}
} catch(IOException e) {
e.printStackTrace();
}
}
public void download() throws InterruptedException {
executor.execute(this);
}
public static void main(String[] args) throws InterruptedException {
Download download1 = new Download(new URL("http://www.example.com/a"), "a.html");
Download download2 = new Download(new URL("http://www.example.com/b"), "b.html");
Download download3 = new Download(new URL("http://www.example.com/c"), "c.html");
Download download4 = new Download(new URL("http://www.example.com/d"), "d.html");
Download download5 = new Download(new URL("http://www.example.com/e"), "e.html");
download1.download();
download2.download();
download3.download();
download4.download();
download5.download();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.shutdown();
}
}
在上述代码中,我们使用ThreadPoolExecutor
和Thread
来实现多线程下载,加快了五个资源文件的下载速度。
4. 总结
本文从实例出发,深入浅出地介绍了Java中线程池的底层原理,并详细地分析了ThreadPoolExecutor
这一核心类的源代码。通过本文的学习,我们可以更好地理解线程池的工作原理,以及如何使用线程池提高程序的效率。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:源码分析Java中ThreadPoolExecutor的底层原理 - Python技术站