阻塞队列(Blocking Queue)是 Java 并发包(java.util.concurrent)中的一种特殊队列,它提供了线程安全的操作,并且在队列为空时进行取元素操作或者队列已满时进行插入元素操作时,会让线程进入阻塞状态,直到满足相应的条件。下面详细介绍阻塞队列的使用,包括常见的阻塞队列实现类、基本操作以及示例代码。
常见的阻塞队列实现类
ArrayBlockingQueue:基于数组实现的有界阻塞队列,创建时需要指定队列的容量。
LinkedBlockingQueue:基于链表实现的阻塞队列,可以是有界的,也可以是无界的(默认情况下是无界的)。
PriorityBlockingQueue:基于堆实现的无界优先级阻塞队列,队列中的元素会按照优先级进行排序。
SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作,反之亦然。
DelayQueue:一个支持延迟获取元素的无界阻塞队列,队列中的元素必须实现 Delayed 接口。
基本操作
阻塞队列提供了多种操作方法,这些方法可以根据在队列满或空时的不同行为分为以下几类:
插入操作
add(E e):如果队列未满,则将元素插入队列,成功返回 true;如果队列已满,则抛出 IllegalStateException 异常。
offer(E e):如果队列未满,则将元素插入队列,成功返回 true;如果队列已满,则返回 false。
offer(E e, long timeout, TimeUnit unit):在指定的时间内尝试将元素插入队列,如果成功则返回 true;如果超时仍未成功,则返回 false。
put(E e):如果队列未满,则将元素插入队列;如果队列已满,则线程会被阻塞,直到队列有空间。
移除操作
remove():如果队列不为空,则移除并返回队列的头部元素;如果队列为空,则抛出 NoSuchElementException 异常。
poll():如果队列不为空,则移除并返回队列的头部元素;如果队列为空,则返回 null。
poll(long timeout, TimeUnit unit):在指定的时间内尝试移除并返回队列的头部元素,如果成功则返回元素;如果超时仍未成功,则返回 null。
take():如果队列不为空,则移除并返回队列的头部元素;如果队列为空,则线程会被阻塞,直到队列中有元素。
检查操作
element():如果队列不为空,则返回队列的头部元素,但不移除;如果队列为空,则抛出 NoSuchElementException 异常。
peek():如果队列不为空,则返回队列的头部元素,但不移除;如果队列为空,则返回 null。
示例代码
以下是一个使用 ArrayBlockingQueue 的示例,展示了生产者 - 消费者模型:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
// 生产者线程类
class Producer implements Runnable {
private final BlockingQueue
public Producer(BlockingQueue
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
System.out.println("生产者生产: " + i);
// 使用 put 方法插入元素,如果队列已满则阻塞
queue.put(i);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者线程类
class Consumer implements Runnable {
private final BlockingQueue
public Consumer(BlockingQueue
this.queue = queue;
}
@Override
public void run() {
try {
for (int i = 0; i < 5; i++) {
// 使用 take 方法移除元素,如果队列为空则阻塞
Integer item = queue.take();
System.out.println("消费者消费: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class BlockingQueueExample {
public static void main(String[] args) {
// 创建一个容量为 2 的 ArrayBlockingQueue
BlockingQueue
// 创建生产者和消费者线程
Thread producerThread = new Thread(new Producer(queue));
Thread consumerThread = new Thread(new Consumer(queue));
// 启动线程
producerThread.start();
consumerThread.start();
try {
// 等待生产者和消费者线程执行完毕
producerThread.join();
consumerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
代码解释
Producer 类实现了 Runnable 接口,在 run() 方法中使用 put() 方法向队列中插入元素,如果队列已满,则线程会被阻塞。
Consumer 类实现了 Runnable 接口,在 run() 方法中使用 take() 方法从队列中移除元素,如果队列为空,则线程会被阻塞。
在 main() 方法中,创建了一个容量为 2 的 ArrayBlockingQueue,并启动了生产者和消费者线程。
通过使用阻塞队列,可以很方便地实现生产者 - 消费者模型,避免了手动处理线程同步和等待唤醒机制的复杂性。