用BlockingQueue解决生产消费者问题

阻塞队列(BlockingQueue)介绍

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。

阻塞队列提供了四种处理方法:

  • 抛出异常:当操作无法正常完成时,抛出异常
  • 返回特殊值: 当操作无法正常完成时,给出特殊值
  • 一直阻塞
  • 超时退出
方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用

Java里的阻塞队列

JDK7提供了7个阻塞队列。分别是

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

BlockingQueue 适合哪些情况

生产者消费者问题的任何有效解决方案都必须控制生成资源的产品的put()方法的调用及消耗资源的消费者的take()方法。

当于达成了对这个方法的阻塞控制,你就解决了这个问题。

Java提供开箱即用的支持来控制这种方法调用,其中一个线程正在创建资源,而另一个线程正在消耗它们 - 通过BlockingQueue 。 java.util.concurrent包中的Java BlockingQueue接口表示一个线程可以安全地插入并从中获取实例的队列。

BlockingQueue是一个将资源放入其中的线程,另一个线程从中获取的线程。

用BlockingQueue解决生产消费者问题

生产者

下面的代码是生产者线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

class Producer implements Runnable {
protected BlockingQueue<Object> queue;

Producer(BlockingQueue<Object> theQueue) {
this.queue = theQueue;
}

public void run() {
try {
while (true) {
Object justProduced = getResource();
queue.put(justProduced);
System.out.println("->Produced 放入一个元素, Queue 大小: " + queue.size());
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}

Object getResource() {
try {
Thread.sleep(100); // simulate time passing during read
} catch (InterruptedException ex) {
ex.printStackTrace();
}
return new Object();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

class Consumer implements Runnable {
protected BlockingQueue<Object> queue;

Consumer(BlockingQueue<Object> theQueue) {
this.queue = theQueue;
}

public void run() {
try {
while (true) {
Object obj = queue.take();
System.out.println("<-Consumed take - Queue大小: " + queue.size());
take(obj);
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}

void take(Object obj) {
try {
Thread.sleep(100); // simulate time passing
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("处理" + obj);
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws InterruptedException
{
int numProducers = 4;
int numConsumers = 3;

BlockingQueue<Object> myQueue = new LinkedBlockingQueue<>(20);

for (int i = 0; i < numProducers; i++){
new Thread(new Producer(myQueue)).start();
}

for (int i = 0; i < numConsumers; i++){
new Thread(new Consumer(myQueue)).start();
}

// Let the simulation run for, say, 10 seconds
Thread.sleep(10 * 1000);

// End of simulation - shut down gracefully
System.exit(0);
}

可能的输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<-[Thread-4]Consumed take  -  Queue大小: 0
->[Thread-0]Produced 放入一个元素, Queue 大小: 0
->[Thread-3]Produced 放入一个元素, Queue 大小: 1
<-[Thread-5]Consumed take - Queue大小: 0
->[Thread-1]Produced 放入一个元素, Queue 大小: 1
<-[Thread-6]Consumed take - Queue大小: 0
->[Thread-2]Produced 放入一个元素, Queue 大小: 1
->[Thread-2]Produced 放入一个元素, Queue 大小: 2
->[Thread-3]Produced 放入一个元素, Queue 大小: 3
处理java.lang.Object@100f189b
<-[Thread-4]Consumed take - Queue大小: 3
处理java.lang.Object@32e39829
处理java.lang.Object@a842999
->[Thread-0]Produced 放入一个元素, Queue 大小: 4
<-[Thread-6]Consumed take - Queue大小: 1
<-[Thread-5]Consumed take - Queue大小: 2
->[Thread-1]Produced 放入一个元素, Queue 大小: 2
->[Thread-3]Produced 放入一个元素, Queue 大小: 3
->[Thread-0]Produced 放入一个元素, Queue 大小: 4

....

分析

4号消费者准备使元素,但队列为空,进入阻塞
0号生产者生产一个元素,立马就被4号取走,所以队列大小为0
3号生产者又生产一个元素,队列大小为1
5号消费者又取走,队列大小为0。
2,3号生产者又生产了3个元素。
在11行4号消费者消费后,队列大小应该是2,奇怪,可能是多线程输出顺序的问题。

引用

http://www.infoq.com/cn/articles/java-blocking-queue
http://howtodoinjava.com/core-java/multi-threading/producer-consumer-problem-using-blockingqueue/
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/BlockingQueue.html
https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem