700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > pattern in java_Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

pattern in java_Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

时间:2020-08-01 19:50:50

相关推荐

pattern in java_Java里的生产者-消费者模型(Producer and Consumer Pattern in Java)

生产者-消费者模型是多线程问题里面的经典问题,也是面试的常见问题。有如下几个常见的实现方法:

1. wait()/notify()

2. lock & condition

3. BlockingQueue

下面来逐一分析。

1. wait()/notify()

第一种实现,利用根类Object的两个方法wait()/notify(),来停止或者唤醒线程的执行;这也是最原始的实现。

1 public class WaitNotifyBroker implements Broker{2

3 private finalObject[] items;4

5 private inttakeIndex;6 private intputIndex;7 private intcount;8

9 public WaitNotifyBroker(intcapacity) {10 this.items = newObject[capacity];11 }12

13 @SuppressWarnings("unchecked")14 @Override15 publicT take() {16 T tmpObj = null;17 try{18 synchronized(items) {19 while (0 ==count) {20 items.wait();21 }22 tmpObj =(T) items[takeIndex];23 if (++takeIndex ==items.length) {24 takeIndex = 0;25 }26 count--;27 items.notify();28 }29 } catch(InterruptedException e) {30 e.printStackTrace();31 }32

33 returntmpObj;34 }35

36 @Override37 public voidput(T obj) {38 try{39 synchronized(items) {40 while (items.length ==count) {41 items.wait();42 }43

44 items[putIndex] =obj;45 if (++putIndex ==items.length) {46 putIndex = 0;47 }48 count++;49 items.notify();50 }51 } catch(InterruptedException e) {52 e.printStackTrace();53 }54

55 }56

57 }

这里利用Array构造一个Buffer去存取数据,并利用count, putIndex和takeIndex来保证First-In-First-Out。

如果利用LinkedList来代替Array,相对来说会稍微简单些。

LinkedList的实现,可以参考《Java 7 Concurrency Cookbook》第2章wait/notify。

2.lock & condition

lock & condition,实际上也实现了类似synchronized和wait()/notify()的功能,但在加锁和解锁、暂停和唤醒方面,更加细腻和可控。

在JDK的BlockingQueue的默认实现里,也是利用了lock & condition。此文也详细介绍了怎么利用lock&condition写BlockingQueue,这里换LinkedList再实现一次:

1 public class LockConditionBroker implements Broker{2

3 private finalReentrantLock lock;4 private finalCondition notFull;5 private finalCondition notEmpty;6 private final intcapacity;7 private LinkedListitems;8

9 public LockConditionBroker(intcapacity) {10 this.lock = newReentrantLock();11 this.notFull =lock.newCondition();12 this.notEmpty =lock.newCondition();13 this.capacity =capacity;14

15 items = new LinkedList();16 }17

18 @Override19 publicT take() {20 T tmpObj = null;21 lock.lock();22 try{23 while (items.size() == 0) {24 notEmpty.await();25 }26

27 tmpObj =items.poll();28 notFull.signalAll();29

30 } catch(InterruptedException e) {31 e.printStackTrace();32 } finally{33 lock.unlock();34 }35 returntmpObj;36 }37

38 @Override39 public voidput(T obj) {40 lock.lock();41 try{42 while (items.size() ==capacity) {43 notFull.await();44 }45

46 items.offer(obj);47 notEmpty.signalAll();48

49 } catch(InterruptedException e) {50 e.printStackTrace();51 } finally{52 lock.unlock();53 }54

55 }56 }

3.BlockingQueue

最后这种方法,也是最简单最值得推荐的。利用并发包提供的工具:阻塞队列,将阻塞的逻辑交给BlockingQueue。

实际上,上述1和2的方法实现的Broker类,也可以视为一种简单的阻塞队列,不过没有标准包那么完善。

1 public class BlockingQueueBroker implements Broker{2

3 private final BlockingQueuequeue;4

5 publicBlockingQueueBroker() {6 this.queue = new LinkedBlockingQueue();7 }8

9 @Override10 publicT take() {11 try{12 returnqueue.take();13 } catch(InterruptedException e) {14 e.printStackTrace();15 }16

17 return null;18 }19

20 @Override21 public voidput(T obj) {22 try{23 queue.put(obj);24 } catch(InterruptedException e) {25 e.printStackTrace();26 }27 }28

29 }

我们的队列封装了标注包里的LinkedBlockingQueue,十分简单高效。

接下来,就是一个1P2C的例子:

1 public interface Broker{2

3 T take();4

5 voidput(T obj);6

7 }8

9

10 public class Producer implementsRunnable {11

12 private final Brokerbroker;13 private finalString name;14

15 public Producer(Brokerbroker, String name) {16 this.broker =broker;17 this.name =name;18 }19

20 @Override21 public voidrun() {22 try{23 for (int i = 0; i < 5; i++) {24 broker.put(i);25 System.out.format("%s produced: %s%n", name, i);26 Thread.sleep(1000);27 }28 broker.put(-1);29 System.out.println("produced termination signal");30 } catch(InterruptedException e) {31 e.printStackTrace();32 return;33 }34

35 }36

37 }38

39

40 public class Consumer implementsRunnable {41

42 private final Brokerbroker;43 private finalString name;44

45 public Consumer(Brokerbroker, String name) {46 this.broker =broker;47 this.name =name;48 }49

50 @Override51 public voidrun() {52 try{53 for (Integer message = broker.take(); message != -1; message =broker.take()) {54 System.out.format("%s consumed: %s%n", name, message);55 Thread.sleep(1000);56 }57 System.out.println("received termination signal");58 } catch(InterruptedException e) {59 e.printStackTrace();60 return;61 }62

63 }64

65 }66

67

68 public classMain {69

70 public static voidmain(String[] args) {71 Broker broker = new WaitNotifyBroker(5);72 //Broker broker = new LockConditionBroker(5);73 //Broker broker = new BlockingQueueBroker();

74

75 new Thread(new Producer(broker, "prod 1")).start();76 new Thread(new Consumer(broker, "cons 1")).start();77 new Thread(new Consumer(broker, "cons 2")).start();78

79 }80

81 }

除了上述的方法,其实还有很多第三方的并发包可以解决这个问题。例如LMAX Disruptor和Chronicle等

本文完。

参考:

《Java 7 Concurrency Cookbook》

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。