生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题。有如下几个常见的实现方法:
1. wait()/notify()
2. lock & condition
3. BlockingQueue
下面来逐一分析。
1. wait()/notify()
第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行;这也是最原始的实现。
1 public class WaitNotifyBroker implements Broker{2
3 private finalObject[] items;4
5 private inttakeIndex;6 private intputIndex;7 private intcount;8
9 public WaitNotifyBroker(intcapacity) {10 this.items = newObject[capacity];11 }12
13 @SuppressWarnings("unchecked")14 @Override15 publicT take() {16 T tmpObj = null;17 try{18 synchronized(items) {19 while (0 ==count) {20 items.wait();21 }22 tmpObj =(T) items[takeIndex];23 if (++takeIndex ==items.length) {24 takeIndex = 0;25 }26 count--;27 items.notify();28 }29 } catch(InterruptedException e) {30 e.printStackTrace();31 }32
33 returntmpObj;34 }35
36 @Override37 public voidput(T obj) {38 try{39 synchronized(items) {40 while (items.length ==count) {41 items.wait();42 }43
44 items[putIndex] =obj;45 if (++putIndex ==items.length) {46 putIndex = 0;47 }48 count++;49 items.notify();50 }51 } catch(InterruptedException e) {52 e.printStackTrace();53 }54
55 }56
57 }
这里利用Array构造一个Buffer去存取数据,并利用count, putIndex和takeIndex来保证First-In-First-Out。
如果利用LinkedList来代替Array,相对来说会稍微简单些。
LinkedList的实现,可以参考《Java 7 Concurrency Cookbook》第2章wait/notify。
2.lock & condition
lock & condition,实际上也实现了类似synchronized和wait()/notify()的功能,但在加锁和解锁、暂停和唤醒方面,更加细腻和可控。
在JDK的BlockingQueue的默认实现里,也是利用了lock & condition。此文也详细介绍了怎么利用lock&condition写BlockingQueue,这里换LinkedList再实现一次:
1 public class LockConditionBroker implements Broker{2
3 private finalReentrantLock lock;4 private finalCondition notFull;5 private finalCondition notEmpty;6 private final intcapacity;7 private LinkedListitems;8
9 public LockConditionBroker(intcapacity) {10 this.lock = newReentrantLock();11 this.notFull =lock.newCondition();12 this.notEmpty =lock.newCondition();13 this.capacity =capacity;14
15 items = new LinkedList();16 }17
18 @Override19 publicT take() {20 T tmpObj = null;21 lock.lock();22 try{23 while (items.size() == 0) {24 notEmpty.await();25 }26
27 tmpObj =items.poll();28 notFull.signalAll();29
30 } catch(InterruptedException e) {31 e.printStackTrace();32 } finally{33 lock.unlock();34 }35 returntmpObj;36 }37
38 @Override39 public voidput(T obj) {40 lock.lock();41 try{42 while (items.size() ==capacity) {43 notFull.await();44 }45
46 items.offer(obj);47 notEmpty.signalAll();48
49 } catch(InterruptedException e) {50 e.printStackTrace();51 } finally{52 lock.unlock();53 }54
55 }56 }
3.BlockingQueue
最后这种方法,也是最简单最值得推荐的。利用并发包提供的工具:阻塞队列,将阻塞的逻辑交给BlockingQueue。
实际上,上述1和2的方法实现的Broker类,也可以视为一种简单的阻塞队列,不过没有标准包那么完善。
1 public class BlockingQueueBroker implements Broker{2
3 private final BlockingQueuequeue;4
5 publicBlockingQueueBroker() {6 this.queue = new LinkedBlockingQueue();7 }8
9 @Override10 publicT take() {11 try{12 returnqueue.take();13 } catch(InterruptedException e) {14 e.printStackTrace();15 }16
17 return null;18 }19
20 @Override21 public voidput(T obj) {22 try{23 queue.put(obj);24 } catch(InterruptedException e) {25 e.printStackTrace();26 }27 }28
29 }
我们的队列封装了标注包里的LinkedBlockingQueue,十分简单高效。
接下来,就是一个1P2C的例子:
1 public interface Broker{2
3 T take();4
5 voidput(T obj);6
7 }8
9
10 public class Producer implementsRunnable {11
12 private final Brokerbroker;13 private finalString name;14
15 public Producer(Brokerbroker, String name) {16 this.broker =broker;17 this.name =name;18 }19
20 @Override21 public voidrun() {22 try{23 for (int i = 0; i < 5; i++) {24 broker.put(i);25 System.out.format("%s produced: %s%n", name, i);26 Thread.sleep(1000);27 }28 broker.put(-1);29 System.out.println("produced termination signal");30 } catch(InterruptedException e) {31 e.printStackTrace();32 return;33 }34
35 }36
37 }38
39
40 public class Consumer implementsRunnable {41
42 private final Brokerbroker;43 private finalString name;44
45 public Consumer(Brokerbroker, String name) {46 this.broker =broker;47 this.name =name;48 }49
50 @Override51 public voidrun() {52 try{53 for (Integer message = broker.take(); message != -1; message =broker.take()) {54 System.out.format("%s consumed: %s%n", name, message);55 Thread.sleep(1000);56 }57 System.out.println("received termination signal");58 } catch(InterruptedException e) {59 e.printStackTrace();60 return;61 }62
63 }64
65 }66
67
68 public classMain {69
70 public static voidmain(String[] args) {71 Broker broker = new WaitNotifyBroker(5);72 //Broker broker = new LockConditionBroker(5);73 //Broker broker = new BlockingQueueBroker();
74
75 new Thread(new Producer(broker, "prod 1")).start();76 new Thread(new Consumer(broker, "cons 1")).start();77 new Thread(new Consumer(broker, "cons 2")).start();78
79 }80
81 }
除了上述的方法,其实还有很多第三方的并发包可以解决这个问题。例如LMAX Disruptor和Chronicle等
本文完。
参考:
《Java 7 Concurrency Cookbook》