Java并发编程-阻塞队列(BlockingQueue)的实现原理
发布日期:2021-05-10 01:30:31 浏览次数:18 分类:精选文章

本文共 4328 字,大约阅读时间需要 14 分钟。

Java阻塞队列(BlockingQueue)详解

阻塞队列(BlockingQueue)是Java util.concurrent 包中的重要数据结构。它提供了线程安全的队列访问方式:插入数据时,如果队列已满,线程会阻塞等待直到队列非满;取数据时,如果队列已空,线程会阻塞等待直到队列非空。BlockingQueue 是许多高级同步类实现的基础。

阻塞队列的操作方法

阻塞队列提供了四组不同的方法,用于插入、移除以及对队列元素的检查。这些方法根据操作结果有不同的行为方式:

  • 抛异常:如果操作无法立即执行,抛出异常。
  • 特定值:如果操作无法立即执行,返回特定值(常见于 truefalse)。
  • 阻塞:如果操作无法立即执行,方法调用会阻塞等待直到能执行。
  • 超时:如果操作无法立即执行,方法调用会阻塞等待,但不会超过指定时间。返回特定值告知操作结果。
  • 需要注意的是,BlockingQueue 不允许插入 null,否则会抛出 NullPointerException

    此外,队列中的其他元素可以通过特定方法移除,但这种操作效率较低,建议避免使用,除非必要。例如,使用 remove(Object) 方法就可能性能不佳。


    BlockingQueue 的实现类

    BlockingQueue 是一个接口,其实现类包括:

  • ArrayBlockingQueue:基于数组实现的有界队列,初始化时指定容量,之后无法修改。
  • DelayQueue:队列元素会被延迟直到特定延迟到期,插入元素需实现 Delayed 接口。
  • LinkedBlockingQueue:基于链表实现的无界队列,若未指定容量,取默认上限为 Integer.MAX_VALUE
  • PriorityBlockingQueue:无界队列,基于 PriorityQueue 实现,元素需实现 Comparable 接口。
  • SynchronousQueue:特殊队列,内部仅容纳一个元素,插入和抽取操作间具有 Blocking 特性。

  • 生产者消费者模式示例

    阻塞队列的最典型用途是生产者消费者模式。以下是一个简单示例:

    import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public class BlockingQueueTest {    static BlockingQueue
    blockingQueue = new LinkedBlockingQueue<>(10); static class Producer implements Runnable { private final BlockingQueue
    blockingQueue; private volatile boolean flag; private Random random; Producer(BlockingQueue
    queue) { this.blockingQueue = queue; flag = false; random = new Random(); } @Override public void run() { while (!flag) { int info = random.nextInt(100); try { blockingQueue.put(info); System.out.println(Thread.currentThread().getName() + " produce " + info); Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } } void shutDown() { flag = true; } } static class Consumer implements Runnable { private final BlockingQueue
    blockingQueue; private volatile boolean flag; Consumer(BlockingQueue
    queue) { this.blockingQueue = queue; } @Override public void run() { while (!flag) { try { int info = blockingQueue.take(); System.out.println(Thread.currentThread().getName() + " consumer " + info); Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } } void shutDown() { flag = true; } } public static void main(String[] args) throws InterruptedException { // 启动5 个生产者和5 个消费者 for (int i = 0; i < 10; i++) { if (i < 5) { new Thread(new Producer(blockingQueue), "producer" + i).start(); } else { new Thread(new Consumer(blockingQueue), "consumer" + (i - 5)).start(); } } // 等待队列不是空的状态可以立即开始消费 Thread.sleep(1000); // 通知所有生产者和消费者关闭 for (Runnable thread : new Thread[10]) { if (thread instanceof Producer || thread instanceof Consumer) { ((Runnable) thread).shutDown(); } } }}

    阻塞队列原理

    阻塞队列通过 ReentrantLock 实现锁和条件对象,支持多线程环境下的安全队列操作。例如,ArrayBlockingQueue 通过一个数组实现有界队列,使用 notFullnotEmpty 条件对象管理队列状态。

    以下是 ArrayBlockingQueue 的简要实现代码:

    public class ArrayBlockingQueue
    extends AbstractBlockchainQueue
    implements BlockingQueue
    { // ... 原始代码 ... public void put(E e) throws InterruptedException { checkNotNull(e); lock.lockInterruptibly(); try { while (count == items.length) { notFull.await(); } enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { int mi = putIndex; items[mi] = x; if (++mi == items.length) { mi = 0; } count++; notEmpty.signal(); itrs.lastAddedNode(x); } // ... 其他方法 ...}

    双端阻塞队列(BlockingDeque)

    BlockingDeque 是基于 BlockingQueue 的双端队列,支持从任一端插入或抽取元素。它提供了与 BlockingQueue 相似的功能,但具有更高的灵活性。

    上一篇:Grafana3.1.0安装步骤
    下一篇:nginx负载均衡的5种策略(转载)

    发表评论

    最新留言

    做的很好,不错不错
    [***.243.131.199]2025年05月11日 18时09分19秒