Скрипт для подключения к весам Xiaomi MiScale 2 и парсинга данных

Давно хотел подключиться к весам напрямую и обрабатывать новости о своем похудении минуя жадных до данных ксяомцев

Намедни, с помощью Дипсика (пришлось ему помочь, предоставив данные о том, как закодированы данные в ответе весов) за 15 минут накарябал следующее:

import asyncio
from bleak import BleakClient, BleakScanner
from bleak.exc import BleakError

SCALE_MAC = "00:00:00:00:00:00" # MAC адрес весов из приложения Zepp Life

SERVICE_UUID = "0000181b-0000-1000-8000-00805f9b34fb" # Оставить как есть
CHARACTERISTIC_UUID = "00002a9c-0000-1000-8000-00805f9b34fb" # Оставить как есть

def parse_scale_data(data):

    if len(data) < 13:
        return None
    
    status_byte = data[1]
    is_fixed = status_byte & 0x02
    has_impedance = status_byte & 0x04
    
    raw_weight = int.from_bytes(data[11:13], byteorder='little')
    weight_kg = raw_weight * 0.005
    
    return {
        'weight': weight_kg,
        'is_final': is_fixed,
        'is_stabilized': is_fixed,
        'raw_weight': raw_weight,
        'has_impedance': has_impedance,
        'impedance': int.from_bytes(data[9:11], byteorder='little') if has_impedance else None
    }

def notification_handler(sender, data):
    """Обработчик уведомлений"""
    if len(data) < 13 or data[0] != 0x02:
        return
    
    parsed = parse_scale_data(data)
    if not parsed:
        return
    
    if parsed['is_final']:
        print(f"\nФинальный вес: {parsed['weight']:.2f} кг")
        if parsed['has_impedance']:
            print(f"Импеданс: {parsed['impedance']} Ом")
    else:
        print(f"Измерение: {parsed['weight']:.2f} кг", end='\r')

async def monitor_scale():
    """Постоянный мониторинг весов"""
    while True:
        print("\n--- Начинаю сканирование ---")
        try:
            # Сканируем устройство
            device = await BleakScanner.find_device_by_address(SCALE_MAC, timeout=10)
            if not device:
                print("Весы не найдены, повторяю через 5 секунд...")
                await asyncio.sleep(5)
                continue

            print(f"Найдены весы {device.name}, подключаюсь...")
            
            async with BleakClient(device) as client:
                print("Подключено. Встаньте на весы...")
                await client.start_notify(CHARACTERISTIC_UUID, notification_handler)
                
                # Ждем отключения или 5 минут бездействия
                try:
                    while True:
                        await asyncio.sleep(1)
                        if not client.is_connected:
                            print("\nСоединение потеряно")
                            break
                except Exception as e:
                    print(f"\nОшибка соединения: {e}")
                finally:
                    try:
                        await client.stop_notify(CHARACTERISTIC_UUID)
                    except:
      0                  pass
                    print("Отключено")
                    
        except Exception as e:
            print(f"\nОшибка: {e}")
            await asyncio.sleep(5)

# Для Jupyter Notebook
try:
    await monitor_scale()
except KeyboardInterrupt:
    print("\nМониторинг остановлен")

Код рабочий, с чем себя и поздравляю.

Очень хочется разобраться с добычей всех данных из Zepp Life, но с этим пока непросто.

В свободное от работы время…

... озадачился давеча организацией интерфейса управления роботом.

Поскольку, если у человека в руках молоток, то все вокруг ему напоминает гвоздь, решил остановиться на реализации оного интерфейса посредством flask. Поднимаем на Raspberry flask-server, получаем от процесса управления роботом данные, посылаем из фласка роботу команды. Все просто. Казалось бы.

Однако ж, оказалось не все.

Читать далее

Arduino L293 Motor Shield для raspberry Pi


Как всегда, не разобравшись, заказал драйвера двигателей. При заказе ориентировался на внешний вид )) Привык, понимаешь, что прям на плате написано, что куда подключать, ну, на крайняк, в интернетах можно найтить схему.

Ни хрена подобного. Перерыв интернеты, не нашел ни одного вменяемого туториала, о том, как подключить драйвер к Raspberry Pi. Даже даташита нормального нет. Абсолютно непонятно, какой пин на плате отвечает за что. Есть картинка (см. выше) и указание использовать библиотеку AMSpi. Проблема только что бибkиотека чуть ли не для первой малины. Ну и, по факту, библиотека оказалась не очень рабочей, по крайне мере для подключения. изображенного на картинке.

Я одновременно в бешенстве и расстройстве. Придется заказать другой драйвер взять ардуиновские библиотеки и, продираясь через все эти фигурные и не очень скобочки разбираться, чо там к чему.

Ф-я получения данных о состоянии системы

import psutil
import datetime
import platform


def get_system_info():

    result = {}

    # ОС
    os_platform = platform.system()
    # Процессор
    processor = platform.processor()
    result.update({'platform': os_platform, 'processor': processor})
    # диски
    disks = psutil.disk_partitions()
    for i in disks:
        try:
            disk_usage = psutil.disk_usage(i.device)
            total = int(disk_usage .total) / 1048576
            used = int(disk_usage .used) / 1048576
            free = int(disk_usage .free) / 1048576
            percent = float(disk_usage .percent)
            disk_dict = {'total_space': total, 'used_space': used, 'free_space': free, 'free_percent': percent}
            result.update({'disk_space': disk_dict})

        except:
            pass

    # загрузка процессора
    cpu_percent = psutil.cpu_percent()
    result.update({'cpu_percent': cpu_percent})

    # аптайм
    boot_time = psutil.boot_time()
    uptime = datetime.datetime.now().timestamp() - boot_time
    result.update({'uptime': uptime})

    # оперативка:
    mem_status = psutil.virtual_memory()
    m_total = int(mem_status.total) / 1048576
    m_available = int(mem_status.available) / 1048576
    m_used = int(mem_status.used) / 1048576
    m_free = int(mem_status.free) / 1048576
    m_percent = float(mem_status.percent)
    memory_dict = {'total_mem': m_total, 'available_mem': m_available, 'used_mem': m_used,
                   'free_mem': m_free, 'free_percent_mem': m_percent}
    result.update({'memory_space': memory_dict})

    # температура процессора:
    try:
        # ToDo скорее всего надо прописать итератор для перечня  датчиков
        proc_temp = float(psutil.sensors_temperatures().current)
    except:
        proc_temp = None

    result.update({'proc_temp': proc_temp})

    return result


if __name__ == "__main__":
    res = get_system_info()
    for i in res.keys():
        print(i, res[i])

Источники:

Читать далее

Хм…

Если попробовать скриптом получить все посты с сайта и забыть поставить выход из цикла, то хостер заблокирует ip. Век живи- век учись )))

Функция расчета TWR (time weighted return)

def twr_eavaluate(df):
    
    """Ф-я рассчитывает time weighted return  по данным датафрейма ['AUM_DATE', 'POSITION_VALUE','NET_INPUT']

     AUM_DATE - колонка с датами
     POSITION_VALUE - оценочная  стоимость портфеля на дату
     NET_INPUT - вводы-выводы средств на дату (ввод - вывод)"""

    df.sort_values(by='AUM_DATE', inplace=True)

    # порфтель на  день-1
    prev_portf = [0] + df['POSITION_VALUE'].tolist()[:-1]
    df['prev_portf'] = prev_portf

    df['delta_for_twr'] = (df['POSITION_VALUE'] - df['NET_INPUT']) / df['prev_portf']

    # замена пропусков
    df = df.replace([np.inf, -np.inf], np.nan)
    df.dropna(axis=0, subset=['delta_for_twr'], inplace=True)
    twr = df['delta_for_twr'].prod() - 1
    return twr

Простая функция расчета TWR (time-weighted return) для определения эффективности вложений с учетом вводов и выводов средств

Функция поиска ISIN в строке

Википедия:

Международный идентификационный код ценной бумаги (англ. International Securities Identification Number, общепринятое сокращение — ISIN) — 12-разрядный буквенно-цифровой код, однозначно идентифицирующий ценную бумагу.

import re


def look_for_isins(string):

    ''' Возвращает список ISIN из переданной строки'''

    isins = []
    isin_pattern = '[A-Z]{2}\w{9}\d{1}'
    f = re.findall(isin_pattern, string)
    for i in f:
        isins.append(i)
    return isins

Парсинг писем из Outlook в Python

Python-cкрипт разбора писем из папки "Входящие" Outlook. Сохранение письма как html-страницы с сохранением отображения вложенных картинок

import win32com.client
import os
import datetime
import re

outlook = win32com.client.Dispatch("Outlook.Application").GetNamespace("MAPI")
inbox = outlook.GetDefaultFolder(6).Items  # 6- папка Входящие Outlook
msg = inbox.GetLast() # последнее письмо в ящике

path  = r'C:\Users\user\Documents' # папка для сохранения вложений письма

while msg :
    subject = str(msg.Subject) # тема письма
    msg_date = datetime.datetime.strptime(str(msg.SentOn)[0:19], '%Y-%m-%d %H:%M:%S')
    to_list = str(msg.To).split(';') # список получателей
    sender = msg.SenderName # отправитель
    text = str(msg.Body) # текст письма
    html_text = str(msg.HTMLBody) # html код письма
    
    # сохранение вложений
    att_list=[]
    for att in msg.Attachments:
        att_name = att.FileName
        att.SaveAsFile(path + '\\' + att_name) # сохранение файла вложения
        att_list.append(att_name)  
    
    # корректировка html-кода для отражения вложенных картинок (замена адреса картинок на локальный вариант)
    if att_list:
        for fname in att_list:
            # паттерн ссылки на вставленное изображение
            pattern = '\"cid:' + fname  + '@[\w]{,20}.[\w]{,20}\"'
            html_text = re.sub(pattern, '\"' + fname + '\"', html_text, count=5)
    
    # создание  html-страницы с телом письма
    with open(path + '\\' + 'index_.html', 'w', encoding='utf8') as file:
        file.write(html_text)
    
    msg = inbox.GetPrevious() # переход к следующему письму

Перевод категориальных признаков датафрейма в бинарные

Заметка

test_df
номер город колич. признак
0 2 москва 1
1 3 питер 2
2 4 саратов 3
3 5 москва 4
4 6 питер 3
5 7 москва 4
6 8 сыктывкар 5
7 9 саратов 6
df_nonbinary = pd.get_dummies(test_df['город'], )
df_nonbinary 
москва питер саратов сыктывкар
0 1.0 0.0 0.0 0.0
1 0.0 1.0 0.0 0.0
2 0.0 0.0 1.0 0.0
3 1.0 0.0 0.0 0.0
4 0.0 1.0 0.0 0.0
5 1.0 0.0 0.0 0.0
6 0.0 0.0 0.0 1.0
7 0.0 0.0 1.0 0.0
new_df = pd.concat([test_df, df_nonbinary], axis=1 )
new_df
номер город колич. признак москва питер саратов сыктывкар
0 2 москва 1 1.0 0.0 0.0 0.0
1 3 питер 2 0.0 1.0 0.0 0.0
2 4 саратов 3 0.0 0.0 1.0 0.0
3 5 москва 4 1.0 0.0 0.0 0.0
4 6 питер 3 0.0 1.0 0.0 0.0
5 7 москва 4 1.0 0.0 0.0 0.0
6 8 сыктывкар 5 0.0 0.0 0.0 1.0
7 9 саратов 6 0.0 0.0 1.0 0.0

Функция конвертации секунд в более крупные периоды времени (Python)

Заметка

Функция  превращает количество секунд в удобночитаемые человеком  значения времени.

def seconds_to_str(uptime):
    """
       Функция принимает числовое значение секунд и возвращает строку в формате:
       '3 нед., 1 дн., 18 час., 23 мин., 3 сек.'
       Доли секунды округляются до секунд.
    """

    seconds = ''
    minutes = ''
    hours = ''
    days = ''
    weeks = ''
    uptime = round(uptime, 0)

    if uptime >= 60:
        minutes = uptime // 60
        if minutes >= 60:
            hours = minutes // 60
            if hours >= 24:
                days = hours // 24
                if days >= 7:
                    weeks = days // 7
                    seconds, minutes, hours, days, weeks = str(int(uptime % 60)), str(int(minutes % 60)), str(
                        int(hours % 24)), str(int(days % 7)), str(int(weeks))
                else:
                    seconds, minutes, hours, days = str(int(uptime % 60)), str(int(minutes % 60)), str(
                        int(hours % 24)), str(int(days))
            else:
                seconds, minutes, hours = str(int(uptime % 60)), str(int(minutes % 60)), str(int(hours))
        else:
            seconds, minutes = str(int(uptime % 60)), str(int(minutes))
    else:
        seconds = str(int(uptime // 1))

    if weeks:
        weeks = weeks + ' нед.,'
    if days:
        days = days + ' дн.,'
    if hours:
        hours = hours + ' час.,'
    if minutes:
        minutes = minutes + ' мин.,'
    if seconds:
        seconds = seconds + ' сек.'
    res = weeks + ' ' + days + ' ' + hours + ' ' + minutes + ' ' + seconds
    print(res.strip())
    return res.strip()