CircuitPython 10行プログラミング Step8 (7) BLE stream

 前回、ユニークなUUIDが使えるJSON事例を動かしました。ここでは、streamを動かします。

streamを使ったペリフェラル用クラス

 新規作成で次のstreamのクラスを記述したソースを入力し、SwitchBot.pyで保存します。


# SPDX-FileCopyrightText: 2020 Mark Raleson
# SPDX-License-Identifier: MIT

from adafruit_ble.uuid import VendorUUID
from adafruit_ble.services import Service
from adafruit_ble.characteristics import Characteristic
from adafruit_ble.characteristics.stream import StreamIn
from adafruit_ble.characteristics.stream import StreamOut

class SensorService(Service):

    uuid = VendorUUID("51ad213f-e568-4e35-84e4-67af89c79ef0")

    settings = StreamOut(
        uuid=VendorUUID("e077bdec-f18b-4944-9e9e-8b3a815162b4"),
        properties=Characteristic.READ | Characteristic.NOTIFY,
    )

    sensors = StreamIn(
        properties=Characteristic.WRITE ,
        uuid=VendorUUID("528ff74b-fdb8-444c-9c64-3dd5da4135ae"),
    )

    def __init__(self, service=None):
        super().__init__(service=service)
        self.connectable = True

streamを使ったペリフェラル

 code.pyで保存します。これがメインです。

 settings.write('StreamOut:' + str(measure()) +':')で測定値(measure()はランダムな2桁の整数値)を送ります。

 R = readSensor.read()は、コネクトしたセントラルから送られてくるデータを受け取ります。


import time
import random
from SwitchBot import SensorService
from adafruit_ble import BLERadio
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement

# Create BLE radio, custom service, and advertisement.
ble = BLERadio()
sService = SensorService()
#command = 0x570f31
advertisement = ProvideServicesAdvertisement(sService)
ble.name = "testSwitchBot"

# Function to get some fake weather sensor readings for this example in the desired unit.
def measure():
    humidity = random.uniform(0.0, 100.0)
    return int(humidity)

# Advertise until another device connects, when a device connects, provide sensor data.
while True:
    print("Advertise services")
    ble.stop_advertising()  # you need to do this to stop any persistent old advertisement
    ble.start_advertising(advertisement)

    print("Waiting for connection...")
    while not ble.connected:
        pass

    print("Connected")
    while ble.connected:
        settings = sService.settings
        settings.write('StreamOut:' + str(measure()) +':')
        readSensor = sService.sensors
        R = readSensor.read()
        print("Settings: ", settings)
        print("Sensors: ", measure())
        print("read Sensors: ", R)
        time.sleep(0.25)

    print("Disconnected")

 実行中の画面です。

 Sensors:XXは、送っているセンサ・データです。

 read Sensors: b''はセントラルから送られたデータを受け取った値です。今、何も受け取っていないのでブランクです。

 コネクトするのは、RSL10 Bluetooth Low Energy Explorerです。

 Notificationをチェックすると、StreamOutの「StreamOut:XX:」を連続して受け取っています。

 ここで、StreamIn側に0x414243(バイナリ形式)を入力して、Write Reqをクリックします。

 Muエディタを見ます。

 ABCを受け取りました。