Bonjour,
J’utilise 2 sondes WITMOTION, modèle WT901BLECL5.0 (Bluetooth 5.0).
La connexion Bluetooth se fait sans adaptateur.
J’utilise le code Python publié sur GitHub et conseillé par le fabricant.
Avec une seule sonde tout va bien ; par contre, je ne trouve pas de solution pour lancer les 2 sondes simultanément.
Je cherche la solution sur tous les sites depuis de nombreux jours sans résultat.
Si quelqu’un a une idée je suis preneur.
Merci par avance.
Voici le code :
# coding:UTF-8
import threading
import time
import struct
import bleak
import asyncio
# Scanned devices
devices = []
# BLE device
BLEDevice = None
# 设备实例 Device instance
class DeviceModel:
    """Asyncio/bleak driver for one WITMOTION BLE sensor.

    Each instance owns its own connection, frame buffer and data dictionary,
    so several instances can run concurrently on one event loop. Every wait
    inside a coroutine uses ``asyncio.sleep``: a blocking ``time.sleep``
    would freeze the loop and starve the other sensors.
    """

    # region attribute
    # Class-level defaults; __init__ gives every instance its own values.
    # Device name
    deviceName = "WITMOTION"
    # Device data dictionary
    deviceData = {}
    # Whether the device is open
    isOpen = False
    # Temporary array (frame buffer)
    TempBytes = []
    # endregion

    # GATT UUIDs matched by openDevice.
    SERVICE_UUID = "0000ffe5-0000-1000-8000-00805f9a34fb"
    NOTIFY_CHARACTERISTIC_UUID = "0000ffe4-0000-1000-8000-00805f9a34fb"
    WRITE_CHARACTERISTIC_UUID = "0000ffe9-0000-1000-8000-00805f9a34fb"

    # (key, full-scale factor) for the nine int16 values of a 0x61 frame, in
    # wire order. Units are presumably g, deg/s and deg -- confirm against
    # the vendor protocol documentation.
    _MOTION_FIELDS = (
        ("AccX", 16), ("AccY", 16), ("AccZ", 16),
        ("AsX", 2000), ("AsY", 2000), ("AsZ", 2000),
        ("AngX", 180), ("AngY", 180), ("AngZ", 180),
    )

    def __init__(self, deviceName, BLEDevice):  # , callback_method):
        """Create a model for one sensor.

        Args:
            deviceName: Custom label for this device.
            BLEDevice: Value handed to ``bleak.BleakClient`` in openDevice
                (the script in this file passes a MAC address string).
        """
        print("Initialize device model")
        # Device name (custom)
        self.deviceName = deviceName
        self.BLEDevice = BLEDevice
        self.client = None
        self.writer_characteristic = None
        self.isOpen = False
        # self.callback_method = callback_method
        self.deviceData = {}
        # Per-instance frame buffer: the class-level list would be shared by
        # every sensor and mix their byte streams together.
        self.TempBytes = []

    # region Obtain device data
    def set(self, key, value):
        """Store ``value`` under ``key`` in the device data dictionary."""
        self.deviceData[key] = value

    def get(self, key):
        """Return the value stored under ``key``, or None if it is absent."""
        return self.deviceData.get(key)

    def remove(self, key):
        """Delete ``key`` from the device data; raises KeyError if absent."""
        del self.deviceData[key]
    # endregion

    def _findCharacteristics(self, client):
        """Locate the notify and write characteristics on ``client``.

        Stores the write characteristic in ``self.writer_characteristic`` and
        returns the notify characteristic (None if it was not found).
        """
        notify_characteristic = None
        print("Matching services......")
        for service in client.services:
            if service.uuid != self.SERVICE_UUID:
                continue
            print(f"Service: {service}")
            print("Matching characteristic......")
            for characteristic in service.characteristics:
                if characteristic.uuid == self.NOTIFY_CHARACTERISTIC_UUID:
                    notify_characteristic = characteristic
                if characteristic.uuid == self.WRITE_CHARACTERISTIC_UUID:
                    self.writer_characteristic = characteristic
            if notify_characteristic:
                break
        return notify_characteristic

    async def openDevice(self):
        """Connect, subscribe to notifications and stay connected.

        Returns once closeDevice() has been called, the peer disconnects, or
        no matching characteristic is found. Never blocks the event loop, so
        several devices can be opened concurrently with ``asyncio.gather``.
        """
        print("Opening device......")
        async with bleak.BleakClient(self.BLEDevice, timeout=15) as client:
            self.client = client
            self.isOpen = True
            poll_task = None
            try:
                notify_characteristic = self._findCharacteristics(client)
                if self.writer_characteristic:
                    # Reading magnetic field and quaternions
                    print("Reading magnetic field quaternions")
                    await asyncio.sleep(3)
                    # Keep a reference: the loop only holds weak references
                    # to tasks, and we must cancel this one on exit.
                    poll_task = asyncio.create_task(self.sendDataTh())
                if notify_characteristic:
                    print(f"Characteristic: {notify_characteristic}")
                    # Set up notifications to receive data
                    await client.start_notify(notify_characteristic.uuid, self.onDataReceived)
                    # Keep connected and open
                    try:
                        while self.isOpen and client.is_connected:
                            await asyncio.sleep(1)
                    except asyncio.CancelledError:
                        pass
                    finally:
                        # Stop notification on exit (pointless once the link is gone)
                        if client.is_connected:
                            await client.stop_notify(notify_characteristic.uuid)
                else:
                    print("No matching services or characteristic found")
            finally:
                # Make sure the polling loop ends together with the connection.
                self.isOpen = False
                if poll_task is not None:
                    poll_task.cancel()
                    await asyncio.gather(poll_task, return_exceptions=True)

    def closeDevice(self):
        """Ask openDevice() to leave its keep-alive loop and disconnect."""
        self.isOpen = False
        print("The device is turned off")

    async def sendDataTh(self):
        """Poll registers 0x3A and 0x51 every 100 ms while the device is open."""
        while self.isOpen:
            await self.readReg(0x3A)
            await asyncio.sleep(0.1)
            await self.readReg(0x51)
            await asyncio.sleep(0.1)

    # region data analysis
    def onDataReceived(self, sender, data):
        """Notification callback: reassemble 20-byte frames from ``data``.

        A frame starts with 0x55 followed by 0x61 or 0x71. Bytes that cannot
        start a frame are discarded so the parser resynchronises.
        """
        buffer = self.TempBytes
        for byte in data:
            buffer.append(byte)
            if len(buffer) == 1:
                if buffer[0] != 0x55:
                    buffer.clear()
            elif len(buffer) == 2:
                if buffer[1] not in (0x61, 0x71):
                    # Drop the false header; keep the second byte only if it
                    # could itself be the start of a frame.
                    del buffer[0]
                    if buffer[0] != 0x55:
                        buffer.clear()
            elif len(buffer) == 20:
                self.processData(buffer)
                buffer.clear()

    def processData(self, Bytes):
        """Decode one complete frame and store its values via set().

        Args:
            Bytes: The 20 frame bytes (any sequence of ints 0-255).
        """
        frame = bytes(Bytes)
        if frame[1] == 0x61:
            # Nine little-endian int16 values starting at offset 2.
            raw_values = struct.unpack_from("<9h", frame, 2)
            for (key, full_scale), raw in zip(self._MOTION_FIELDS, raw_values):
                self.set(key, round(raw / 32768 * full_scale, 3))
            # self.callback_method(self)
            print(raw_values[6] / 32768 * 180)
        else:
            # Register read response: frame[2] is the register that was read.
            if frame[2] == 0x3A:
                # Magnetic field
                hx, hy, hz = struct.unpack_from("<3h", frame, 4)
                self.set("HX", round(hx / 120, 3))
                self.set("HY", round(hy / 120, 3))
                self.set("HZ", round(hz / 120, 3))
            elif frame[2] == 0x51:
                # Quaternion
                quaternion = struct.unpack_from("<4h", frame, 4)
                for key, raw in zip(("Q0", "Q1", "Q2", "Q3"), quaternion):
                    self.set(key, round(raw / 32768, 5))

    @staticmethod
    def getSignInt16(num):
        """Reinterpret an unsigned 16-bit value as a signed int16."""
        if num >= pow(2, 15):
            num -= pow(2, 16)
        return num
    # endregion

    async def sendData(self, data):
        """Write ``data`` to the write characteristic (best effort).

        Does nothing when there is no connected client or no write
        characteristic; write errors are printed, not raised.
        """
        try:
            if (self.client is not None and self.client.is_connected
                    and self.writer_characteristic is not None):
                await self.client.write_gatt_char(self.writer_characteristic.uuid, bytes(data))
        except Exception as ex:
            print(ex)

    async def readReg(self, regAddr):
        """Send the read command for register ``regAddr``."""
        await self.sendData(self.get_readBytes(regAddr))

    async def writeReg(self, regAddr, sValue):
        """Unlock, write ``sValue`` to register ``regAddr``, then save."""
        # Unlock
        await self.unlock()
        # Delay 100 ms
        await asyncio.sleep(0.1)
        # Send the write command
        await self.sendData(self.get_writeBytes(regAddr, sValue))
        # Delay 100 ms
        await asyncio.sleep(0.1)
        # Save
        await self.save()

    @staticmethod
    def get_readBytes(regAddr):
        """Build the 5-byte read command for register ``regAddr``."""
        return [0xff, 0xaa, 0x27, regAddr, 0]

    @staticmethod
    def get_writeBytes(regAddr, rValue):
        """Build the 5-byte write command: header, register, value low/high.

        The high byte is masked so out-of-range or negative values still
        produce valid bytes.
        """
        return [0xff, 0xaa, regAddr, rValue & 0xff, (rValue >> 8) & 0xff]

    async def unlock(self):
        """Send the unlock command (0xb588 to register 0x69).

        Must be awaited; a call that is not awaited sends nothing.
        """
        cmd = self.get_writeBytes(0x69, 0xb588)
        await self.sendData(cmd)

    async def save(self):
        """Send the save command (0x0000 to register 0x00).

        Must be awaited; a call that is not awaited sends nothing.
        """
        cmd = self.get_writeBytes(0x00, 0x0000)
        await self.sendData(cmd)
# 数据更新时会调用此方法 This method will be called when data is updated
# def updateData(DeviceModel):
# 直接打印出设备数据字典 Directly print out the device data dictionary
# print(DeviceModel.deviceData)
# 获得X轴加速度 Obtain X-axis acceleration
# print(DeviceModel.get("AccX"))
"""
# SIMULTANEOUS CONNECTION OF 2 WITMOTION SENSORS (WitBluetooth_BWT901BLE5_0)
"""
# One DeviceModel per sensor; the device name is a 1-based index and the
# second argument is the sensor's BLE MAC address.
connections = tuple(
    DeviceModel(index, address)
    for index, address in enumerate(
        ("CF:58:6E:DB:78:2C", "FE:5A:60:57:27:B7"), start=1
    )
)
async def handle_connection(DeviceModel):
    """Run one sensor session from connect to disconnect.

    openDevice() itself keeps the link open and receives the notifications
    until the device is closed, so no separate receive step is needed. The
    previous extra call to ``receive_message()`` targeted a method the model
    does not define and raised AttributeError.

    Args:
        DeviceModel: The device *instance* to open. The parameter name
            shadows the class of the same name inside this function; it is
            kept for compatibility with existing callers.
    """
    await DeviceModel.openDevice()
async def run():
    """Open every sensor in ``connections`` concurrently and wait for all.

    Failures are collected instead of propagated, so one sensor that cannot
    connect does not tear down the sessions of the others; each failure is
    reported once all sessions have ended.
    """
    tasks = [
        asyncio.create_task(handle_connection(device))
        for device in connections
    ]
    # Wait until all the tasks finish
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for device, result in zip(connections, results):
        if isinstance(result, BaseException):
            print(f"Device {device.deviceName} failed: {result!r}")
# Only start the BLE sessions when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(run())
"""
# CONNECTION 1 SENSOR
device0 = DeviceModel("WT901BLE68","FE:5A:60:57:27:B7")
asyncio.run(device0.openDevice())
# device1 = DeviceModel("WT901BLE68","CF:58:6E:DB:78:2C")
# asyncio.run(device1.openDevice())
"""