源码分析Java中ThreadPoolExecutor的底层原理

源码分析Java中ThreadPoolExecutor的底层原理

1. 简介

ThreadPoolExecutor是Java提供的一个线程池的实现类,利用它可以实现线程池的管理、控制和优化。该类实现了ExecutorServiceAbstractExecutorService接口,是实现线程池的关键。

本篇文章将对ThreadPoolExecutor进行源码分析,深入了解线程池的底层原理。

2. 使用示例

首先,我们来看一下ThreadPoolExecutor的构造方法:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // ...
}

在使用ThreadPoolExecutor时,我们需要指定以下七个参数:

  • corePoolSize: 线程池中核心线程数的最大值。
  • maximumPoolSize: 线程池中最大线程数的最大值。
  • keepAliveTime: 非核心线程闲置时超时的时间。
  • unit: 上述时间的时间单位。
  • workQueue: 用于存储任务队列的BlockingQueue。
  • threadFactory: 用于创建线程的ThreadFactory。
  • handler: 表示当BlockingQueue中的任务数量超出线程池处理能力时的处理程序。

下面是一个使用示例,我们创建一个ThreadPoolExecutor对象,它的核心线程数为5,最大线程数为10,任务队列的容量为20,超时时间为60秒:

ThreadPoolExecutor executor = new ThreadPoolExecutor(
        5, 10, 60, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(20),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.AbortPolicy());

3. 底层实现原理

3.1 线程池工作原理

线程池的工作流程如下:

  1. 当有任务提交到线程池时,线程池会判断当前核心线程是否已满。如果未满,则创建一个新线程来处理这个任务。
  2. 当核心线程池已满时,任务会被存储在工作队列中。
  3. 当工作队列已满时,线程池会创建新线程来处理任务,直到线程池中线程数量达到最大线程数。
  4. 如果线程池中线程数量已达到最大值,而队列中的任务又无法处理,则线程池会执行拒绝策略。

3.2 线程池核心类分析

ThreadPoolExecutor包含了线程池执行的核心代码,它实现了ExecutorExecutorService接口。主要包括以下类:

  • Worker类:线程池的执行线程,是线程池的真正执行者。
  • Worker对象:维护着ThreadPoolExecutor的线程池、任务队列、线程池状态等信息。
  • ThreadPoolExecutor核心方法:主要包括提交任务、线程池操作等方法。

下面,我们来逐一解读这些类和方法。

3.3 Worker

Worker类是线程池的执行线程,它会不断地获取队列中的任务进行执行。

ThreadPoolExecutor中,Worker的定义如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    // ...
}

Worker继承自AbstractQueuedSynchronizer类,同时实现了Runnable接口,意味着它可以在一个锁中执行任务。

Worker的核心方法是runWorker,它是ThreadPoolExecutor中的最重要方法之一。以下是runWorker的代码:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                    && !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

其中,getTask方法是用于从任务队列中获取任务的方法,它工作原理如下:

  1. 如果线程池状态大于等于STOP,则线程需要退出处理循环。在退出前,需要调用rsignal方法唤醒任何可能在getTask方法中等待的线程。
  2. 尝试从队列中poll任务。
  3. 如果一直从队列中取任务失败,返回null。

3.4 ThreadPoolExecutor核心方法

ThreadPoolExecutor类主要包括以下核心方法:

  • execute方法:用于向线程池提交任务的方法。
  • addWorker方法:用于创建Worker线程。
  • getTask方法:用于从工作队列中获取任务的方法。
  • runWorker方法:用于执行任务的方法。
  • tearDown方法:用于终止线程池。

我们简述几个重要方法的工作原理。

3.4.1 execute方法

以下是execute方法的代码实现:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

它的工作流程如下:

  1. 接收一个Runnable类型的任务并检查该任务是否为null。
  2. 通过ctl变量检查线程池状态,核心线程数是否更小与核心线程数最大值,若满足,就添加一个Thread。
  3. 若步骤2中未成功添加线程,则尝试将任务加入队列中。
  4. 若步骤3中队列满了,则创建一个线程进行任务处理。
  5. 如果所有尝试添加任务到队列或启动新的线程(步骤3和步骤4)都失败,则执行拒绝策略。

3.4.2 addWorker方法

以下是addWorker方法的代码实现:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                  firstTask == null &&
                  !workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);   // 添加新的Worker到线程池中
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker方法是向线程池中添加一个新线程的方法。如果仍未达到线程池最大值,则可以继续添加新的线程。

3.5 示例说明

下面我们通过两个示例来说明线程池ThreadPoolExecutor的使用和原理。

示例1

在该示例中,我们使用ThreadPoolExecutor来实现一段计算密集型的任务。该任务将检查给定范围内的所有数字,计算是否为素数,并打印出所有素数。

import java.util.concurrent.*;

public class PrimeFinder implements Runnable {
    private final int start;
    private final int end;
    private final BlockingQueue<Integer> primes;
    private final ExecutorService executor;

    public PrimeFinder(int start, int end) {
        this.start = start;
        this.end = end;
        primes = new ArrayBlockingQueue<Integer>(10);
        executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void run() {
        for (int i = start; i <= end; i++) {
            if (isPrime(i)) {
                primes.add(i);
            }
        }
    }

    public void primeNumbers() throws InterruptedException {
        int numThreads = 4;
        for (int i = 0; i < numThreads; i++) {
            executor.execute(this);
        }

        int numPrimes = 0;
        while (numPrimes < (end - start + 1)) {
            int prime = primes.take();
            System.out.println(prime);
            numPrimes++;
        }
        executor.shutdown();
    }

    private boolean isPrime(int n) {
        if (n <= 1)
            return false;
        else if (n == 2)
            return true;
        else if (n % 2 == 0)
            return false;
        else {
            for (int i = 3; i <= Math.sqrt(n); i += 2) {
                if (n % i == 0) {
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PrimeFinder primeFinder = new PrimeFinder(1, 1000);
        primeFinder.primeNumbers();
    }
}

在上述代码中,我们使用ThreadPoolExecutorThread来实现多线程执行,将任务分为4个并行线程来计算,加快计算的速度。

示例2

在该示例中,我们使用ThreadPoolExecutor来下载一批网络资源。代码如下:

import java.io.*;
import java.net.URL;
import java.util.concurrent.*;

public class Download implements Runnable {
    private final URL url;
    private final String filename;
    private final ExecutorService executor;

    public Download(URL url, String filename) {
        this.url = url;
        this.filename = filename;
        executor = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(20), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void run() {
        try(InputStream stream = url.openStream()) {
            int read;
            try(FileOutputStream fos = new FileOutputStream(new File(filename))) {
                while ((read = stream.read()) != -1) {
                    fos.write(read);
                }
                fos.flush();
            }
        } catch(IOException e) {
            e.printStackTrace();
        }
    }

    public void download() throws InterruptedException {
        executor.execute(this);
    }

    public static void main(String[] args) throws InterruptedException {
        Download download1 = new Download(new URL("http://www.example.com/a"), "a.html");
        Download download2 = new Download(new URL("http://www.example.com/b"), "b.html");
        Download download3 = new Download(new URL("http://www.example.com/c"), "c.html");
        Download download4 = new Download(new URL("http://www.example.com/d"), "d.html");
        Download download5 = new Download(new URL("http://www.example.com/e"), "e.html");

        download1.download();
        download2.download();
        download3.download();
        download4.download();
        download5.download();

        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.shutdown();
    }
}

在上述代码中,我们使用ThreadPoolExecutorThread来实现多线程下载,加快了五个资源文件的下载速度。

4. 总结

本文从实例出发,深入浅出地介绍了Java中线程池的底层原理,并详细地分析了ThreadPoolExecutor这一核心类的源代码。通过本文的学习,我们可以更好地理解线程池的工作原理,以及如何使用线程池提高程序的效率。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:源码分析Java中ThreadPoolExecutor的底层原理 - Python技术站

(0)
上一篇 2023年6月27日
下一篇 2023年6月27日

相关文章

  • 在go中使用http.newrequest处理错误

    在Go中使用http.NewRequest处理错误 在Go中,我们可以使用http.NewRequest函数创建HTTP请求。但是,如果我们不小心处理错误,可能会导致程序崩溃或出现其他问题。本攻略将介绍何在Go中使用http.NewRequest处理错误,并提供两个示例。 处理错误 在Go中,我们可以使用http.NewRequest函数创建请求。该函数返回…

    other 2023年5月9日
    00
  • Spring初始化和销毁的实现方法

    下面是关于 Spring 初始化和销毁的实现方法的详细攻略。 什么是Spring Bean的初始化和销毁 Spring Framework中的Bean对象可以有生命周期,即它们可以被创建、初始化、使用和销毁。在Spring容器中,Bean初始化和销毁时可以自动触发某些动作。 在Java中,Bean的实例化,通过构造函数或者静态工厂方法完成,而数据注入则是通过…

    other 2023年6月20日
    00
  • 关于python:来自单个列表的pairs

    简介 在Python中,可以使用zip函数将两个列表中的元素一一对应组成新的列表。但是,如果我们想要从单个列表中创建一组对,可以使用列表解析或者生成器表式来实现。 步骤 下面是从单个列表中创建一组对的步骤: 使用列表解析或者生成器表达式来创建一组对。 将创建的一组对存储到一个新的列表中。 示例说明 下面是两个示例说明,分别演示了如何从单个列表中创建一组对。 …

    other 2023年5月8日
    00
  • python中attribute什么意思

    当然,我很乐意为您提供有关“Python中attribute的含义”的完整攻略。以下是详细的步骤和两个示例: 1 Python中attribute的含义 在Python中,attribute是指对象的属性或特征。每个Python对象都有一组属性,可以通过点号(.)或()函数来访问它们。属性可以是数据属性或方法属性。 1.1 访问attribute 要访问对象…

    other 2023年5月6日
    00
  • 手把手教你看kegg通路图!

    当然,我很乐意为您提供有关如何查看KEGG通路图的完整攻略。以下是详细的步骤和两个示例: 1. 访问KEGG网站 在开始查看KEGG通路图之前,您需要访问KEGG网站。您可以通过访问KEGG网站(https://www.kegg.jp/)来访问KEGG数据库。 2. 搜索KEGG通路图 在KEGG网站上,您可以使用搜索框来搜索KEGG通路图。可以输入通路图名…

    other 2023年5月6日
    00
  • LINUX下架设代理服务器的教程

    Linux下架设代理服务器需要用到Squid软件来完成。以下是详细的步骤: 1. 安装Squid 可以使用以下命令在Ubuntu系统上安装Squid软件: sudo apt-get update sudo apt-get install squid 2. 配置Squid 默认情况下,Squid的配置文件位于/etc/squid/squid.conf。可以使用…

    other 2023年6月27日
    00
  • 电脑如何清理内存?内存清理方法介绍

    电脑如何清理内存?内存清理方法介绍 清理内存是优化电脑性能的重要步骤之一。内存清理可以帮助释放被占用的内存空间,提高系统的响应速度和运行效率。下面是一些常见的内存清理方法,供您参考。 1. 关闭不必要的程序和进程 在电脑运行过程中,可能会有许多不必要的程序和进程在后台运行,占用系统内存资源。关闭这些不必要的程序和进程可以释放内存空间。以下是示例说明: 示例1…

    other 2023年7月31日
    00
  • linux系统下cifs文件系统

    Linux系统下CIFS文件系统 CIFS(Common Internet File System)是一种用于在Linux系统中访问Windows共享文件夹的协议。本文将介绍如何在Linux系统中使用CIFS文件系统,包括安装和CIFS文件系统,以及如何挂载和卸载Windows共享文件夹。 1. 安装和配置CIFS文件系统 在Linux系统中,可以使用以下命…

    other 2023年5月7日
    00
合作推广
合作推广
分享本页
返回顶部