Java生产者-消费者问题的多种实现方案
以下是5种典型实现方式,涵盖从基础同步到高阶工具的使用:
1. wait()/notify() 基础同步(JDK1.0+)
核心逻辑:通过synchronized和对象锁协调线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| // 共享缓冲区类 class Buffer { private final LinkedList<Object> list = new LinkedList<>(); private final int MAX_SIZE = 10; public synchronized void produce() throws InterruptedException { while (list.size() >= MAX_SIZE) { wait(); // 缓冲区满时等待<sup>2</sup><sup>7</sup> } list.add(new Object()); System.out.println("生产, 当前库存: " + list.size()); notifyAll(); // 唤醒消费者<sup>9</sup> } public synchronized void consume() throws InterruptedException { while (list.isEmpty()) { wait(); // 缓冲区空时等待<sup>7</sup><sup>9</sup> } list.remove(); System.out.println("消费, 当前库存: " + list.size()); notifyAll(); // 唤醒生产者<sup>2</sup> } }
|
特点:需手动管理锁,易出现死锁,适合理解底层机制17。
2. Lock与Condition(JDK5+)
核心逻辑:通过显式锁和条件变量精细化控制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import java.util.concurrent.locks.*; class Buffer { private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); private final int MAX_SIZE = 10; private LinkedList<Object> list = new LinkedList<>(); public void produce() throws InterruptedException { lock.lock(); try { while (list.size() >= MAX_SIZE) { notFull.await(); // 等待非满条件<sup>7</sup> } list.add(new Object()); System.out.println("生产, 当前库存: " + list.size()); notEmpty.signalAll(); // 通知非空条件<sup>7</sup> } finally { lock.unlock(); } } public void consume() throws InterruptedException { lock.lock(); try { while (list.isEmpty()) { notEmpty.await(); // 等待非空条件<sup>7</sup> } list.remove(); System.out.println("消费, 当前库存: " + list.size()); notFull.signalAll(); // 通知非满条件<sup>7</sup> } finally { lock.unlock(); } } }
|
特点:支持多条件变量,避免虚假唤醒,灵活性高78。
3. BlockingQueue 阻塞队列(JDK5+)
核心逻辑:利用线程安全队列自动处理阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class Main { public static void main(String[] args) { BlockingQueue<Object> queue = new LinkedBlockingQueue<>(10); // 容量限制<sup>3</sup><sup>10</sup> // 生产者 new Thread(() -> { while (true) { try { queue.put(new Object()); // 队列满时自动阻塞<sup>4</sup><sup>6</sup> System.out.println("生产, 当前库存: " + queue.size()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }).start(); // 消费者 new Thread(() -> { while (true) { try { queue.take(); // 队列空时自动阻塞<sup>3</sup><sup>10</sup> System.out.println("消费, 当前库存: " + queue.size()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }).start(); } }
|
特点:代码简洁,无需手动同步,推荐生产环境使用3610。
4. 信号量 Semaphore(JDK5+)
核心逻辑:通过许可控制资源访问。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import java.util.concurrent.Semaphore; class Buffer { private final Semaphore emptySlots = new Semaphore(10); // 初始空位数量<sup>7</sup> private final Semaphore filledSlots = new Semaphore(0); // 初始已用数量<sup>7</sup> private final LinkedList<Object> list = new LinkedList<>(); public void produce() throws InterruptedException { emptySlots.acquire(); // 获取空位许可<sup>7</sup> synchronized (this) { list.add(new Object()); System.out.println("生产, 当前库存: " + list.size()); } filledSlots.release(); // 释放已用许可<sup>7</sup> } public void consume() throws InterruptedException { filledSlots.acquire(); // 获取已用许可<sup>7</sup> synchronized (this) { list.remove(); System.out.println("消费, 当前库存: " + list.size()); } emptySlots.release(); // 释放空位许可<sup>7</sup> } }
|
特点:适合控制并发资源数,但需结合同步块保证原子性78。
5. 管道通信(不推荐)
核心逻辑:通过PipedInputStream/PipedOutputStream传输数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import java.io.*; public class PipeExample { public static void main(String[] args) throws IOException { PipedInputStream pis = new PipedInputStream(); PipedOutputStream pos = new PipedOutputStream(); pos.connect(pis); // 连接管道<sup>7</sup> // 生产者写数据 new Thread(() -> { try { pos.write("data".getBytes()); pos.close(); } catch (IOException e) { e.printStackTrace(); } }).start(); // 消费者读数据 new Thread(() -> { try { int data; while ((data = pis.read()) != -1) { System.out.print((char) data); } pis.close(); } catch (IOException e) { e.printStackTrace(); } }).start(); } }
|
特点:数据流式传输,但难以控制缓冲区,实际应用较少7。
推荐优先级:
BlockingQueue > Lock/Condition > 信号量 > wait()/notify() > 管道