JUC并发编程LinkedBlockingQueue队列深入分析源码

JUC并发编程LinkedBlockingQueue队列深入分析源码

1. LinkedBlockingQueue简介

LinkedBlockingQueue是Java集合框架中的一种队列,它实现了BlockingQueue接口,并且是线程安全的,支持高效并发读写操作。LinkedBlockingQueue是一种无界队列,因此队列中的元素数量可以无限增长,不像ArrayBlockingQueue那样在队列满时会抛出异常。

2. LinkedBlockingQueue源码分析

先来看一下LinkedBlockingQueue的构造方法:

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

可以看出LinkedBlockingQueue有两种构造方式,如果不指定初始容量,则容量为Integer.MAX_VALUE,否则将容量设置为指定值,并且要求初始容量必须大于0。

LinkedBlockingQueue是一个链式结构的阻塞队列,所以在实现上使用双向链表来存储数据,链表头节点head和尾节点last的值为null,不存储元素,只是用于标识队列的起点和终点。每个结点都包括节点元素(item)和下一个节点的引用(next)。

其中Node的内部类定义如下:

private static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

LinkedBlockingQueue内部还有两个与队列操作相关的锁:putLock和takeLock。putLock是执行插入操作时所获得的锁,而takeLock是执行获取操作时所获得的锁。这两个锁的目的是控制并发对队列的访问,保证线程安全。

LinkedBlockingQueue的插入方法offer分为两个步骤:

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    Node<E> node = new Node<E>(e);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

第一步是通过ReentrantLock的lock()方法来获取putLock锁,插入操作是对队列尾部进行操作,先创建一个新节点node,并将putLock锁住,保证线程安全。然后判断队列是否已满,若未满则将新节点插入到队列尾部。

第二步是若队列为空,通知当前所有正在等待takeLock锁的线程不为空,因为有数据入队了。

接下来是LinkedBlockingQueue的获取方法take的源码:

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

take方法首先对队列头进行访问,因此获取的是takeLock锁,并且在获取锁的过程中可以被中断。其次,如果队列为空,则需要进入await()状态,等待其他线程插入元素后再通知取出数据并且处理。

3. LinkedBlockingQueue应用示例

我们可以使用LinkedBlockingQueue来实现生产者消费者模型。

先看生产者:

public class Producer implements Runnable{ 
    private final LinkedBlockingQueue<String> queue; 
    public Producer(LinkedBlockingQueue<String> queue){ 
        this.queue = queue; 
    } 
    @Override 
    public void run() { 
        try {
            for(int i = 0 ;i < 10; i++){ 
                String s = "product" + i;
                queue.put(s); 
                System.out.println("生产者-" + Thread.currentThread().getName() + " 生产:" + s); 
            } 
        } catch (Exception e) { 
            e.printStackTrace(); 
        } 
    }
}

生产者使用put()方法将数据放进队列中。

然后是消费者:

public class Consumer implements Runnable{ 
    private final LinkedBlockingQueue<String> queue; 
    public Consumer(LinkedBlockingQueue<String> queue){ 
        this.queue = queue; 
    } 
    @Override 
    public void run() { 
        try { 
            while(true){ 
                String s = queue.take(); 
                System.out.println("消费者-" + Thread.currentThread().getName() + " 消费:" + s); 
            } 
        } catch (InterruptedException e) { 
            e.printStackTrace(); 
        } 
    } 
}

消费者使用take()方法从队列中取出数据。

最后,启动生产者和消费者线程:

LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3); 
Producer producer1 = new Producer(queue); 
Producer producer2 = new Producer(queue); 
Producer producer3 = new Producer(queue); 
Consumer consumer = new Consumer(queue); 
new Thread(producer1).start(); 
new Thread(producer2).start(); 
new Thread(producer3).start(); 
new Thread(consumer).start();

注意,我们使用了构造方法LinkedBlockingQueue<>(3)来创建一个容量为3的队列。这里启动了3个生产者和1个消费者,每个生产者往队列中插入10个数据,总共会插入30个数据,而消费者则使用了无限循环来取出数据,直接将队列中所有数据全部取出。

运行结果如下:

生产者-Thread-0 生产:product0
生产者-Thread-1 生产:product0
消费者-Thread-3 消费:product0
生产者-Thread-1 生产:product1
生产者-Thread-2 生产:product0
消费者-Thread-3 消费:product1
生产者-Thread-1 生产:product2
生产者-Thread-2 生产:product1
消费者-Thread-3 消费:product2
生产者-Thread-1 生产:product3
生产者-Thread-2 生产:product2
消费者-Thread-3 消费:product3
生产者-Thread-1 生产:product4
生产者-Thread-2 生产:product3
消费者-Thread-3 消费:product4
生产者-Thread-1 生产:product5
生产者-Thread-2 生产:product4
消费者-Thread-3 消费:product5
生产者-Thread-1 生产:product6
生产者-Thread-2 生产:product5
消费者-Thread-3 消费:product6
生产者-Thread-1 生产:product7
生产者-Thread-2 生产:product6
消费者-Thread-3 消费:product7
生产者-Thread-1 生产:product8
生产者-Thread-2 生产:product7
消费者-Thread-3 消费:product8
生产者-Thread-1 生产:product9
生产者-Thread-2 生产:product8
消费者-Thread-3 消费:product9
消费者-Thread-3 消费:product0
消费者-Thread-3 消费:product1
消费者-Thread-3 消费:product2
消费者-Thread-3 消费:product3
消费者-Thread-3 消费:product4
消费者-Thread-3 消费:product5
消费者-Thread-3 消费:product6
消费者-Thread-3 消费:product7
消费者-Thread-3 消费:product8
消费者-Thread-3 消费:product9

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:JUC并发编程LinkedBlockingQueue队列深入分析源码 - Python技术站

(0)
上一篇 2023年5月17日
下一篇 2023年5月17日

相关文章

  • 关于Java 并发的 CAS

    CAS(Compare and Swap)是一种并发机制,用于实现原子性操作。在并发编程中,当多个线程同时对共享变量进行操作时,会产生竞争条件(Race Condition),导致数据的不一致性、丢失、覆盖等问题。CAS机制通过比较期望值与实际值的方式,来确保正确性与一致性。 CAS的原理 CAS操作包括三个操作数:内存位置(V)、预期原值(A)和新值(B)…

    多线程 2023年5月17日
    00
  • 基于C++11的threadpool线程池(简洁且可以带任意多的参数)

    基于C++11的threadpool线程池(简洁且可以带任意多的参数) 介绍 线程池是一种并发编程中提高性能与效率的技术,可以避免频繁创建和销毁线程,提升程序运行效率。本文将介绍基于C++11的线程池实现,并且可以传递任意多的参数。 实现 线程池主要由任务队列和线程池管理器两个部分组成。线程池管理器主要用来创建、销毁线程和管理任务队列,线程池中的任务队列存储…

    多线程 2023年5月16日
    00
  • Go并发与锁的两种方式该如何提效详解

    Go并发与锁的两种方式该如何提效详解 先谈一下Go中的协程和锁 Go语言的协程是一种并发执行代码的方式。协程可以方便的并发执行任务,不需要等待前面的任务完成,直接执行下一个任务,提高了程序运行的效率。 而锁则可以保证在多个协程同时访问共享数据时不会发生冲突。 对于共享数据的并发访问,常用的两种方式 1. 互斥锁 互斥锁是最常用的一种锁。它可以保证在同一时刻只…

    多线程 2023年5月16日
    00
  • 基于rocketmq的有序消费模式和并发消费模式的区别说明

    基于RocketMQ的有序消费模式和并发消费模式的区别说明 1. 有序消费模式 在有序消费模式下,消息消费是按照消息的发送顺序依次进行的。具体实现方式是,消息生产者将消息发送到同一个Message Queue中,而Message Queue按照顺序将消息发送给Consumer进行消费。因此,在有序消费模式下,同一个Message Queue的消息一定会按照发…

    多线程 2023年5月17日
    00
  • 分享J2EE的13种核心技术

    分享J2EE的13种核心技术攻略 1. 学习J2EE的目的 J2EE是Java 2企业版的缩写。它是一种Java开发平台,在开发大型企业应用时非常有用。J2EE平台提供了一个标准的框架,用于构建分布式和可扩展的企业应用程序。学习J2EE主要有以下目的: 理解J2EE平台的核心概念和架构 熟悉J2EE的编程模型和APIs 掌握J2EE开发的13种核心技术 2.…

    多线程 2023年5月17日
    00
  • Java 浅谈 高并发 处理方案详解

    Java浅谈高并发处理方案详解 前言 随着互联网的发展和用户访问量的逐步增加,高并发逐渐成为互联网开发中的常见问题。而 Java 作为一门流行的编程语言,其处理高并发问题的方案也备受关注。本篇文章将浅谈 Java 高并发处理方案,并且给出两个对高并发处理方案的具体示例。 常用的高并发处理方案 多线程 多线程是 Java 中常用的高并发解决方案。可以通过创建多…

    多线程 2023年5月16日
    00
  • Java高并发BlockingQueue重要的实现类详解

    Java高并发BlockingQueue重要的实现类详解 概述 在Java中,BlockingQueue是一种很重要的线程安全容器,它提供了线程安全的数据存储和获取操作,用于在多线程并发场景中实现生产者-消费者模式的应用。本文将详细介绍BlockingQueue的相关实现类,包括ArrayBlockingQueue、LinkedBlockingQueue、S…

    多线程 2023年5月16日
    00
  • python如何支持并发方法详解

    下面是关于Python如何支持并发方法的详解攻略。 1. 什么是并发? 并发是指一个系统可以同时处理多个任务的能力。Python中通过多线程和多进程两种方式实现并发编程。 2. Python的并发方法 2.1 多线程 Python中的多线程通过threading库实现。多线程可以在同一进程内分配不同任务给不同线程执行,从而提高程序的效率。 下面是一个基本的多…

    多线程 2023年5月16日
    00
合作推广
合作推广
分享本页
返回顶部