源码分析Java中ThreadPoolExecutor的底层原理

yizhihongxing

源码分析Java中ThreadPoolExecutor的底层原理

1. 简介

ThreadPoolExecutor是Java提供的一个线程池的实现类,利用它可以实现线程池的管理、控制和优化。该类实现了ExecutorServiceAbstractExecutorService接口,是实现线程池的关键。

本篇文章将对ThreadPoolExecutor进行源码分析,深入了解线程池的底层原理。

2. 使用示例

首先,我们来看一下ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // ...
}

在使用ThreadPoolExecutor时,我们需要指定以下七个参数:

  • corePoolSize: 线程池中核心线程数的最大值。
  • maximumPoolSize: 线程池中最大线程数的最大值。
  • keepAliveTime: 非核心线程闲置时超时的时间。
  • unit: 上述时间的时间单位。
  • workQueue: 用于存储任务队列的BlockingQueue。
  • threadFactory: 用于创建线程的ThreadFactory。
  • handler: 表示当BlockingQueue中的任务数量超出线程池处理能力时的处理程序。

下面是一个使用示例,我们创建一个ThreadPoolExecutor对象,它的核心线程数为5,最大线程数为10,任务队列的容量为20,超时时间为60秒:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        5, 10, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(20),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());

3. 底层实现原理

3.1 线程池工作原理

线程池的工作流程如下:

  1. 当有任务提交到线程池时,线程池会判断当前核心线程是否已满。如果未满,则创建一个新线程来处理这个任务。
  2. 当核心线程池已满时,任务会被存储在工作队列中。
  3. 当工作队列已满时,线程池会创建新线程来处理任务,直到线程池中线程数量达到最大线程数。
  4. 如果线程池中线程数量已达到最大值,而队列中的任务又无法处理,则线程池会执行拒绝策略。

3.2 线程池核心类分析

ThreadPoolExecutor包含了线程池执行的核心代码,它实现了ExecutorExecutorService接口。主要包括以下类:

  • Worker类:线程池的执行线程,是线程池的真正执行者。
  • Worker对象:维护着ThreadPoolExecutor的线程池、任务队列、线程池状态等信息。
  • ThreadPoolExecutor核心方法:主要包括提交任务、线程池操作等方法。

下面,我们来逐一解读这些类和方法。

3.3 Worker

Worker类是线程池的执行线程,它会不断地获取队列中的任务进行执行。

ThreadPoolExecutor中,Worker的定义如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // ...
}

Worker继承自AbstractQueuedSynchronizer类,同时实现了Runnable接口,意味着它可以在一个锁中执行任务。

Worker的核心方法是runWorker,它是ThreadPoolExecutor中的最重要方法之一。以下是runWorker的代码:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                    && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

其中,getTask方法是用于从任务队列中获取任务的方法,它工作原理如下:

  1. 如果线程池状态大于等于STOP,则线程需要退出处理循环。在退出前,需要调用rsignal方法唤醒任何可能在getTask方法中等待的线程。
  2. 尝试从队列中poll任务。
  3. 如果一直从队列中取任务失败,返回null。

3.4 ThreadPoolExecutor核心方法

ThreadPoolExecutor类主要包括以下核心方法:

  • execute方法:用于向线程池提交任务的方法。
  • addWorker方法:用于创建Worker线程。
  • getTask方法:用于从工作队列中获取任务的方法。
  • runWorker方法:用于执行任务的方法。
  • tearDown方法:用于终止线程池。

我们简述几个重要方法的工作原理。

3.4.1 execute方法

以下是execute方法的代码实现:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

它的工作流程如下:

  1. 接收一个Runnable类型的任务并检查该任务是否为null。
  2. 通过ctl变量检查线程池状态,核心线程数是否更小与核心线程数最大值,若满足,就添加一个Thread。
  3. 若步骤2中未成功添加线程,则尝试将任务加入队列中。
  4. 若步骤3中队列满了,则创建一个线程进行任务处理。
  5. 如果所有尝试添加任务到队列或启动新的线程(步骤3和步骤4)都失败,则执行拒绝策略。

3.4.2 addWorker方法

以下是addWorker方法的代码实现:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                  firstTask == null &&
                  !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);   // 添加新的Worker到线程池中
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker方法是向线程池中添加一个新线程的方法。如果仍未达到线程池最大值,则可以继续添加新的线程。

3.5 示例说明

下面我们通过两个示例来说明线程池ThreadPoolExecutor的使用和原理。

示例1

在该示例中,我们使用ThreadPoolExecutor来实现一段计算密集型的任务。该任务将检查给定范围内的所有数字,计算是否为素数,并打印出所有素数。

import java.util.concurrent.*;

public class PrimeFinder implements Runnable {
    private final int start;
    private final int end;
    private final BlockingQueue<Integer> primes;
    private final ExecutorService executor;

    public PrimeFinder(int start, int end) {
        this.start = start;
        this.end = end;
        primes = new ArrayBlockingQueue<Integer>(10);
        executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void run() {
        for (int i = start; i <= end; i++) {
            if (isPrime(i)) {
                primes.add(i);
            }
        }
    }

    public void primeNumbers() throws InterruptedException {
        int numThreads = 4;
        for (int i = 0; i < numThreads; i++) {
            executor.execute(this);
        }

        int numPrimes = 0;
        while (numPrimes < (end - start + 1)) {
            int prime = primes.take();
            System.out.println(prime);
            numPrimes++;
        }
        executor.shutdown();
    }

    private boolean isPrime(int n) {
        if (n <= 1)
            return false;
        else if (n == 2)
            return true;
        else if (n % 2 == 0)
            return false;
        else {
            for (int i = 3; i <= Math.sqrt(n); i += 2) {
                if (n % i == 0) {
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PrimeFinder primeFinder = new PrimeFinder(1, 1000);
        primeFinder.primeNumbers();
    }
}

在上述代码中,我们使用ThreadPoolExecutorThread来实现多线程执行,将任务分为4个并行线程来计算,加快计算的速度。

示例2

在该示例中,我们使用ThreadPoolExecutor来下载一批网络资源。代码如下:

import java.io.*;
import java.net.URL;
import java.util.concurrent.*;

public class Download implements Runnable {
    private final URL url;
    private final String filename;
    private final ExecutorService executor;

    public Download(URL url, String filename) {
        this.url = url;
        this.filename = filename;
        executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void run() {
        try(InputStream stream = url.openStream()) {
            int read;
            try(FileOutputStream fos = new FileOutputStream(new File(filename))) {
                while ((read = stream.read()) != -1) {
                    fos.write(read);
                }
                fos.flush();
            }
        } catch(IOException e) {
            e.printStackTrace();
        }
    }

    public void download() throws InterruptedException {
        executor.execute(this);
    }

    public static void main(String[] args) throws InterruptedException {
        Download download1 = new Download(new URL("http://www.example.com/a"), "a.html");
        Download download2 = new Download(new URL("http://www.example.com/b"), "b.html");
        Download download3 = new Download(new URL("http://www.example.com/c"), "c.html");
        Download download4 = new Download(new URL("http://www.example.com/d"), "d.html");
        Download download5 = new Download(new URL("http://www.example.com/e"), "e.html");

        download1.download();
        download2.download();
        download3.download();
        download4.download();
        download5.download();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
    }
}

在上述代码中,我们使用ThreadPoolExecutorThread来实现多线程下载,加快了五个资源文件的下载速度。

4. 总结

本文从实例出发,深入浅出地介绍了Java中线程池的底层原理,并详细地分析了ThreadPoolExecutor这一核心类的源代码。通过本文的学习,我们可以更好地理解线程池的工作原理,以及如何使用线程池提高程序的效率。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:源码分析Java中ThreadPoolExecutor的底层原理 - Python技术站

(0)
上一篇 2023年6月27日
下一篇 2023年6月27日

相关文章

  • sql语句把字段中的某个字符去掉

    SQL语句把字段中的某个字符去掉 在实际的开发中,我们经常会遇到需要修改某个字段的情况,如将电话号码的”-“去掉,将时间格式中的”-“替换成”/”等等。而这些操作,可以通过SQL语句来完成,本篇文章就介绍一下在SQL中如何去除字段中的某个字符。 REPLACE函数 SQL中的REPLACE函数可以实现替换操作,语法如下: REPLACE(str,old,ne…

    其他 2023年3月28日
    00
  • 魔兽世界8.0奇袭贼怎么输出 奇袭贼输出手法循环及优先级

    魔兽世界中的奇袭贼是一种非常强力的输出职业,在8.0版本中也有不少优化和调整。下面将详细讲解奇袭贼的输出手法循环及优先级,希望对各位战斗爱好者有所帮助。 奇袭贼输出手法循环 奇袭贼的输出手法主要包括以下几个步骤: 附魔毒药:首先,奇袭贼需要使用附魔毒药来提高主手武器的毒药效果。附魔毒药具体使用时需要注意技能CD和毒药时间,保持毒药效果的持续时间和刷新时间。 …

    other 2023年6月27日
    00
  • C# WPF开源UI控件库MaterialDesign介绍

    C# WPF开源UI控件库MaterialDesign介绍 MaterialDesign是一个基于Google Material Design风格的开源UI控件库,支持C#和WPF框架。它提供了一系列高质量的UI控件和组件,能够帮助快速搭建出美观且具有交互性的应用程序界面。 MaterialDesign的介绍 MaterialDesign是一个免费的开源项目…

    other 2023年6月26日
    00
  • sqlserver高级特性–存储过程

    以下是详细讲解“SQL Server高级特性–存储过程”的完整攻略,过程中至少包含两条示例说明的标准Markdown格式文本: SQL Server高级特性–存储过程 存储过程是SQL Server中的一种高级特性,它可以将一组SQL语句封装在一个可复用的单元中。本文将介绍如何创建和使用存储过程。 创建存储过程 在SQL Server中,可以使用CREA…

    other 2023年5月10日
    00
  • Windows 系统上 Adobe CEF Helper 高 CPU 占用/使用率的解决方案

    下面是详细讲解“Windows 系统上 Adobe CEF Helper 高 CPU 占用/使用率的解决方案”的完整攻略。 问题描述 在 Windows 系统中,当使用 Adobe 软件时,可能会出现 Adobe CEF Helper 高 CPU 占用/使用率的情况,这会导致电脑变得非常卡顿,影响工作效率。 解决方案 采取以下方法可以解决这个问题。 方法一:…

    other 2023年6月26日
    00
  • js封装tab标签页实例分享

    让我们开始讲解“js封装tab标签页实例分享”的完整攻略。 什么是Tab标签页? Tab标签页是常见的一种页面展示方式,通常用于多个页面之间进行切换,实现单页应用程序(SPA)或多标签应用程序,可以让用户直观地浏览内容。 如何使用js封装Tab标签页? 以下是一些关键步骤来创建一个可复用的Tab标签页组件: 第一步:HTML 结构 我们需要先在HTML中定义…

    other 2023年6月25日
    00
  • windows下使用vscode搭建golang环境并调试的过程

    下面就给大家介绍一下windows下使用vscode搭建golang环境并调试的过程的完整攻略。 环境搭建 安装Golang 首先,我们需要在官网(https://golang.org/dl/)下载golang的安装包并进行安装。安装完成后,可以在命令行中输入go version,若成功打印出版本号,则说明安装成功。 安装VSCode 接着,我们需要在官网(…

    other 2023年6月27日
    00
  • SQL Server Bulk Insert 只需要部分字段时的方法

    一、前言 在使用 SQL Server 进行批量数据导入时,如果只要导入部分字段而不是整个表的所有字段,该怎么实现呢?本文将详细讲解 SQL Server 的 Bulk Insert 只导入部分字段的方法,以及给出两个示例说明。 二、Bulk Insert 只导入部分字段的方法 在使用 SQL Server 的 Bulk Insert 命令进行数据导入时,通…

    other 2023年6月25日
    00
合作推广
合作推广
分享本页
返回顶部