700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > PyQt5之QtBluetooth模块:低功耗蓝牙BLE通信

PyQt5之QtBluetooth模块:低功耗蓝牙BLE通信

时间:2021-07-01 16:22:29

相关推荐

PyQt5之QtBluetooth模块:低功耗蓝牙BLE通信

PyQt5之QtBluetooth模块:低功耗蓝牙BLE通信

最近使用PyQt5开发PC端工具,正巧手上有一个富芮坤的低功耗蓝牙,于是想在PC端试试与之通信,不过发现使用PyQt5开发低功耗蓝牙的教程很少,本人参考Qt教程和官方文档,开发过程以此文记录。

参考文档:PyQt5模块安装路径下QtBluetooth.py、Qt C++蓝牙类介绍、Qt 低功耗蓝牙扫描示例

低功耗蓝牙通信大体分为以下几个过程:

1. 扫描蓝牙

def scan_for_devices(self):self.agent = QtBt.QBluetoothDeviceDiscoveryAgent()self.agent.setLowEnergyDiscoveryTimeout(5000)self.agent.deviceDiscovered.connect(self.show_info)self.agent.error.connect(self.agent_error)self.agent.finished.connect(self.agent_finished)self.timer = QtCore.QTimer(self.agent)self.timer.start(5)self.timer.timeout.connect(self.display_status)self.agent.start()def show_info(self, info: QtBt.QBluetoothDeviceInfo):if int(info.coreConfigurations()) == 1:print('Device discovered')print(f'Name: {info.name()}')print(f'Addr: {info.address().toString()}')print(f'ServUUID: {info.serviceUuids()}')def agent_error(self, e):error = ["NoError", "InputOutputError", "PoweredOffError", "InvalidBluetoothAdapterError", "UnknownError"]if e < 4:print(error[e])else:print(error[4])def agent_finished(self):print('Agent finished')for dev in self.agent.discoveredDevices():print(f'设备名称: {dev.name()} 设备地址: {dev.address()}')def display_status(self):print(self.agent.isActive(), self.agent.discoveredDevices())self.timer.stop()

上面代码可以看到首先要创建蓝牙设备扫描代理QBluetoothServiceDiscoveryAgent,并设置超时时间,然后依次是绑定设备搜索回调函数serviceDiscovered,这个函数返回搜索到蓝牙设备信息;绑定错误信息回调函数error;绑定搜索完成回调函数finished;最终搜索完成后(通过定时器结束搜索),在discoveredDevices中保存了本次搜索的所有蓝牙设备信息。

2. 连接蓝牙

3. 获取服务

def agent_finished(self):print('Agent finished')for dev in self.agent.discoveredDevices():# 通过蓝牙name连接 或者 通过蓝牙地址连接dev.address()if self.dev_name[0] == dev.name() or self.dev_name[1] == dev.name():print(f'连接设备: {dev.name()}')print(f'coreConfigurations: {int(dev.coreConfigurations())}')self.controller = QtBt.QLowEnergyController.createCentral(dev)self.controller.connected.connect(self.connect_Notify)self.controller.error.connect(self.controller_error)self.controller.disconnected.connect(self.disconnect_Notify)self.controller.serviceDiscovered.connect(self.addService)self.controller.discoveryFinished.connect(self.dis_Finished)self.controller.connectToDevice()breakdef connect_Notify(self, *args, **kwargs):print(f'连接通知')print(f'args: {args}')print(f'kwargs: {kwargs}')self.serviceUUID = list()self.controller.discoverServices()def controller_error(self, e):error = ["NoError", "UnknownError", "UnknownRemoteDeviceError", "NetworkError", "InvalidBluetoothAdapterError","ConnectionError"]if e < 6:print(error[e])else:print("UnknownError")def disconnect_Notify(*args, **kwargs):print(f'断开连接通知')print(f'args: {args}')print(f'kwargs: {kwargs}')def addService(self, uuid: QtBt.QBluetoothUuid):print('发现服务 Service discovered')print(f'uuid: {uuid.toString()}')self.serviceUUID.append(uuid)def dis_Finished(self):print(f'服务搜索完成')for uuid in self.serviceUUID:print(f'uuid: {uuid.toString()}')

上面代码是蓝牙扫描结束后,从扫描的设备中查找自己想要连接的蓝牙名称(或地址),通过低功耗蓝牙控制器QLowEnergyController连接设备,然后绑定连接完成回调函数connected、错误信息回调函数error、断开连接回调函数disconnected、搜索服务回调函数serviceDiscovered、服务搜索完成回调函数discoveryFinished,最后通过调用connectToDevice方法连接到蓝牙设备。其中在连接成功回调函数connect_Notify中有调用发现服务discoverServices方法,这个函数是查找蓝牙支持哪些服务,必须要调用,另外如果查找不到蓝牙服务看看蓝牙是否配对,配对有方法可调用,有需求的可查看官方文档,如果蓝牙连接错误可以将蓝牙设备删除,重新配对再测试。

4. 连接服务

5. 获取服务特征

def dis_Finished(self):print(f'服务搜索完成')for uuid in self.serviceUUID:if uuid.toString() == self.UUID_S:self.serviceUUID.clear()self.serviceUUID.append(uuid)self.ServiceObject = self.controller.createServiceObject(uuid)breakif self.ServiceObject is None:print(f'服务连接失败')else:print(f'服务连接成功')self.ServiceObject.stateChanged.connect(self.state_Changed)self.ServiceObject.error.connect(self.service_Error)self.ServiceObject.discoverDetails()def state_Changed(self, s):print(f'服务状态变化通知:{s} state:{self.ServiceObject.state()}')if s == QtBt.QLowEnergyService.DiscoveringServices:print(f"正在搜索服务特征... Discovering services...")returnelif s == QtBt.QLowEnergyService.ServiceDiscovered:print(f"搜索服务特征完成. Service discovered.")for ch in self.ServiceObject.characteristics():print(f'特征uuid:{ch.uuid().toString()}')def service_Error(self, error):ServiceError = ["NoError", "OperationError", "CharacteristicWriteError", "DescriptorWriteError", "UnknownError","CharacteristicReadError", "DescriptorReadError"]if error < 6:print(f'error:{error},{ServiceError[error]}, uuid:{self.ServiceObject.serviceUuid().toString()}')else:print(f'error:{error}')

上面代码是在搜索完服务后,从中查找自己需要的服务,通过createServiceObject方法创建低功耗蓝牙服务,之后绑定服务状态变化通知回调stateChanged,DiscoveringServices表示正在搜索服务特征,ServiceDiscovered表示搜索服务特征完成,其余状态我们不关注,然后调用discoverDetails方法开始搜索特征,所有的特征都在characteristics方法返回的列表中。

6. 创建服务特征句柄

7. 监听或写入特征

def create_write_characteristic(self, uuid: QtBt.QBluetoothUuid):data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]if self.ServiceObject:self.characteristicWrite = self.ServiceObject.characteristic(uuid)# 判断特征是否可用print(f'isValid:{self.characteristicWrite.isValid()}')if self.characteristicWrite.isValid():self.ServiceObject.writeCharacteristic(self.characteristicWrite, bytes(data), QtBt.QLowEnergyService.WriteWithResponse)print("创建写特征成功,可进行写数据.")else:print("err:创建写特征失败,写特征不可用.")

上面代码是往蓝牙中写入数据,函数的入参是QBluetoothUuid类型,通过步骤5获得的服务特征的UUID创建写入句柄,使用服务writeCharacteristic方法写数据,方法传入写句柄和数据,数据必须是bytes类型,WriteWithResponse参数是写入通知,还可以传入WriteWithoutResponse无写入通知。

def create_read_notify(self, uuid: QtBt.QBluetoothUuid):if self.ServiceObject:self.characteristicRead_ = self.ServiceObject.characteristic(uuid)# 判断特征是否可用print(f'isValid:{self.characteristicRead_.isValid()}')if not self.characteristicRead_.isValid():print("err:创建读特征失败,读特征不可用.")returnprint("创建读特征成功,正在设置监听...")# 获取读特征描述符 descriptors()为listself.notification = self.characteristicRead_.descriptors()[0]# 判断读特征描述符是否可用print(f'read_notify.isValid:{self.notification.isValid()}')if not self.notification.isValid():print("err:读特征描述符不可用,监听失败.")return# 绑定监听函数self.ServiceObject.characteristicChanged.connect(self.characteristic_Changed)# 写0x01,0x00启用监听服务self.ServiceObject.writeDescriptor(self.notification, bytes.fromhex('0100'))print("设置监听服务成功,正在监听数据...")def characteristic_Changed(self, info: QtBt.QLowEnergyCharacteristic, value):print(f'特征读取变化通知')for i in list(data):str_ = str_ + ' ' + '{:02X}'.format(i)print(f'{str_}')

上面代码是监听服务(读数据),函数的入参同样是QBluetoothUuid类型,监听服务和写数据有区别,需要开启监听通知,创建读句柄之后,还需要设置读特征描述符,从descriptors方法中获取读特征描述符,该方法返回一个list,我手上的低功耗蓝牙list中只有一个数据元素,不知道其他BLE是否也是这样,list中的数据就是我们需要的读特征描述符,通过服务writeDescriptor方法设置0x01,0x00启用监听服务,监听回调返回的数据类型是bytearray。

8. 完整代码

附上完整代码:

#!/usr/bin/env python# --*--coding=utf-8--*--# pip install PyQt5import sysimport timefrom PyQt5 import QtCorefrom PyQt5.QtCore import QByteArrayfrom PyQt5 import QtBluetooth as QtBtclass Application(QtCore.QCoreApplication):def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)self.agent = Noneself.controller = Noneself.timer = Noneself.service = Noneself.serviceUUID = list()self.ServiceObject = Noneself.Service = Noneself.characteristicWrite = Noneself.characteristicRead_ = Noneself.notification = Noneself.descriptorReadUUID = list()self.descriptorWriteUUID = list()self.dev_name = ['C0CEF5E80E38', 'BLE HID KBD MICE']self.dev_addr = 'C0:CE:F5:E8:0E:38'# 要连接的服务UUIDself.UUID_S = "{6e400001-b5a3-f393-e0a9-e50e24dc4179}"# 写特征UUIDself.UUID_W = "{6e400002-b5a3-f393-e0a9-e50e24dc4179}"# 读特征UUIDself.UUID_R = "{6e400003-b5a3-f393-e0a9-e50e24dc4179}"# 启动扫描程序self.scan_for_devices()self.exec()def display_status(self):print(self.agent.isActive(), self.agent.discoveredDevices())self.timer.stop()def show_info(self, info: QtBt.QBluetoothDeviceInfo):# 过滤低功耗设备if int(info.coreConfigurations()) == QtBt.QBluetoothDeviceInfo.LowEnergyCoreConfiguration:print('Device discovered')print(f'Name: {info.name()}')print(f'Addr: {info.address().toString()}')print(f'ServUUID: {info.serviceUuids()}')def agent_finished(self):print('Agent finished')for dev in self.agent.discoveredDevices():# 通过蓝牙name连接 或者 通过蓝牙地址连接dev.address()if self.dev_name[0] == dev.name() or self.dev_name[1] == dev.name():print(f'连接设备: {dev.name()}')print(f'coreConfigurations: {int(dev.coreConfigurations())}')self.controller = QtBt.QLowEnergyController.createCentral(dev)self.controller.connected.connect(self.connect_Notify)self.controller.error.connect(self.controller_error)self.controller.disconnected.connect(self.disconnect_Notify)self.controller.serviceDiscovered.connect(self.addService)self.controller.discoveryFinished.connect(self.dis_Finished)self.controller.connectToDevice()breakdef controller_error(self, e):error = ["NoError", "UnknownError", "UnknownRemoteDeviceError", "NetworkError", "InvalidBluetoothAdapterError","ConnectionError"]if e < 6:print(error[e])else:print("UnknownError")def connect_Notify(self, *args, **kwargs):print(f'连接通知')print(f'args: {args}')print(f'kwargs: {kwargs}')self.serviceUUID = list()self.controller.discoverServices()def disconnect_Notify(*args, **kwargs):print(f'断开连接通知')print(f'args: {args}')print(f'kwargs: {kwargs}')def addService(self, uuid: QtBt.QBluetoothUuid):print('发现服务 Service discovered')print(f'uuid: {uuid.toString()}')self.serviceUUID.append(uuid)def dis_Finished(self):print(f'服务搜索完成')for uuid in self.serviceUUID:if uuid.toString() == self.UUID_S:self.serviceUUID.clear()self.serviceUUID.append(uuid)self.ServiceObject = self.controller.createServiceObject(uuid)breakif self.ServiceObject is None:print(f'服务连接失败')else:print(f'服务连接成功')self.ServiceObject.stateChanged.connect(self.state_Changed)self.ServiceObject.characteristicWritten.connect(self.characteristic_Written)self.ServiceObject.error.connect(self.service_Error)self.ServiceObject.discoverDetails()def characteristic_Written(self, info: QtBt.QLowEnergyCharacteristic, value):print(f'特征写入变化通知')ch = info.uuid().toString() + " - Characteristic written:" + str(value)print(f'{ch}')def characteristic_Changed(self, info: QtBt.QLowEnergyCharacteristic, value):print(f'特征读取变化通知')ch = info.uuid().toString() + " - Characteristic read:" + str(value)print(f'{ch}')def service_Error(self, error):ServiceError = ["NoError", "OperationError", "CharacteristicWriteError", "DescriptorWriteError", "UnknownError","CharacteristicReadError", "DescriptorReadError"]if error < 6:print(f'error:{error},{ServiceError[error]}, uuid:{self.ServiceObject.serviceUuid().toString()}')def state_Changed(self, s):print(f'服务状态变化通知:{s} state:{self.ServiceObject.state()}')if s == QtBt.QLowEnergyService.DiscoveringServices:print(f"正在搜索服务特征... Discovering services...")returnelif s == QtBt.QLowEnergyService.ServiceDiscovered:print(f"搜索服务特征完成. Service discovered.")self.descriptorReadUUID = list()self.descriptorWriteUUID = list()for ch in self.ServiceObject.characteristics():print(f'特征:{ch.uuid().toString()}')if ch.uuid().toString() == self.UUID_W:# 保存要写的特征UUIDself.descriptorWriteUUID.append(ch.uuid())# 创建写特征self.create_write_characteristic(ch.uuid())if ch.uuid().toString() == self.UUID_R:# 保存要读的特征UUIDself.descriptorReadUUID.append(ch.uuid())# 监听读特征self.create_read_notify(ch.uuid())def create_write_characteristic(self, uuid: QtBt.QBluetoothUuid):data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]if self.ServiceObject:self.characteristicWrite = self.ServiceObject.characteristic(uuid)# 判断特征是否可用print(f'isValid:{self.characteristicWrite.isValid()}')if self.characteristicWrite.isValid():self.ServiceObject.writeCharacteristic(self.characteristicWrite, bytes(data), QtBt.QLowEnergyService.WriteWithResponse)print("创建写特征成功,可进行写数据.")else:print("err:创建写特征失败,写特征不可用.")def create_read_notify(self, uuid: QtBt.QBluetoothUuid):if self.ServiceObject:self.characteristicRead_ = self.ServiceObject.characteristic(uuid)# 判断特征是否可用print(f'isValid:{self.characteristicRead_.isValid()}')if not self.characteristicRead_.isValid():print("err:创建读特征失败,读特征不可用.")returnprint("创建读特征成功,正在设置监听...")# 获取读特征描述符 descriptors()为listself.notification = self.characteristicRead_.descriptors()[0]# 判断读特征描述符是否可用print(f'read_notify.isValid:{self.notification.isValid()}')if not self.notification.isValid():print("err:读特征描述符不可用,监听失败.")return# 绑定监听函数self.ServiceObject.characteristicChanged.connect(self.characteristic_Changed)# 写0x01,0x00启用监听服务self.ServiceObject.writeDescriptor(self.notification, bytes.fromhex('0100'))print("设置监听服务成功,正在监听数据...")def agent_error(self, e):error = ["NoError", "InputOutputError", "PoweredOffError", "InvalidBluetoothAdapterError", "UnknownError"]if e < 4:print(error[e])else:print(error[4])def scan_for_devices(self):self.agent = QtBt.QBluetoothDeviceDiscoveryAgent()self.agent.setLowEnergyDiscoveryTimeout(5000)self.agent.deviceDiscovered.connect(self.show_info)self.agent.error.connect(self.agent_error)self.agent.finished.connect(self.agent_finished)self.timer = QtCore.QTimer(self.agent)self.timer.start(5)self.timer.timeout.connect(self.display_status)self.agent.start()if __name__ == '__main__':app = Application(sys.argv)

经过以上步骤与BLE通信就算完成了,如有错误之处欢迎指正,转载请注明出处,谢谢!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。