初めてのBLE (11) Raspberry Piでペリフェラル③BME280に対応

 前回は、example-gatt-serverのBattery Serviceを一部変更して、ラズパイのCPUの温度を送るようにしました。今回は、不要な部分を取り除いてスリムにしたうえで、BME280の測定値を送るようにします。

 ラズパイで動いているBlueZを使うには、Bluetooth protocol stack for Linuxのサンプルを利用します。

  Bluetooth protocol stack for Linuxのtestディレクトリ

(参考文献)BlueZのAPI/サンプルコードのメモ

example-gatt-serverをベースにBME280に対応させる

 MEMSセンサのBME280は、温度、気圧、湿度が測れます。ラズパイとBME280ボードを次のように接続します。BME280はI2CとSPIのどちらのインターフェースでもつなげられますが、アマゾンで入手したボードはI2C専用になっていました。

 次のプログラムで動作を確認しました。

import smbus, time, bme280

# I2C port number and sensor address used throughout this script.
I2C_PORT = 1
BME280_ADDRESS = 0x76

bus = smbus.SMBus(I2C_PORT)
# Keep the calibration data returned by the library so that it can be passed
# to sample() instead of being read from the sensor again.
calibration_params = bme280.load_calibration_params(bus, BME280_ADDRESS)

# sample() returns a reading object that exposes each value as an attribute,
# so the numbers do not have to be cut out of its string representation with
# a fixed-width slice (which breaks as soon as a value has a different number
# of digits).
s = bme280.sample(bus, BME280_ADDRESS, calibration_params)

# Units follow the RPi.bme280 documentation: degrees Celsius, hPa, % rH.
temp = s.temperature
press = s.pressure
humi = s.humidity

print('temperature ', temp)
print('pressure ', press)
print('humidity ', humi)

 実行結果です。

example-gatt-serverから利用した部分

 サービスが何点か用意されています。その中でテスト用のTestServiceを使いました。キャラクタリスティックはセキュアなどの事例は使わず、標準的なTestCharacteristic()を三つにし、インデックス番号とUUIDが重複しないようにしました。もともとあった、キャラクタリスティックの細かな設定用TestDescriptor、CharacteristicUserDescriptionDescriptorは捨てました。

 16~40行がBME280関連です。

 228~291行が修正した部分です。ほかは、もともとあったプログラムです。

import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service

import array
try:
  from gi.repository import GObject
except ImportError:
  import gobject as GObject
import sys

from random import randint
import smbus, bme280

# I2C port number and sensor address of the BME280 board.
BME280_I2C_PORT = 1
BME280_ADDRESS = 0x76

bus = smbus.SMBus(BME280_I2C_PORT)
# Keep the calibration data so that every later sample() call can reuse it
# instead of reading it from the sensor again.
calibration_params = bme280.load_calibration_params(bus, BME280_ADDRESS)

# Most recent readings, refreshed by bme280Read(); None until the first read.
temp = None
press = None
humi = None


def bme280Read():
    """Take one BME280 sample and publish it through the module globals.

    Updates ``temp``, ``press`` and ``humi`` with the numeric values taken
    from the reading's ``temperature``, ``pressure`` and ``humidity``
    attributes, then prints them.

    The values are stored as numbers rather than as 4-character substrings of
    the reading's string form: the fixed-width slice silently truncated or
    mangled any value whose printed width was not what the slice expected.
    Callers that wrap the globals in ``float()`` keep working unchanged.
    """
    global temp, press, humi

    s = bme280.sample(bus, BME280_ADDRESS, calibration_params)
    temp = s.temperature
    press = s.pressure
    humi = s.humidity

    print('temperature ', temp)
    print('pressure ', press)
    print('humidity ', humi)

# Main loop object; assigned in main() and used by register_app_error_cb().
mainloop = None

# Bus name used to look up BlueZ objects.
BLUEZ_SERVICE_NAME = 'org.bluez'

# D-Bus interface names used when talking to BlueZ.
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

# D-Bus interface names exported by the objects defined in this file.
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'

class InvalidArgsException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the InvalidArgs error name.

    Raised by the GetAll handlers in this file when they are asked about an
    interface other than the one they implement.
    """
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'

class NotSupportedException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the org.bluez NotSupported error name.

    Raised by the default ReadValue/WriteValue/StartNotify/StopNotify
    handlers of Characteristic.
    """
    _dbus_error_name = 'org.bluez.Error.NotSupported'

class NotPermittedException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the org.bluez NotPermitted error name.

    Not raised anywhere in the code visible in this listing.
    """
    _dbus_error_name = 'org.bluez.Error.NotPermitted'

class InvalidValueLengthException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the org.bluez InvalidValueLength error name.

    Not raised anywhere in the code visible in this listing.
    """
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'

class FailedException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the org.bluez Failed error name.

    Not raised anywhere in the code visible in this listing.
    """
    _dbus_error_name = 'org.bluez.Error.Failed'


class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation

    Exported at object path '/'. Holds the list of services and reports
    them, their characteristics and their descriptors from
    GetManagedObjects().
    """
    def __init__(self, bus):
        """Export the application on *bus* and add one TestService."""
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        self.add_service(TestService(bus, 0))

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        object_path = dbus.ObjectPath(self.path)
        return object_path

    def add_service(self, service):
        """Append *service* to the list of services of this application."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return a dict mapping object path to properties.

        Covers every service, each of its characteristics, and each
        descriptor of those characteristics.
        """
        managed = {}
        print('GetManagedObjects')

        for svc in self.services:
            managed[svc.get_path()] = svc.get_properties()
            for characteristic in svc.get_characteristics():
                managed[characteristic.get_path()] = \
                    characteristic.get_properties()
                for descriptor in characteristic.get_descriptors():
                    managed[descriptor.get_path()] = \
                        descriptor.get_properties()

        return managed


class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation

    Each instance is exported at PATH_BASE followed by its index and keeps
    the list of characteristics that belong to it.
    """
    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        """Store the service attributes and export the object on *bus*."""
        index_suffix = str(index)
        self.path = self.PATH_BASE + index_suffix
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the service properties keyed by GATT_SERVICE_IFACE."""
        service_props = {
            'UUID': self.uuid,
            'Primary': self.primary,
            'Characteristics': dbus.Array(
                self.get_characteristic_paths(),
                signature='o'),
        }
        return {GATT_SERVICE_IFACE: service_props}

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        object_path = dbus.ObjectPath(self.path)
        return object_path

    def add_characteristic(self, characteristic):
        """Append *characteristic* to this service."""
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        """Return the object paths of all characteristics, in order."""
        return [item.get_path() for item in self.characteristics]

    def get_characteristics(self):
        """Return the list of characteristics (the list itself, no copy)."""
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the service properties for GATT_SERVICE_IFACE.

        Raises InvalidArgsException for any other interface name.
        """
        if interface == GATT_SERVICE_IFACE:
            return self.get_properties()[GATT_SERVICE_IFACE]

        raise InvalidArgsException()


class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation

    Base class: the ReadValue/WriteValue/StartNotify/StopNotify handlers
    defined here only print a message and raise NotSupportedException.
    """
    def __init__(self, bus, index, uuid, flags, service):
        """Store the attributes and export the object below *service*."""
        path_suffix = '/char' + str(index)
        self.path = service.path + path_suffix
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the characteristic properties keyed by GATT_CHRC_IFACE."""
        chrc_props = {
            'Service': self.service.get_path(),
            'UUID': self.uuid,
            'Flags': self.flags,
            'Descriptors': dbus.Array(
                self.get_descriptor_paths(),
                signature='o'),
        }
        return {GATT_CHRC_IFACE: chrc_props}

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        object_path = dbus.ObjectPath(self.path)
        return object_path

    def add_descriptor(self, descriptor):
        """Append *descriptor* to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the object paths of all descriptors, in order."""
        return [item.get_path() for item in self.descriptors]

    def get_descriptors(self):
        """Return the list of descriptors (the list itself, no copy)."""
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the characteristic properties for GATT_CHRC_IFACE.

        Raises InvalidArgsException for any other interface name.
        """
        if interface == GATT_CHRC_IFACE:
            return self.get_properties()[GATT_CHRC_IFACE]

        raise InvalidArgsException()

    @dbus.service.method(GATT_CHRC_IFACE,
                        in_signature='a{sv}',
                        out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises NotSupportedException."""
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises NotSupportedException."""
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """Signal declaration; the body is intentionally empty."""
        pass

class TestService(Service):
    """Primary service holding the three test characteristics."""
    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'

    def __init__(self, bus, index):
        """Create the service and attach one characteristic per index 0-2."""
        Service.__init__(self, bus, index, self.TEST_SVC_UUID, True)
        characteristic_classes = (
            TestCharacteristic,
            TestCharacteristic1,
            TestCharacteristic2,
        )
        # The position in the tuple doubles as the characteristic index.
        for chrc_index, chrc_class in enumerate(characteristic_classes):
            self.add_characteristic(chrc_class(bus, chrc_index, self))

class TestCharacteristic(Characteristic):
    """
    Read-only characteristic reporting the BME280 temperature.

    The value is a single byte holding the temperature reading multiplied
    by 10.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['read'],
                service)
        # Placeholder until the first read replaces it.
        self.value = [5]

    def ReadValue(self, options):
        """Sample the sensor and return the temperature as one byte.

        The reply signature is 'ay', so the element has to be an integer in
        0..255. The scaled reading is therefore rounded, clamped to that
        range and wrapped in dbus.Byte instead of being returned as a float.
        """
        bme280Read()
        scaled = int(round(float(temp) * 10))
        # NOTE(review): one byte only covers readings from 0.0 to 25.5;
        # anything outside is clamped rather than failing the read. A wider
        # encoding would also require changing the client.
        self.value = [dbus.Byte(max(0, min(255, scaled)))]
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value

class TestCharacteristic1(Characteristic):
    """
    Read-only characteristic reporting the BME280 pressure.

    The value is a single byte holding the pressure reading minus 1000.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef7'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['read'],
                service)
        # Placeholder until the first read replaces it.
        self.value = [50]

    def ReadValue(self, options):
        """Sample the sensor and return the pressure offset as one byte.

        The reply signature is 'ay', so the element has to be an integer in
        0..255. The offset reading is therefore rounded, clamped to that
        range and wrapped in dbus.Byte instead of being returned as a float.
        """
        bme280Read()
        offset = int(round(float(press) - 1000))
        # NOTE(review): one byte only covers readings from 1000 to 1255;
        # lower pressures are clamped to 0 rather than failing the read. A
        # wider encoding would also require changing the client.
        self.value = [dbus.Byte(max(0, min(255, offset)))]
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value

class TestCharacteristic2(Characteristic):
    """
    Read-only characteristic reporting the BME280 humidity.

    The value is a single byte holding the humidity reading.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef8'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['read'],
                service)
        # Placeholder until the first read replaces it.
        self.value = [55]

    def ReadValue(self, options):
        """Sample the sensor and return the humidity as one byte.

        The reply signature is 'ay', so the element has to be an integer in
        0..255. The reading is therefore rounded, clamped to that range and
        wrapped in dbus.Byte instead of being returned as a float.
        """
        bme280Read()
        rounded = int(round(float(humi)))
        self.value = [dbus.Byte(max(0, min(255, rounded)))]
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value

def register_app_cb():
    """Reply handler for RegisterApplication: report success."""
    message = 'GATT application registered'
    print(message)


def register_app_error_cb(error):
    """Error handler for RegisterApplication: report and stop the loop."""
    reason = str(error)
    print('Failed to register application: ' + reason)
    mainloop.quit()


def find_adapter(bus):
    """Return the path of the first object exposing GATT_MANAGER_IFACE.

    Queries the object manager at '/' of BLUEZ_SERVICE_NAME on *bus* and
    returns None when no managed object lists that interface.
    """
    bluez_root = bus.get_object(BLUEZ_SERVICE_NAME, '/')
    object_manager = dbus.Interface(bluez_root, DBUS_OM_IFACE)
    managed = object_manager.GetManagedObjects()

    candidates = (path for path, interfaces in managed.items()
                  if GATT_MANAGER_IFACE in interfaces.keys())
    return next(candidates, None)

def main():
    """Register the GATT application with BlueZ and run the main loop.

    Returns early, after printing a message, when find_adapter() finds no
    object exposing the GattManager1 interface.
    """
    global mainloop

    # Make the GLib main loop the default for D-Bus connections.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus)
    if not adapter:
        print('GattManager1 interface not found')
        return

    service_manager = dbus.Interface(
            bus.get_object(BLUEZ_SERVICE_NAME, adapter),
            GATT_MANAGER_IFACE)

    app = Application(bus)

    # Assigned before registering because register_app_error_cb() calls
    # mainloop.quit().
    mainloop = GObject.MainLoop()

    print('Registering GATT application...')

    # The outcome is reported through the two callbacks.
    service_manager.RegisterApplication(app.get_path(), {},
                                    reply_handler=register_app_cb,
                                    error_handler=register_app_error_cb)

    mainloop.run()

# Run main() only when the file is executed as a script.
if __name__ == '__main__':
    main()

 上記のGatt Serverのプログラムを動かし、

sudo hciconfig hci0 leadv

でアドバタイジングします。

前へ

初めてのBLE (10) Raspberry Piでペリフェラル②CPUの温度を送る

次へ

初めてのBLE (12) Raspberry Piでペリフェラル④Nano 33 BLE Senseをセントラルに