Java生产者-消费者问题的多种实现方案
以下是5种典型实现方式,涵盖从基础同步到高阶工具的使用:

1. wait()/notify() 基础同步(JDK1.0+)

核心逻辑:通过synchronized和对象锁协调线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 共享缓冲区类
class Buffer {
private final LinkedList<Object> list = new LinkedList<>();
private final int MAX_SIZE = 10;
public synchronized void produce() throws InterruptedException {
while (list.size() >= MAX_SIZE) {
wait(); // 缓冲区满时等待<sup>2</sup><sup>7</sup>
}
list.add(new Object());
System.out.println("生产, 当前库存: " + list.size());
notifyAll(); // 唤醒消费者<sup>9</sup>
}
public synchronized void consume() throws InterruptedException {
while (list.isEmpty()) {
wait(); // 缓冲区空时等待<sup>7</sup><sup>9</sup>
}
list.remove();
System.out.println("消费, 当前库存: " + list.size());
notifyAll(); // 唤醒生产者<sup>2</sup>
}
}

特点:需手动管理锁,易出现死锁,适合理解底层机制17。

2. Lock与Condition(JDK5+)

核心逻辑:通过显式锁和条件变量精细化控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.concurrent.locks.*;
class Buffer {
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final int MAX_SIZE = 10;
private LinkedList<Object> list = new LinkedList<>();
public void produce() throws InterruptedException {
lock.lock();
try {
while (list.size() >= MAX_SIZE) {
notFull.await(); // 等待非满条件<sup>7</sup>
}
list.add(new Object());
System.out.println("生产, 当前库存: " + list.size());
notEmpty.signalAll(); // 通知非空条件<sup>7</sup>
} finally {
lock.unlock();
}
}
public void consume() throws InterruptedException {
lock.lock();
try {
while (list.isEmpty()) {
notEmpty.await(); // 等待非空条件<sup>7</sup>
}
list.remove();
System.out.println("消费, 当前库存: " + list.size());
notFull.signalAll(); // 通知非满条件<sup>7</sup>
} finally {
lock.unlock();
}
}
}

特点:支持多条件变量,避免虚假唤醒,灵活性高78。

3. BlockingQueue 阻塞队列(JDK5+)

核心逻辑:利用线程安全队列自动处理阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Main {
public static void main(String[] args) {
BlockingQueue<Object> queue = new LinkedBlockingQueue<>(10); // 容量限制<sup>3</sup><sup>10</sup>
// 生产者
new Thread(() -> {
while (true) {
try {
queue.put(new Object()); // 队列满时自动阻塞<sup>4</sup><sup>6</sup>
System.out.println("生产, 当前库存: " + queue.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
// 消费者 new Thread(() -> {
while (true) {
try {
queue.take(); // 队列空时自动阻塞<sup>3</sup><sup>10</sup>
System.out.println("消费, 当前库存: " + queue.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
}

特点:代码简洁,无需手动同步,推荐生产环境使用3610。

4. 信号量 Semaphore(JDK5+)

核心逻辑:通过许可控制资源访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.Semaphore;
class Buffer {
private final Semaphore emptySlots = new Semaphore(10); // 初始空位数量<sup>7</sup>
private final Semaphore filledSlots = new Semaphore(0); // 初始已用数量<sup>7</sup>
private final LinkedList<Object> list = new LinkedList<>();
public void produce() throws InterruptedException {
emptySlots.acquire(); // 获取空位许可<sup>7</sup>
synchronized (this) {
list.add(new Object());
System.out.println("生产, 当前库存: " + list.size());
}
filledSlots.release(); // 释放已用许可<sup>7</sup>
}
public void consume() throws InterruptedException {
filledSlots.acquire(); // 获取已用许可<sup>7</sup>
synchronized (this) {
list.remove();
System.out.println("消费, 当前库存: " + list.size());
}
emptySlots.release(); // 释放空位许可<sup>7</sup>
}
}

特点:适合控制并发资源数,但需结合同步块保证原子性78。

5. 管道通信(不推荐)

核心逻辑:通过PipedInputStream/PipedOutputStream传输数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.io.*;
public class PipeExample {
public static void main(String[] args) throws IOException {
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream();
pos.connect(pis); // 连接管道<sup>7</sup>
// 生产者写数据
new Thread(() -> {
try {
pos.write("data".getBytes());
pos.close();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
// 消费者读数据
new Thread(() -> {
try {
int data;
while ((data = pis.read()) != -1) {
System.out.print((char) data);
}
pis.close();
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}

特点:数据流式传输,但难以控制缓冲区,实际应用较少7。

推荐优先级:

BlockingQueue > Lock/Condition > 信号量 > wait()/notify() > 管道