700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 树莓派学习笔记(五)——烟雾浓度检测(ONENET云平台收发数据代码)

树莓派学习笔记(五)——烟雾浓度检测(ONENET云平台收发数据代码)

时间:2024-03-11 13:21:11

相关推荐

树莓派学习笔记(五)——烟雾浓度检测(ONENET云平台收发数据代码)

文章目录

前言一、硬件连接MQ-2PCF8591二、Onenet平台数据收发程序onenetsub.pyonenetget.py三、程序树莓派开启iic功能完整程序OneNet界面展示

前言

用树莓派4b做一个烟雾浓度检测仪,烟雾浓度传感器模块MQ-2收集烟雾浓度数据,把数据上传到onenet平台用网页显示。

所需材料:树莓派MQ-2烟雾浓度传感器 ;PCF8591 ad转换模块;杜邦线;面包板;

[.6.11]

一、硬件连接

参考:

MQ-2 :pm2.5、mq-2 、mq2电压浓度转换

PCF8591:PCF8591、PCFPCF8591使用及Python控制

MQ-2、PCF8591和树莓派之间使用I2C总线通信方式,引脚连接如下:

PCF8591 树莓派GND---GNDVCC---3.3VSCL---SCL/GPIO3SDA---SDA/GPIO2MQ-2 树莓派GND---GNDVCC---5V D0----36引脚MQ-2 PCF8591AO----AIN0

MQ-2

MQ-2浓度及电压转换:

​ 烟雾浓度计算公式: ppm = 613.9f * pow(RS/R0, -2.074f)

ppm:为可燃气体的浓度。VRL:电压输出值。Rs:器件在不同气体,不同浓度下的电阻值。R0:器件在洁净空气中的电阻值。RL:负载电阻阻值。注:pow() 方法返回 xy(x 的 y 次方) 的值。import mathmath.pow( x, y )注2:#浓度计算参数CAL_PPM =20# 校准环境中PPM值RL = 5 # RL阻值#R0 = float(6.00) #元件在洁净空气中的阻值Vrl = 5 * ADC_Value / 255 #5V ad 为8位RS = (5 - Vrl) / Vrl * RLR0 = RS / pow(CAL_PPM / 613.9, 1 / -2.074)ppm = 613.9 * pow(RS / R0, -2.074)

PCF8591

#!/usr/bin/env python3# -*- Coding: utf-8 -*-###### pin assign#PCF8591 --------- Raspberry pi3# SDA ------------ GPIO2/SDA1# SCL ------------ GPIO3/SCL1# VCC ------------- 3.3V# GND ------------- GND#######import wiringpiimport timeclass PCF8591:def __init__(self, addr):wiringpi.wiringPiSetup() #setup wiringpiself.i2c = wiringpi.I2C() #get I2Cself.dev = self.i2c.setup(addr) #setup I2C devicedef LED_ON(self):self.i2c.writeReg8(self.dev, 0x40, 0xFF)def LED_OFF(self):self.i2c.writeReg8(self.dev, 0x40, 0x00)def DAoutput(self,value):self.i2c.writeReg8(self.dev, 0x40, value)def analogRead0(self):self.i2c.writeReg8(self.dev, 0x48,0x40)self.i2c.readReg8(self.dev,0x48)#read dummyreturn self.i2c.readReg8(self.dev,0x48)def analogRead1(self):self.i2c.writeReg8(self.dev, 0x48,0x41)self.i2c.readReg8(self.dev,0x48)#read dummyreturn self.i2c.readReg8(self.dev,0x48)def analogRead2(self):self.i2c.writeReg8(self.dev, 0x48,0x42)self.i2c.readReg8(self.dev,0x48)#read dummyreturn self.i2c.readReg8(self.dev,0x48)def analogRead3(self):self.i2c.writeReg8(self.dev, 0x48,0x43)self.i2c.readReg8(self.dev,0x48)#read dummyreturn self.i2c.readReg8(self.dev,0x48)def analogRead(self,pin):self.i2c.writeReg8(self.dev, 0x48,0x40+pin)self.i2c.readReg8(self.dev,0x48)#read dummyreturn self.i2c.readReg8(self.dev,0x48)if __name__ == "__main__":pcf8591 = PCF8591(0x48)while 1:print("QM-2:"+str(pcf8591.analogRead0())) #read the Variable register# print("VCC:"+str(pcf8591.analogRead1())) # read the Variable register# print("END:"+str(pcf8591.analogRead3())) # read the Variable register

二、Onenet平台数据收发程序

参考:onenet_get_put

onenet平台 设置方法见上期:树莓派学习笔记(四)——温湿度检测(本地OLED显示、ONENET云平台显示)

onenetsub.py

将树莓派读取到的传感器数据,通过http协议发送至 平台的对应设备

import urllib.requestimport jsonimport timefrom time import sleep#设备IDdeviceId = "你的设备ID"APIKey = "你的设备APIKEY"#上传函数def http_put_data(data):url = "/devices/" + deviceId + '/datapoints'd = time.strftime('%Y-%m-%dT%H:%M:%S')values = {"datastreams": [ {"id": "CO2", "datapoints": [{"value": data}]},{"id": "PM25", "datapoints": [{"value": data}]},{"id": "PM10", "datapoints": [{"value": data}]},{"id": "VOC", "datapoints": [{"value": data}]} ]}jdata = json.dumps(values).encode("utf-8")request = urllib.request.Request(url, jdata)request.add_header('api-key', APIKey)request.get_method = lambda: 'POST'request = urllib.request.urlopen(request)return request.read()if __name__ == '__main__':R = http_put_data(10)print(R)

onenetget.py

读取平台对应设备的数据流,可以在另一台设备,如PC、树莓派读取onenet平台对应设备的数据流,不过由于平台限制有3s延时

import requestsimport json#设备IDdeviceId = "你的设备ID"APIKey = "你的设备APIKEY"# 基本设置url = "/devices/"+deviceId+"/datastreams"headers = {'api-key': APIKey}# 获得结果并打印r = requests.get(url, headers=headers)t: str = r.text#print(t)params = json.loads(t)#上面这个语句是将我们获得的内容转成数据字典,这样转是因为我们接收到的内容具有数据字典的形式#转换成数据字典利于我们后面的操作#print params['error'][]#print(type(params))#如果执行上面这条语句我们可以看到返回的结果是dict,也就是我们已经成功转换x = params['data']#这个语句是从数据转换后的数据字典中获取我们需要的数据,从结果上看params是一个list#在data前面的都只是一些描述内容,参考教程:/zhiaicq_r/article/details/79278530print('环境参数'+'\t\t\t\t'+'更新时间'+'\t\t\t\t\t'+'数值')#接下来是获取不同的数据流for index,values in enumerate(x):#只需要更新时间,id和值,所以这里对获得的数据字典做一下更改#print(values)#这里得到的values也是一个数据字典#因为在onenet那边对这些数据没有给出来,而且也没有意义,所以我们就不在这里显示,因此现将其删除# del values['unit']# del values['unit_symbol']# del values['create_time']#print(values.items())#print(values['update_at'])#这里不知道为什么直接使用values.get('update_at','')和values.get('current_value','')#或者用values['update_at']和values['current_value']报:KeyError错误,而且if里面的那条语句会执行#所以我们通过get方法解决,其中要注意的是,如果没有给定第二个参数,那么默认输出NONEa= str(values.get('update_at',''))b= str(values.get('current_value',''))#因为如果有更新时间就会有相应最新的值,所以这里只用其中一个作为判断条件if (a != ""):if (values['id'] == 'm_temp' ): #CPU_temp statusprint(str(values['id']) + '\t\t\t' + a + '\t\t\t' + b)T = belif(values['id'] == 'm_hum'):print(str(values['id']) + '\t\t\t\t' + a + '\t\t\t'+ b)H = belif(values['id'] == 'CPU_temp'):print(str(values['id']) + '\t\t\t\t' + a + '\t\t\t' + b)CT = belse:if(values['id'] == 'm_temp' or values['id'] == 'm_hum'):print(values['id']+ '\t\t\t' +'目前还没有收到任何数据')print(T,H,CT)

三、程序

树莓派开启iic功能

远程登陆树莓派后,输入sudo raspi-config后,选择5.Interfacing Options选择P5 I2C选择

完整程序

带ad转换的版本,求出具体烟雾浓度

#! /usr/bin/env python3import RPi.GPIO as GPIO # 导入库,并进行别名的设置import timeimport wiringpiimport psutilimport numpy as np#onenetimport urllibimport urllib.requestimport jsonimport datetimeimport requestsCHANNEL = 36 # 确定引脚口。按照真实的位置确定GPIO.setmode(GPIO.BOARD) # 选择引脚系统,这里我们选择了BOARDGPIO.setup(CHANNEL, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)# 初始化引脚,将36号引脚设置为输入下拉电阻,因为在初始化的时候不确定的的引电平,因此这样设置是用来保证精准,(但是也可以不写“pull_up_down=GPIO.PUD_DOWN”)###### pin assign#PCF8591 --------- Raspberry pi3# SDA ------------ GPIO2/SDA1# SCL ------------ GPIO3/SCL1# VCC ------------- 3.3V# GND ------------- GNDclass PCF8591:def __init__(self, addr):wiringpi.wiringPiSetup() # setup wiringpiself.i2c = wiringpi.I2C() # get I2Cself.dev = self.i2c.setup(addr) # setup I2C devicedef LED_ON(self):self.i2c.writeReg8(self.dev, 0x40, 0xFF)def LED_OFF(self):self.i2c.writeReg8(self.dev, 0x40, 0x00)def DAoutput(self, value):self.i2c.writeReg8(self.dev, 0x40, value)def analogRead0(self) -> object:self.i2c.writeReg8(self.dev, 0x48, 0x40)self.i2c.readReg8(self.dev, 0x48) # read dummyreturn self.i2c.readReg8(self.dev, 0x48)def analogRead1(self):self.i2c.writeReg8(self.dev, 0x48, 0x41)self.i2c.readReg8(self.dev, 0x48) # read dummyreturn self.i2c.readReg8(self.dev, 0x48)def analogRead2(self):self.i2c.writeReg8(self.dev, 0x48, 0x42)self.i2c.readReg8(self.dev, 0x48) # read dummyreturn self.i2c.readReg8(self.dev, 0x48)def analogRead3(self):self.i2c.writeReg8(self.dev, 0x48, 0x43)self.i2c.readReg8(self.dev, 0x48) # read dummyreturn self.i2c.readReg8(self.dev, 0x48)def analogRead(self, pin):self.i2c.writeReg8(self.dev, 0x48, 0x40 + pin)self.i2c.readReg8(self.dev, 0x48) # read dummyreturn self.i2c.readReg8(self.dev, 0x48)# if __name__ == "__main__":#pcf8591 = PCF8591(0x48)#while 1:# print("ADC_Value:" + str(pcf8591.analogRead0()))def getADC_Value():pcf8591 = PCF8591(0x48)AD_V=[]time.sleep(0.3)for i in range(10):v = pcf8591.analogRead0()AD_V.append(v)time.sleep(0.1)ADC_Value = np.sum(AD_V)/10#采样10次,求平均值# print("ADC_Value:" + str(ADC_Value))return ADC_Valuedef SmokeConcentration(ADC_Value):#烟雾浓度计算# 浓度计算参数CAL_PPM = 20 # 校准环境中PPM值RL = 5 # RL阻值R0 = 6.00 #元件在洁净空气中的阻值Vrl = 5 * ADC_Value / 255 #5V ad 为8位RS = (5 - Vrl) / Vrl * RLRunTime = psutil.boot_time()# print("Vrl:"+str(Vrl))# print("RS:" + str(RS))if RunTime >=3:# R0 = RS / pow(CAL_PPM / 613.9, 1 / -2.074)# print("R0:" + str(R0))# print("RS/R0:" + str(RS/R0))ppm = 613.9 * pow(RS / R0, -2.074)return ppmdef getSmokeAlarm(): #烟雾报警,DO引脚控制status = GPIO.input(CHANNEL) # 检测36号引脚口的输入高低电平状态 1为正常#print(status) # 实时打印此时的电平状态return statusdef getPPM():AD_V=getADC_Value()concentration = SmokeConcentration(AD_V)PPM = concentrationPercentage = PPM * pow(10,-6)print("smoke:"+str(PPM))return PPM#上传到onenet#onenetapi_key = '你的密钥' # 密钥device_ID = '你的id' # 设备IDheaders = {'api-key': api_key}url_post = "/devices/" + device_ID + "/datapoints" # 数据点url_get = "/devices/" + device_ID + "/datastreams" # 数据流def http_post_0():data = {'datastreams': [{"id": "status", "datapoints": [{"value": ' 检测到危险气体 ! ! ! '}]}]} # id是你的数据流名称jdata = json.dumps(data).encode("utf-8")r = requests.post(url=url_post, headers=headers, data=jdata)print("发送成功:", r.text)def http_post_1():data = {'datastreams': [{"id": "status", "datapoints": [{"value": ' 空气状况正常\t无有害气体 '}]}]} # id是你的数据流名称jdata = json.dumps(data).encode("utf-8")r = requests.post(url=url_post, headers=headers, data=jdata)print("发送成功:", r.text)def ppm_onenet_put():# 保留两位小数Smoke_ppm = round(getPPM(), 2)Smoke_Alarm = getSmokeAlarm()CurTime = datetime.datetime.now()# 这url只需把数字部分换成你在onenet中创建的设备号就行url = '/devices/602743016/datapoints'values = {'datastreams': [{"id": "Smoke_ppm", "datapoints": [{"time": CurTime.isoformat(), "value": Smoke_ppm}]},{"id": "Smoke_Alarm", "datapoints": [{"time": CurTime.isoformat(), "value": Smoke_Alarm}]}]}jdata = json.dumps(values).encode("utf-8") # 对数据进行JSON格式化编码 ##然后# 打印json内容print(jdata)##python2:request = urllib.request.Request(url, jdata)request.add_header('api-key', api_key)request.get_method = lambda: 'POST' # 设置HTTP的访问方式request = urllib.request.urlopen(request)return request.read().decode("utf-8")def mq2_local():time.sleep(3)Smoke_ppm = round(getPPM(), 2)Smoke_Alarm = getSmokeAlarm()if Smoke_Alarm == True: # 如果为高电平,说明MQ-2正常,并打印“OK”print(' 正常 ')else: # 如果为低电平,说明MQ-2检测到有害气体,并打印“dangerous”print(' 检测到危险气体 ! ! ! ')print(Smoke_Alarm) # 实时打印此时的电平状态print("Smoke_ppm=%0.2f" % Smoke_ppm)def mq2_onenet_put(): #上传 Smoke_ppm status Smoke_Alarmtime.sleep(3)Smoke_Alarm = getSmokeAlarm()resp = ppm_onenet_put()print("OneNET_result: %s" % resp)if Smoke_Alarm == True:http_post_1()else:http_post_0()if __name__ == '__main__':while (1):mq2_local()mq2_onenet_put()time.sleep(1)GPIO.cleanup() # 清理运行完成后的残余

不带ad转换的程序,没有PCF8591的时候用,输出 是否有 危险气体的开关量

#! /usr/bin/env python3import RPi.GPIO as GPIO # 导入库,并进行别名的设置import time#onenetimport requestsimport json#mq2CHANNEL = 36 # 确定引脚口。按照真实的位置确定GPIO.setmode(GPIO.BOARD) # 选择引脚系统,这里我们选择了BOARDGPIO.setup(CHANNEL, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)# 初始化引脚,将36号引脚设置为输入下拉电阻,因为在初始化的时候不确定的的引电平,因此这样设置是用来保证精准,(但是也可以不写“pull_up_down=GPIO.PUD_DOWN”)#onenetapi_key = '密钥' # 密钥device_ID = '设备ID' # 设备IDheaders = {'api-key': api_key}url_post = "/devices/" + device_ID + "/datapoints" # 数据点url_get = "/devices/" + device_ID + "/datastreams" # 数据流def http_post_0():data = {'datastreams': [{"id": "status", "datapoints": [{"value": ' 检测到危险气体 ! ! ! '}]}]} # id是你的数据流名称jdata = json.dumps(data).encode("utf-8")r = requests.post(url=url_post, headers=headers, data=jdata)#print("发送成功:", r.text)def http_post_1():data = {'datastreams': [{"id": "status", "datapoints": [{"value": ' 空气状况正常\t无有害气体 '}]}]} # id是你的数据流名称jdata = json.dumps(data).encode("utf-8")r = requests.post(url=url_post, headers=headers, data=jdata)#print("发送成功:", r.text)# 带有异常处理的主程序try:while True: # 执行一个while死循环status = GPIO.input(CHANNEL) # 检测36号引脚口的输入高低电平状态# print(status) # 实时打印此时的电平状态if status == True: # 如果为高电平,说明MQ-2正常,并打印“OK”print(' 空气状况正常 ')http_post_1()else: # 如果为低电平,说明MQ-2检测到有害气体,并打印“dangerous”print(' 检测到危险气体 ! ! ! ')http_post_0()time.sleep(0.1) # 睡眠0.1秒,以后再执行while循环except KeyboardInterrupt: # 异常处理,当检测按下键盘的Ctrl+C,就会退出这个>脚本GPIO.cleanup() # 清理运行完成后的残余

OneNet界面展示

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。