700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > java消费者生产者设计模式_java 多线程并发设计模式之四: 生产者消费者模式

java消费者生产者设计模式_java 多线程并发设计模式之四: 生产者消费者模式

时间:2019-06-25 18:57:19

相关推荐

java消费者生产者设计模式_java 多线程并发设计模式之四: 生产者消费者模式

生产者消费者模式是一个经典的多线程设计模式,其核心思想是:有两类线程和一个内存缓冲区或者队列, 一类线程发起任务,并提交到队列中。另一类线程用来处理这些任务,叫做消费者线程. 这两类线程进行通信的桥梁是内存缓冲区,从而实现了解耦,生产者不知道消费者的存在,消费者也不知道生产者的存在. 二者的处理速度无论快慢,都可以通过内存缓冲区得到协调.

在下面的例子中,用 BlockingQueue 作为内存缓冲区。 PCData 类作为要处理的任务。

生产者线程package com.yihaomen.produceconsume;

import java.util.Random;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

public class Producer implements Runnable {

private volatile boolean isRunning = true;

private BlockingQueuequeue;

private static AtomicInteger count = new AtomicInteger();

private static final int SLEEPTIME = 1000;

public Producer(BlockingQueuequeue) {

this.queue = queue;

}

public void run() {

PCData data = null;

Random r = new Random();

System.out.println("start producer id="+Thread.currentThread().getId());

try {

while (isRunning) {

Thread.sleep(r.nextInt(SLEEPTIME));

data = new PCData(count.incrementAndGet());

System.out.println(data+" is put into queue");

if (!queue.offer(data, 2, TimeUnit.SECONDS)) {

System.err.println("failed to put data��" + data);

}

}

} catch (InterruptedException e) {

e.printStackTrace();

Thread.currentThread().interrupt();

}

}

public void stop() {

isRunning = false;

}

}

消费者线程

package com.yihaomen.produceconsume;

import java.text.MessageFormat;

import java.util.Random;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.TimeUnit;

public class Consumer implements Runnable {

private BlockingQueuequeue;

private static final int SLEEPTIME = 1000;

public Consumer(BlockingQueuequeue) {

this.queue = queue;

}

public void run() {

System.out.println("start Consumer id="

+ Thread.currentThread().getId());

Random r = new Random();

try {

while(true){

PCData data = queue.take();

if (null != data) {

int re = data.getData() * data.getData();

System.out.println(MessageFormat.format("{0}*{1}={2}",

data.getData(), data.getData(), re));

Thread.sleep(r.nextInt(SLEEPTIME));

}

}

} catch (InterruptedException e) {

e.printStackTrace();

Thread.currentThread().interrupt();

}

}

}

要处理的任务载体

package com.yihaomen.produceconsume;

public final class PCData {

private final int intData;

public PCData(int d){

intData=d;

}

public PCData(String d){

intData=Integer.valueOf(d);

}

public int getData(){

return intData;

}

@Override

public String toString(){

return "data:"+intData;

}

}

测试生产者,消费者 主程序

package com.yihaomen.produceconsume;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.LinkedBlockingQueue;

public class Main {

public static void main(String[] args) throws InterruptedException {

BlockingQueuequeue = new LinkedBlockingQueue(10);

//建立生产者

Producer producer1 = new Producer(queue);

Producer producer2 = new Producer(queue);

Producer producer3 = new Producer(queue);

//建立消费者

Consumer consumer1 = new Consumer(queue);

Consumer consumer2 = new Consumer(queue);

Consumer consumer3 = new Consumer(queue);

//线程池

ExecutorService service = Executors.newCachedThreadPool();

service.execute(producer1);

service.execute(producer2);

service.execute(producer3);

service.execute(consumer1);

service.execute(consumer2);

service.execute(consumer3);

Thread.sleep(10 * 1000);

//停止生产者

producer1.stop();

producer2.stop();

producer3.stop();

Thread.sleep(3000);

service.shutdown();

}

}

所有源代码提供下载:

java thread procedure consume pattern sample download

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。