700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Java多线程编程——生产者消费者问题

Java多线程编程——生产者消费者问题

时间:2022-01-05 12:49:54

相关推荐

Java多线程编程——生产者消费者问题

一、问题介绍

生产者消费者问题是一个经典的多线程同步问题。该问题描述了两个进程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是不断的生成数据,而与此同时,消费者则不断消耗这些数据。该问题的关键就是要保证当生产者生产了产品后,若消费者还没有消费此产品,则生产者停止生产并等待,直到消费者消费了此产品;当消费者消费了产品后,若生产者还没有及时生产新的产品,则消费者停止消费并等待,直到生产者生产了新产品。本文为了简单起见,只介绍单生产者—单消费者模式。

二、解决方案一:管程法

所谓“管程法”,就是在生产者和消费者间建立一个共享缓冲区。生产者只负责生产产品,往缓冲区内存放产品,若缓冲区已满则停止生产,等待消费者消费;消费者只负责消费产品,从缓冲区中取出产品,若缓冲区为空则停止消费,等待生产者生产。生产者和消费者间的协作通信都是由缓冲区帮忙建立的。

下面是使用Java实现的代码:

public class Main {public static void main(String[] args) {// 创建一个缓冲区Container container = new Container(5);// 开启生产者线程new Producer(container).start();// 开启消费者线程new Consumer(container).start();}}// 产品class Product {// 产品名称private final String name;public Product(String name) {this.name = name;}public String getName() {return name;}}// 缓冲区class Container {// 缓冲区数组private final Product[] buffer;// 缓冲区最大容量private final int capacity;// 缓冲区内已放入产品的数量private int size;public Container(int capacity) {buffer = new Product[capacity];this.capacity = capacity;this.size = 0;}// 向缓冲区内存入一件产品// 修饰为同步方法,保证同一时刻只有一个线程可以向缓冲区内存入产品public synchronized void put(Product product) throws InterruptedException {// 若缓冲区已满if (size >= capacity) {// 暂缓放入产品,进行等待,直到缓冲区内出现空余this.wait();}// 现在缓冲区内发现了空余位置// 向缓冲区内存入这件产品buffer[size++] = product;System.out.println("Producer: Put " + product.getName());// 告知其它线程缓冲区内存入产品已完成this.notifyAll();}// 从缓冲区中拿出一件产品// 修饰为同步方法,保证同一时刻只有一个线程可以从缓冲区中拿出产品public synchronized Product get() throws InterruptedException {// 若缓冲区内没有产品if (size <= 0) {// 暂缓取出产品,进行等待,直到缓冲区中出现产品this.wait();}// 现在缓冲区内有产品了// 从缓冲区内取出一件产品Product product = buffer[--size];System.out.println("Consumer: Get " + product.getName());// 告知其它线程从缓冲区中取出产品已完成this.notifyAll();// 将取出的产品返回return product;}}// 生产者class Producer extends Thread {private final Container container;public Producer(Container container) {this.container = container;}@Overridepublic void run() {// 生产者只负责生产try {produce();} catch (InterruptedException e) {e.printStackTrace();}}private void produce() throws InterruptedException {for (int i = 1; i <= 50; i++) {container.put(new Product("Product " + i));}}}// 消费者class Consumer extends Thread {private final Container container;public Consumer(Container container) {this.container = container;}@Overridepublic void run() {// 消费者只负责消费try {consume();} catch (InterruptedException e) {e.printStackTrace();}}private void consume() throws InterruptedException {for (int i = 1; i <= 50; i++) {Product product = container.get();}}}

我们先看缓冲区Container类中的put方法,即向缓冲区内存入产品。首先,程序检查缓冲区是否已满,若确实已满则调用wait方法进行等待,等待消费者进行消费(即从缓冲区内取出产品),同时该线程释放锁,让其他线程使用Container类对象。若消费者完成了消费(即get方法中调用了notifyAll方法),则wait方法结束等待,开始执行buffer[size++] = product语句向缓冲区内存入产品,然后调用notifyAll方法通知消费者进行消费。

接着再来看get方法,即从缓冲区中取出产品。首先,程序检查缓冲区是否为空,若确实为空则调用wait方法进行等待,等待生产者进行生产(即向缓冲区中存放产品),同时该线程释放锁,让其他线程使用Container类对象。若生产者完成了生产(即put方法中调用了notifyAll方法),则wait方法结束等待,开始执行Product product = buffer[--size]语句从缓冲区中取出产品,然后调用notifyAll方法通知生产者进行生产。

程序的运行结果如下:

Producer: Put Product 1Producer: Put Product 2Producer: Put Product 3Producer: Put Product 4Producer: Put Product 5Consumer: Get Product 5Consumer: Get Product 4Consumer: Get Product 3Consumer: Get Product 2Consumer: Get Product 1Producer: Put Product 6Consumer: Get Product 6Producer: Put Product 7Consumer: Get Product 7......Producer: Put Product 22Producer: Put Product 23Producer: Put Product 24Producer: Put Product 25Consumer: Get Product 25Consumer: Get Product 24Consumer: Get Product 23Consumer: Get Product 22Producer: Put Product 26Consumer: Get Product 26......

可以看到,总是生产者生产了一定数量的产品后,消费者才进行消费。不存在生产者生产出超过缓冲区大小数量的产品,而消费者还没有执行消费;也不存在消费者消费了超过缓冲区中存在数量的产品,而生产者还没有进行生产。

三、解决方案二:信号灯法

所谓“信号灯法”,就是使用一个标志位来控制生产者和消费者间的通信。在本例中,当标志位为假,则让生产者生产一件产品,消费者进行等待;当标志位为真时,则轮到消费者消费一件产品,生产者进行等待。生产者和消费者间轮替着执行上述过程,这样肯定能够保证先生产了产品再进行消费、完成了消费后才去生产产品。

使用Java编程的实现如下:

public class Main {public static void main(String[] args) {Factory factory = new Factory();new Producer(factory).start();new Consumer(factory).start();}}class Producer extends Thread {private final Factory factory;public Producer(Factory factory) {this.factory = factory;}@Overridepublic void run() {for (int i = 1; i <= 10; i++) {try {factory.produce("Product " + i);Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}}}}class Consumer extends Thread {private final Factory factory;public Consumer(Factory factory) {this.factory = factory;}@Overridepublic void run() {for (int i = 1; i <= 10; i++) {try {factory.consume();Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}}}class Factory {private String productName;private boolean hasProduct;public Factory() {// 开始时不存在产品this.hasProduct = false;}// 生产部门组织进行生产public synchronized void produce(String productName) throws InterruptedException {// 如果已经生产了一件产品,标志位为真if (hasProduct) {// 让生产者进行等待,直到消费者消费了这件产品this.wait();}// 生产者生产出一件产品System.out.println("Producer: produce " + productName);this.productName = productName;// 将标志位设为真,表示当前生产出了一件产品hasProduct = true;// 通知消费者快去消费this.notifyAll();}// 消费部门组织进行消费public synchronized void consume() throws InterruptedException {// 如果当前没有生产出的产品if (!hasProduct) {// 让消费者进行等待,直到生产者生产出一件产品this.wait();}// 消费者消费掉一件产品System.out.println("Consumer: consume " + productName);this.productName = null;// 将标志位设为假,表示当前消费掉了一件产品hasProduct = false;// 通知生产者快去生产this.notifyAll();}}

程序的执行结果如下:

Producer: produce Product 1Consumer: consume Product 1Producer: produce Product 2Consumer: consume Product 2Producer: produce Product 3Consumer: consume Product 3Producer: produce Product 4Consumer: consume Product 4Producer: produce Product 5Consumer: consume Product 5Producer: produce Product 6Consumer: consume Product 6Producer: produce Product 7Consumer: consume Product 7Producer: produce Product 8Consumer: consume Product 8Producer: produce Product 9Consumer: consume Product 9Producer: produce Product 10Consumer: consume Product 10

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。