源码分析Java中ThreadPoolExecutor的底层原理

源码分析Java中ThreadPoolExecutor的底层原理

1. 简介

ThreadPoolExecutor是Java提供的一个线程池的实现类,利用它可以实现线程池的管理、控制和优化。该类实现了ExecutorServiceAbstractExecutorService接口,是实现线程池的关键。

本篇文章将对ThreadPoolExecutor进行源码分析,深入了解线程池的底层原理。

2. 使用示例

首先,我们来看一下ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // ...
}

在使用ThreadPoolExecutor时,我们需要指定以下七个参数:

  • corePoolSize: 线程池中核心线程数的最大值。
  • maximumPoolSize: 线程池中最大线程数的最大值。
  • keepAliveTime: 非核心线程闲置时超时的时间。
  • unit: 上述时间的时间单位。
  • workQueue: 用于存储任务队列的BlockingQueue。
  • threadFactory: 用于创建线程的ThreadFactory。
  • handler: 表示当BlockingQueue中的任务数量超出线程池处理能力时的处理程序。

下面是一个使用示例,我们创建一个ThreadPoolExecutor对象,它的核心线程数为5,最大线程数为10,任务队列的容量为20,超时时间为60秒:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        5, 10, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(20),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());

3. 底层实现原理

3.1 线程池工作原理

线程池的工作流程如下:

  1. 当有任务提交到线程池时,线程池会判断当前核心线程是否已满。如果未满,则创建一个新线程来处理这个任务。
  2. 当核心线程池已满时,任务会被存储在工作队列中。
  3. 当工作队列已满时,线程池会创建新线程来处理任务,直到线程池中线程数量达到最大线程数。
  4. 如果线程池中线程数量已达到最大值,而队列中的任务又无法处理,则线程池会执行拒绝策略。

3.2 线程池核心类分析

ThreadPoolExecutor包含了线程池执行的核心代码,它实现了ExecutorExecutorService接口。主要包括以下类:

  • Worker类:线程池的执行线程,是线程池的真正执行者。
  • Worker对象:维护着ThreadPoolExecutor的线程池、任务队列、线程池状态等信息。
  • ThreadPoolExecutor核心方法:主要包括提交任务、线程池操作等方法。

下面,我们来逐一解读这些类和方法。

3.3 Worker

Worker类是线程池的执行线程,它会不断地获取队列中的任务进行执行。

ThreadPoolExecutor中,Worker的定义如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // ...
}

Worker继承自AbstractQueuedSynchronizer类,同时实现了Runnable接口,意味着它可以在一个锁中执行任务。

Worker的核心方法是runWorker,它是ThreadPoolExecutor中的最重要方法之一。以下是runWorker的代码:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                    && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

其中,getTask方法是用于从任务队列中获取任务的方法,它工作原理如下:

  1. 如果线程池状态大于等于STOP,则线程需要退出处理循环。在退出前,需要调用rsignal方法唤醒任何可能在getTask方法中等待的线程。
  2. 尝试从队列中poll任务。
  3. 如果一直从队列中取任务失败,返回null。

3.4 ThreadPoolExecutor核心方法

ThreadPoolExecutor类主要包括以下核心方法:

  • execute方法:用于向线程池提交任务的方法。
  • addWorker方法:用于创建Worker线程。
  • getTask方法:用于从工作队列中获取任务的方法。
  • runWorker方法:用于执行任务的方法。
  • tearDown方法:用于终止线程池。

我们简述几个重要方法的工作原理。

3.4.1 execute方法

以下是execute方法的代码实现:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

它的工作流程如下:

  1. 接收一个Runnable类型的任务并检查该任务是否为null。
  2. 通过ctl变量检查线程池状态,核心线程数是否更小与核心线程数最大值,若满足,就添加一个Thread。
  3. 若步骤2中未成功添加线程,则尝试将任务加入队列中。
  4. 若步骤3中队列满了,则创建一个线程进行任务处理。
  5. 如果所有尝试添加任务到队列或启动新的线程(步骤3和步骤4)都失败,则执行拒绝策略。

3.4.2 addWorker方法

以下是addWorker方法的代码实现:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                  firstTask == null &&
                  !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);   // 添加新的Worker到线程池中
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker方法是向线程池中添加一个新线程的方法。如果仍未达到线程池最大值,则可以继续添加新的线程。

3.5 示例说明

下面我们通过两个示例来说明线程池ThreadPoolExecutor的使用和原理。

示例1

在该示例中,我们使用ThreadPoolExecutor来实现一段计算密集型的任务。该任务将检查给定范围内的所有数字,计算是否为素数,并打印出所有素数。

import java.util.concurrent.*;

public class PrimeFinder implements Runnable {
    private final int start;
    private final int end;
    private final BlockingQueue<Integer> primes;
    private final ExecutorService executor;

    public PrimeFinder(int start, int end) {
        this.start = start;
        this.end = end;
        primes = new ArrayBlockingQueue<Integer>(10);
        executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void run() {
        for (int i = start; i <= end; i++) {
            if (isPrime(i)) {
                primes.add(i);
            }
        }
    }

    public void primeNumbers() throws InterruptedException {
        int numThreads = 4;
        for (int i = 0; i < numThreads; i++) {
            executor.execute(this);
        }

        int numPrimes = 0;
        while (numPrimes < (end - start + 1)) {
            int prime = primes.take();
            System.out.println(prime);
            numPrimes++;
        }
        executor.shutdown();
    }

    private boolean isPrime(int n) {
        if (n <= 1)
            return false;
        else if (n == 2)
            return true;
        else if (n % 2 == 0)
            return false;
        else {
            for (int i = 3; i <= Math.sqrt(n); i += 2) {
                if (n % i == 0) {
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PrimeFinder primeFinder = new PrimeFinder(1, 1000);
        primeFinder.primeNumbers();
    }
}

在上述代码中,我们使用ThreadPoolExecutorThread来实现多线程执行,将任务分为4个并行线程来计算,加快计算的速度。

示例2

在该示例中,我们使用ThreadPoolExecutor来下载一批网络资源。代码如下:

import java.io.*;
import java.net.URL;
import java.util.concurrent.*;

public class Download implements Runnable {
    private final URL url;
    private final String filename;
    private final ExecutorService executor;

    public Download(URL url, String filename) {
        this.url = url;
        this.filename = filename;
        executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void run() {
        try(InputStream stream = url.openStream()) {
            int read;
            try(FileOutputStream fos = new FileOutputStream(new File(filename))) {
                while ((read = stream.read()) != -1) {
                    fos.write(read);
                }
                fos.flush();
            }
        } catch(IOException e) {
            e.printStackTrace();
        }
    }

    public void download() throws InterruptedException {
        executor.execute(this);
    }

    public static void main(String[] args) throws InterruptedException {
        Download download1 = new Download(new URL("http://www.example.com/a"), "a.html");
        Download download2 = new Download(new URL("http://www.example.com/b"), "b.html");
        Download download3 = new Download(new URL("http://www.example.com/c"), "c.html");
        Download download4 = new Download(new URL("http://www.example.com/d"), "d.html");
        Download download5 = new Download(new URL("http://www.example.com/e"), "e.html");

        download1.download();
        download2.download();
        download3.download();
        download4.download();
        download5.download();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
    }
}

在上述代码中,我们使用ThreadPoolExecutorThread来实现多线程下载,加快了五个资源文件的下载速度。

4. 总结

本文从实例出发,深入浅出地介绍了Java中线程池的底层原理,并详细地分析了ThreadPoolExecutor这一核心类的源代码。通过本文的学习,我们可以更好地理解线程池的工作原理,以及如何使用线程池提高程序的效率。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:源码分析Java中ThreadPoolExecutor的底层原理 - Python技术站

(0)
上一篇 2023年6月27日
下一篇 2023年6月27日

相关文章

  • yum和apt-get的区别详解

    yum和apt-get的区别详解 介绍 yum和apt-get都是常见的包管理工具,用于在Linux系统中安装、更新和卸载软件包。它们在不同的Linux发行版中使用,并具有一些区别。 yum yum是CentOS、Fedora和RHEL等Red Hat系列发行版中的包管理工具。它使用RPM包管理系统,可以从软件仓库中下载和安装软件包。 示例:使用yum安装n…

    other 2023年10月14日
    00
  • wp8.1通知中心怎么设置?wp8.1开发者预览版通知中心操作方法

    下面是WP8.1通知中心设置的完整攻略: 一、进入通知中心设置页面 要设置WP8.1的通知中心,首先需要进入设置页面,步骤如下: 在桌面上向下滑动打开通知操作中心 点击屏幕上方的“所有设置”按钮 滑动页面到底部,点击“通知+操作中心” 二、配置通知中心 进入“通知+操作中心”页面后,可以进行如下操作: 针对每个应用程序,选择它们在通知中心中显示的方式,包括是…

    other 2023年6月26日
    00
  • 使用latex画图系列

    以下是关于“使用LaTeX画图系列”的完整攻略,包括LaTeX画图的基本知识、使用TikZ和PGFPlots两种工画图的方法和两个示例等。 LaTeX画图的基本知识 LaTeX是一种排版系统,可以用于创建高质量的文档。在LaTeX中,可以使用TikZ和PGFPlots两种工具来画图。 TikZ TikZ是一种绘图工,可以用于创建各种类型的图形,包括流程图、网…

    other 2023年5月7日
    00
  • C++赋值函数+移动赋值函数+移动构造函数详解

    C++赋值函数+移动赋值函数+移动构造函数详解 前言 在 C++ 中,我们经常需要对对象进行赋值。同时,在使用数组等数据结构时,由于涉及大量的对象操作和对象间的拷贝,可能会导致性能问题。这时,我们可以通过使用移动构造函数和移动赋值函数来提高操作效率。 这篇文章将会详细介绍 C++ 中的赋值函数、移动赋值函数和移动构造函数,并给出相应的代码示例。 赋值函数 在…

    other 2023年6月26日
    00
  • Spring Boot详解配置文件有哪些作用与细则

    Spring Boot详解配置文件有哪些作用与细则 简介 在Spring Boot应用中,配置文件是非常重要的一部分。它能够让我们配置应用的各种环境参数,以便应用能够更好地运行。Spring Boot使用属性文件和yaml文件作为配置文件格式,使得我们可以在应用中轻松地配置和管理参数。 配置文件名称 Spring Boot应用使用的默认配置文件名称是appl…

    other 2023年6月25日
    00
  • 关于utf8:utf-8和iso-8859-1有什么区别?

    UTF-8和ISO-8859-1都是字符编码标准,但它们之间有很大的区别。以下是关于UTF-8和ISO-8859-1的详细攻略: UTF-8 UTF-8是一种可变长度的Unicode编码,它可以表示Unicode字符集中的任何字符。UTF-8使用1到4个字节来表示一个字符,其中ASCII字符使用1个字节,而其他字符使用2到4个字节。UTF-8是一种通用的编码…

    other 2023年5月8日
    00
  • matlab之sortrows()函数

    Matlab之sortrows()函数 在matlab中,我们可以使用sortrows()函数来对一个矩阵进行排序。这个函数通常用来对数据表格进行排序,但也可以排序一些特定的矩阵。在本文中,我们将探究sortrows()函数的用法和一些示例。 sortrows()函数语法 下面是sortrows函数的语法简述: B = sortrows(A,columns)…

    其他 2023年3月29日
    00
  • 安装Python和pygame及相应的环境变量配置(图文教程)

    安装Python和pygame及相应的环境变量配置是使用pygame开发游戏的前提条件,下面是详细的攻略。 1. 下载Python 首先需要下载Python,官网地址为 https://www.python.org/downloads/ ,在页面中选择合适的版本进行下载(推荐3.6以上版本)。 2. 安装Python 双击下载好的安装包,打开安装向导,一路点…

    other 2023年6月27日
    00
合作推广
合作推广
分享本页
返回顶部