Давно хотел подключиться к весам напрямую и обрабатывать новости о своем похудении минуя жадных до данных ксяомцев
Намедни, с помощью Дипсика (пришлось ему помочь, предоставив данные о том, как закодированы данные в ответе весов) за 15 минут накарябал следующее:
import asyncio
from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError
SCALE_MAC = "00:00:00:00:00:00" # MAC адрес весов из приложения Zepp Life
SERVICE_UUID = "0000181b-0000-1000-8000-00805f9b34fb" # Оставить как есть
CHARACTERISTIC_UUID = "00002a9c-0000-1000-8000-00805f9b34fb" # Оставить как есть
def parse_scale_data(data):
if len(data) < 13:
return None
status_byte = data[1]
is_fixed = status_byte & 0x02
has_impedance = status_byte & 0x04
raw_weight = int.from_bytes(data[11:13], byteorder='little')
weight_kg = raw_weight * 0.005
return {
'weight': weight_kg,
'is_final': is_fixed,
'is_stabilized': is_fixed,
'raw_weight': raw_weight,
'has_impedance': has_impedance,
'impedance': int.from_bytes(data[9:11], byteorder='little') if has_impedance else None
}
def notification_handler(sender, data):
"""Обработчик уведомлений"""
if len(data) < 13 or data[0] != 0x02:
return
parsed = parse_scale_data(data)
if not parsed:
return
if parsed['is_final']:
print(f"\nФинальный вес: {parsed['weight']:.2f} кг")
if parsed['has_impedance']:
print(f"Импеданс: {parsed['impedance']} Ом")
else:
print(f"Измерение: {parsed['weight']:.2f} кг", end='\r')
async def monitor_scale():
"""Постоянный мониторинг весов"""
while True:
print("\n--- Начинаю сканирование ---")
try:
# Сканируем устройство
device = await BleakScanner.find_device_by_address(SCALE_MAC, timeout=10)
if not device:
print("Весы не найдены, повторяю через 5 секунд...")
await asyncio.sleep(5)
continue
print(f"Найдены весы {device.name}, подключаюсь...")
async with BleakClient(device) as client:
print("Подключено. Встаньте на весы...")
await client.start_notify(CHARACTERISTIC_UUID, notification_handler)
# Ждем отключения или 5 минут бездействия
try:
while True:
await asyncio.sleep(1)
if not client.is_connected:
print("\nСоединение потеряно")
break
except Exception as e:
print(f"\nОшибка соединения: {e}")
finally:
try:
await client.stop_notify(CHARACTERISTIC_UUID)
except:
0 pass
print("Отключено")
except Exception as e:
print(f"\nОшибка: {e}")
await asyncio.sleep(5)
# Для Jupyter Notebook
try:
await monitor_scale()
except KeyboardInterrupt:
print("\nМониторинг остановлен")
Код рабочий, с чем себя и поздравляю.
Очень хочется разобраться с добычей всех данных из Zepp Life, но с этим пока непросто.