目录
🍉 阻塞队列🍉阻塞队列的功能🍉生产者消费者模型🍉生产者消费者模型的优点🥝阻塞队列的实现🥝 Java中标准库的阻塞队列🥝自己实现一个阻塞队列🍉 阻塞队列
在学习数据结构时 我们了解到了什么是队列,普通队列遵守先进先出的规则,阻塞队列同样也符合这样的规则,但是相比于普通队列阻塞队列还有着其他的功能
🍉阻塞队列的功能
线程安全:阻塞队列是我们多线程中的案列之一,既然属于多线程,那么一定和线程安全有关系,阻塞队列主要的功能就是能保证线程安全产生阻塞效果1.若队列为空,尝试出队列,就会出现阻塞,阻塞到队列不为空为止
2 .若队列为满,尝试如队列也会出现堵塞,阻塞到队列不为满为止.
基于上述问题,我们就可以实现一个典型的生产者和消费者模型
🍉生产者消费者模型
我们举个场景🌰 :
过年的时候,大家家里都会包饺子,一家三口都会齐上阵,为了高效率的包,一般都会分工,假如👨擀饺子皮,👦和👩包饺子,这样效率会大大提高,👨擀完饺子皮之后会把饺子皮放在盖帘上,然后👦和👩将盖帘子上的饺子皮包成饺子,这就是一个典型的生产者消费者模型
生产者:👨
消费者:👦和👩
这里的盖帘子我们把他叫做交易场所
盖帘子上饺子皮太多,会导致生产者阻塞,盖帘子没有更多的饺子皮,会导致消费者堵塞
生产者和消费者模型是实际开发中非常有用的一种多线程开发手段,尤其是在服务器开发中
假设有两个服务器A,B A作为入口服务器作为直接接受用户的网络请求,B作为应用服务器,来给A提供一些数据
🍉生产者消费者模型的优点
优点一:能够让多个服务器之间充分的解耦合何为解耦合?
打个比方:如果不使用生产者消费者模型,就像上面的图所示,这样的话A,B两个服务器耦合性是非常高的,在开发A是,需要充分的了解到B的接口是什么样的,开发B的时候也需要知道A是怎么调用的,一旦把B换成C,A是需要进行非常大的改动的,而且如果B挂了A也顺带着挂了,显然在计算机中这样的代码是非常不可取的,耦合性太高,在日常开发中我也需要尽量做到高内聚,低耦合
如果我们引进生产者消费者模型,就解决了这个问题
如图
当我们引进了阻塞队列后,就会产生两个生产者消费者模型
1. 对于请求:A是生产者,B是消费者
2.对于响应:A是消费者,B是生产者
阻塞队列就是A和B的交易场所
A和B分别和阻塞队列进行交互,A不需要认识B,B也不需要认识A,队列是不会变的 B挂了,A也没啥影响,把B换成C,A也完全感知不到,这就达到了我们所说的低耦合的理念
优点二:能够对请求进行削峰填谷
如图,A作为入口服务器,计算量很轻,即使请求暴涨,A服务器也不会奔溃,但是B作为响应服务器,计算量很大,所需要的资源也很多,如果请求量暴涨,B就会挂掉
但是如果引进阻塞队列的话,就会有所改变
引进阻塞队列后就会起到削峰填谷的效果
削峰
填谷加入阻塞队列后,A将所有的请求放进了阻塞队列里,阻塞队列没有什么计算量,只是存储数据,所以几乎能容纳所有的A传过来的请求,B这边不会一下子增长很多请求,他会按照原来的速度处理数据,B就会得到很好的保护,不会因为请求的数据暴涨而崩溃,这就是削峰
像这种数据突然增长许多的情况只会维持很多的一段时间,等峰值过去后,数据增长又恢复了,B服务器不会因为请求的减少而闲下来,仍然会按照原有的速度通过阻塞队列来处理数据,这就叫做填谷
以上就是削峰填谷,其实这样的功能主要是为了保护像B这样的响应/应用服务器,保证程序不会因为数据的暴涨而崩溃
在现实生活中也有这样的🌰 :三峡大坝,不懂得小伙伴可以去了解一下
🥝阻塞队列的实现
🥝 Java中标准库的阻塞队列
我们先根据Java中内置的阻塞队列,基于他来实现一个生产者消费者模型,再来实现自己创建的阻塞队列
public static void main(String[] args) throws InterruptedException {BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();blockingQueue.put("hello");blockingQueue.take();}
这就是一个简单的阻塞队列
🥝自己实现一个阻塞队列
先实现一个简单的队列队列可以由数组和链表组成,数组实现起来呢更加简单,所以我们用数组实现,并且使用循环数组
1.设置三个变量分别的链表的头(head),尾(end),还有链表的长度(size);
2.入队列,把元素放在end的位置上,并且end++,size++;,如果end的值等于数组的长度,end设置为0;
3.出队列,head++,size–;如果head的值等于数组的长度head设置为0;
判断队列是否为空或者满,就需要判断size的值如果size=0,链表为空,在从队列取出元素时,产生堵塞,如果size==数组的长度,那么在放进元素时,也会产生堵塞
下面是代码的实现
class MyBlockingQueue{private int[] array = new int[1_0000];private int head = 0;private int end = 0;private int size = 0;public void put(int value) throws InterruptedException {if(size == array.length){return;}array[end] = value;end++;//需要处理end下表到达数组末尾的情况if(end >= array.length){end = 0;}size++;}public Integer take() throws InterruptedException {if(size == 0){return null;}int ret = array[head];head++;if(head>=array.length){head=0;}size--;return ret;}}
保证阻塞队列的安全
从上面的代码我可以看出,take与put方法每一行代码都在对公共变量进行操作,所以我们直接对这两个方法进行加锁即可
class MyBlockingQueue{private int[] array = new int[1_0000];private int head = 0;private int end = 0;private int size = 0;synchronized public void put(int value) throws InterruptedException {if(size == array.length){}array[end] = value;end++;//需要处理end下表到达数组末尾的情况if(end >= array.length){end = 0;}size++;}synchronized public Integer take() throws InterruptedException {if(size == 0){}int ret = array[head];head++;if(head>=array.length){head=0;}size--;return ret;}}
实现堵塞效果
出队列,head++,size–;如果head的值等于数组的长度head设置为0;
判断队列是否为空或者满,就需要判断size的值如果size=0,链表为空,在从队列取出元素时,产生堵塞,如果size数组的长度,那么在放进元素时,也会产生堵塞
关键使用wait和notify关键字
class MyBlockingQueue{private int[] array = new int[1_0000];private int head = 0;private int end = 0;private int size = 0;synchronized public void put(int value) throws InterruptedException {if(size == array.length){this.wait();}array[end] = value;end++;//需要处理end下表到达数组末尾的情况if(end >= array.length){end = 0;}size++;this.notify();}synchronized public Integer take() throws InterruptedException {if(size == 0){this.wait();}int ret = array[head];head++;if(head>=array.length){head=0;}size--;this.notify();return ret;}}
这样的一个简单的堵塞队列就完成了
‘下面创建一个实例来感受一下这样的阻塞队列
🌰:`
public class TestDemo3 {private static MyBlockingQueue myBlockingQueue = new MyBlockingQueue();public static void main1(String[] args) throws InterruptedException {BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();blockingQueue.put("hello");blockingQueue.take();}public static void main(String[] args) {Thread producer = new Thread(()->{int num = 0;while(true){try {System.out.println("生产了"+num);myBlockingQueue.put(num);num++;//Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});producer.start();Thread customer = new Thread(()->{while(true){try {int num = myBlockingQueue.take();System.out.println("消费了"+num);Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}});customer.start();}}
运行结果
以上就是阻塞队列的相关基础知识
👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋
👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋
👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋👋