700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 【JAVA多线程】如何解决一个生产者与消费者问题

【JAVA多线程】如何解决一个生产者与消费者问题

时间:2021-06-26 13:33:57

相关推荐

【JAVA多线程】如何解决一个生产者与消费者问题

如何解决一个生产者与消费者问题

生产者与消费者问题是多线程同步的一个经典问题。生产者和消费者同时使用一块缓冲区,生产者生产商品放入缓冲区,消费者从缓冲区中取出商品。我们需要保证的是,当缓冲区满时,生产者不可生产商品;当缓冲区为空时,消费者不可取出商品。

下面介绍java中几种解决同步问题的方式

(1)wait()与notify()方法

(2)Lock与Condition机制

(3)BlockingQueue阻塞队列

【1】wait()与notify()方法

这两个方法是object类中的方法

wait()用在以下场合:

(1)当缓冲区满时,缓冲区调用wait()方法,使得生产者释放锁,当前线程阻塞,其他线程可以获得锁。

(2)当缓冲区空时,缓冲区调用wait()方法,使得消费者释放锁,当前线程阻塞,其他线程可以获得锁。

notify()用在以下场合:

(1)当缓冲区未满时,生产者生产商品放入缓冲区,然后缓冲区调用notify()方法,通知上一个因wait()方法释放锁的线程现在可以去获得锁了,同步块代码执行完成后,释放对象锁,此处的对象锁,锁住的是缓冲区。

(2)当缓冲区不为空时,消费者从缓冲区中取出商品,然后缓冲区调用notify()方法,通知上一个因wait()方法释放锁的线程现在可以去获得锁了,同步块代码执行完成后,释放对象锁。

代码演示:

package day1101;import java.util.LinkedList;/*** 生产者消费者问题*/public class ProAndCon {//最大容量public static final int MAX_SIZE = 2;//存储媒介public static LinkedList<Integer> list = new LinkedList<>();class Producer implements Runnable {@Overridepublic void run() {synchronized (list) {//仓库容量已经达到最大值while (list.size() == MAX_SIZE) {System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");try {list.wait();} catch (InterruptedException e) {e.printStackTrace();}}list.add(1);System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + list.size());list.notify();}}}class Consumer implements Runnable {@Overridepublic void run() {synchronized (list) {while (list.size() == 0) {System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");try {list.wait();} catch (InterruptedException e) {e.printStackTrace();}}list.removeFirst();System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + list.size());list.notify();}}}public static void main(String[] args) {ProAndCon proAndCon = new ProAndCon();Producer producer = proAndCon.new Producer();Consumer consumer = proAndCon.new Consumer();for (int i = 0; i < 10; i++) {Thread pro = new Thread(producer);pro.start();Thread con = new Thread(consumer);con.start();}}}

运行结果:

【2】Lock与Condition机制

在JDK5.0之后,Java提供了Lock与Condition机制。Condition接口的await()和signal()是用来做同步的两种方法,它们的功能基本上和Object的wait()、nofity()相同,或者说可以取代它们,但是它们和Lock机制是直接挂钩的。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

代码演示:

package day1101;import java.util.LinkedList;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ProAndCon2 {public static final int MAX_SIZE = 2;public static LinkedList<Integer> list = new LinkedList<>();public static Lock lock = new ReentrantLock();//仓库满的条件变量public static Condition full = lock.newCondition();//仓库空的条件变量public static Condition empty = lock.newCondition();class Producer implements Runnable {@Overridepublic void run() {lock.lock();while (list.size() == MAX_SIZE) {try {System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");full.await();} catch (InterruptedException e) {e.printStackTrace();}}list.add(1);System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + list.size());//唤醒其他生产者与消费者线程full.signal();empty.signal();lock.unlock();}}class Consumer implements Runnable {@Overridepublic void run() {lock.lock();while (list.size() == 0) {try {System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");empty.await();} catch (InterruptedException e) {e.printStackTrace();}}list.removeFirst();System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + list.size());//唤醒其他生产者与消费者线程full.signal();empty.signal();lock.unlock();}}public static void main(String[] args) {ProAndCon2 proAndCon = new ProAndCon2();Producer producer = proAndCon.new Producer();Consumer consumer = proAndCon.new Consumer();for (int i = 0; i < 10; i++) {Thread pro = new Thread(producer);pro.start();Thread con = new Thread(consumer);con.start();}}}

运行结果:

【3】使用BlockingQueue阻塞队列

什么是阻塞队列?

如果向一个已经满了的队列中添加元素或者从空队列中移除元素,都将会导致线程阻塞,线程一直等待到有旧元素被移除或新元素被添加的时候,才能继续执行。符合这种情况的队列,称为阻塞队列。

JDK 1.5 以后新增BlockingQueue接口,我们采用它实现类的其中两个类,ArrayBlockingQueue或者是LinkedBlockingQueue。

怎么使用LinkedBlockingQueue?

这里我们用LinkedBlockingQueue来解决生产者与消费者问题,主要用到它的两个方法,即put()与take()

put():向阻塞队列中添加一个元素,队列满时,自动阻塞。

take():从阻塞队列中取出一个元素,队列空时,自动阻塞。

其实LinkedBlockingQueue底层使用的仍然是Lock与Condition机制,我们从源码就可以看出来

//..............用到了Lock与Condition机制/** Lock held by take, poll, etc */private final ReentrantLock takeLock = new ReentrantLock();/** Wait queue for waiting takes */private final Condition notEmpty = takeLock.newCondition();/** Lock held by put, offer, etc */private final ReentrantLock putLock = new ReentrantLock();/** Wait queue for waiting puts */private final Condition notFull = putLock.newCondition();//...........put方法/*** Inserts the specified element at the tail of this queue, waiting if* necessary for space to become available.** @throws InterruptedException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*/while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();}//...........take方法public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

看得出来,LinkedBlockingQueue底层已经解决好了同步问题,我们可以很方便的使用它。

代码演示:

package day1024;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;/*** 解决生产者与消费者问题* 采用阻塞队列BlockingQueue*/public class ProAndCon3 {public static final int MAX_SIZE = 2;public static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(MAX_SIZE);class Producer implements Runnable {@Overridepublic void run() {if (queue.size() == MAX_SIZE) {System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");}try {queue.put(1);System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + queue.size());} catch (InterruptedException e) {e.printStackTrace();}}}class Consumer implements Runnable {@Overridepublic void run() {if (queue.size() == 0) {System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");}try {queue.take();System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + queue.size());} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {ProAndCon3 proAndCon = new ProAndCon3();Producer producer = proAndCon.new Producer();Consumer consumer = proAndCon.new Consumer();for (int i = 0; i < 10; i++) {Thread pro = new Thread(producer);pro.start();Thread con = new Thread(consumer);con.start();}}}

运行结果就不贴了。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。