初めてのBLE (11) Raspberry Piでペリフェラル③BME280に対応
前回は、example-gatt-serverのBattery Serviceを一部変更して、ラズパイのCPUの温度を送るようにしました。ここでは、不要な部分を取り除いてスリムにします。
ラズパイで動いているBlueZを使うには、Bluetooth protocol stack for Linuxのサンプルを利用します。
Bluetooth protocol stack for Linuxのtestディレクトリ
(参考文献)BlueZのAPI/サンプルコードのメモ
●example-gatt-serverをベースにBME280に対応させる
MEMSセンサのBME280は、温度、気圧、湿度が測れます。ラズパイとBME280ボードを次のように接続します。BME280はI2CとSPIのどちらのインターフェースでもつなげられますが、アマゾンで入手したボードはI2C専用になっていました。
次のプログラムで動作を確認しました。
# Stand-alone check that the BME280 answers on the I2C bus: take one sample
# and print temperature, pressure and humidity.
import smbus
import time
import bme280

# Bus number 1 and device address 0x76 are the values used throughout this
# article for the BME280 board.
bus = smbus.SMBus(1)
bme280.load_calibration_params(bus, 0x76)

s = bme280.sample(bus, 0x76)

# Read the numeric attributes of the sample instead of slicing a fixed
# number of characters out of str(s): slicing depends on the exact repr
# format and truncates values whose width is not exactly four characters
# (for example '998.' or '-12.').
temp = '%.1f' % s.temperature
print('temperature ', temp)

press = '%.1f' % s.pressure
print('pressure ', press)

humi = '%.1f' % s.humidity
print('humidity ', humi)
実行結果です。
●example-gatt-serverで利用した部分
サービスが何点か用意されています。その中でテスト用のTestServiceを使いました。キャラクタリスティックはセキュアなどの事例は使わず、標準的なTestCharacteristic()を三つにし、インデックス番号とUUIDが異なるようにしました。もともとあった、キャラクタリスティックの細かな設定用のTestDescriptor、CharacteristicUserDescriptionDescriptorは削除しました。
16~40行がBME280関連です。
228~291行が修正した部分です。ほかは、もともとあったプログラムです。
"""GATT server exposing BME280 readings through BlueZ over D-Bus.

Based on the BlueZ ``example-gatt-server`` test script.  One primary service
(``TestService``) publishes three read-only characteristics; each read takes
a fresh BME280 sample and returns a single byte derived from it.
"""
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service

import array
try:
    from gi.repository import GObject
except ImportError:
    import gobject as GObject
import sys

from random import randint

import smbus
import bme280

# Bus number 1 and device address 0x76 are the values this script uses for
# the BME280 board.
bus = smbus.SMBus(1)
bme280.load_calibration_params(bus, 0x76)

# Most recent readings, refreshed by bme280Read().
temp = None
press = None
humi = None


def bme280Read():
    """Take one BME280 sample and store it in the module-level globals.

    Updates ``temp``, ``press`` and ``humi`` with the numeric temperature,
    pressure and humidity of the sample and prints each of them.
    """
    global temp
    global press
    global humi
    s = bme280.sample(bus, 0x76)

    # Use the numeric attributes of the sample rather than slicing four
    # characters out of str(s): slicing depends on the repr format and
    # mangles values whose width is not exactly four characters.
    temp = s.temperature
    print('temperature ', '%.1f' % temp)

    press = s.pressure
    print('pressure ', '%.1f' % press)

    humi = s.humidity
    print('humidity ', '%.1f' % humi)


mainloop = None

BLUEZ_SERVICE_NAME = 'org.bluez'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'


class InvalidArgsException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'


class NotSupportedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotSupported'


class NotPermittedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.NotPermitted'


class InvalidValueLengthException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'


class FailedException(dbus.exceptions.DBusException):
    _dbus_error_name = 'org.bluez.Error.Failed'


def _to_byte_array(value):
    """Encode *value* as a one-element D-Bus byte array.

    ReadValue is declared with out_signature='ay', so every element must be
    an integer in 0..255.  The fractional part of *value* is dropped.

    Raises:
        FailedException: if the integer part of *value* is outside 0..255,
            so the caller gets an explicit error instead of a marshalling
            failure or a silently wrong reading.
    """
    number = int(value)
    if number < 0 or number > 0xff:
        raise FailedException(
            'value %r does not fit in one byte' % (value,))
    return dbus.Array([dbus.Byte(number)], signature='y')


class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation
    """
    def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        self.add_service(TestService(bus, 0))

    def get_path(self):
        """Return the application's object path."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Append *service* to the list of services of this application."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return the properties of every service, characteristic and
        descriptor, keyed by object path."""
        response = {}
        print('GetManagedObjects')

        for service in self.services:
            response[service.get_path()] = service.get_properties()
            chrcs = service.get_characteristics()
            for chrc in chrcs:
                response[chrc.get_path()] = chrc.get_properties()
                descs = chrc.get_descriptors()
                for desc in descs:
                    response[desc.get_path()] = desc.get_properties()

        return response


class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation
    """
    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the GattService1 property dictionary of this service."""
        return {
            GATT_SERVICE_IFACE: {
                'UUID': self.uuid,
                'Primary': self.primary,
                'Characteristics': dbus.Array(
                    self.get_characteristic_paths(),
                    signature='o')
            }
        }

    def get_path(self):
        """Return the service's object path."""
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        """Append *characteristic* to this service."""
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        """Return the object paths of all characteristics of this service."""
        result = []
        for chrc in self.characteristics:
            result.append(chrc.get_path())
        return result

    def get_characteristics(self):
        """Return the list of characteristics of this service."""
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return all GattService1 properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface != GATT_SERVICE_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_SERVICE_IFACE]


class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation
    """
    def __init__(self, bus, index, uuid, flags, service):
        self.path = service.path + '/char' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the GattCharacteristic1 property dictionary."""
        return {
            GATT_CHRC_IFACE: {
                'Service': self.service.get_path(),
                'UUID': self.uuid,
                'Flags': self.flags,
                'Descriptors': dbus.Array(
                    self.get_descriptor_paths(),
                    signature='o')
            }
        }

    def get_path(self):
        """Return the characteristic's object path."""
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        """Append *descriptor* to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the object paths of all descriptors."""
        result = []
        for desc in self.descriptors:
            result.append(desc.get_path())
        return result

    def get_descriptors(self):
        """Return the list of descriptors of this characteristic."""
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return all GattCharacteristic1 properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface != GATT_CHRC_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[GATT_CHRC_IFACE]

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        pass


class TestService(Service):
    """Primary service holding the three BME280 characteristics."""
    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.TEST_SVC_UUID, True)
        self.add_characteristic(TestCharacteristic(bus, 0, self))
        self.add_characteristic(TestCharacteristic1(bus, 1, self))
        self.add_characteristic(TestCharacteristic2(bus, 2, self))


class TestCharacteristic(Characteristic):
    """
    Read-only temperature characteristic.

    A read returns one byte holding the temperature multiplied by 10, so
    only 0.0 to 25.5 can be represented; other readings fail with
    FailedException.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index,
            self.TEST_CHRC_UUID,
            ['read'],
            service)
        self.value = [5]

    def ReadValue(self, options):
        bme280Read()
        # The D-Bus signature is 'ay': hand back a real byte, not a float.
        self.value = _to_byte_array(temp * 10)
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value


class TestCharacteristic1(Characteristic):
    """
    Read-only pressure characteristic.

    A read returns one byte holding the pressure minus 1000, so only
    1000 to 1255 can be represented; other readings fail with
    FailedException.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef7'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index,
            self.TEST_CHRC_UUID,
            ['read'],
            service)
        self.value = [50]

    def ReadValue(self, options):
        bme280Read()
        # The D-Bus signature is 'ay': hand back a real byte, not a float.
        self.value = _to_byte_array(press - 1000)
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value


class TestCharacteristic2(Characteristic):
    """
    Read-only humidity characteristic.

    A read returns one byte holding the integer part of the humidity.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef8'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
            self, bus, index,
            self.TEST_CHRC_UUID,
            ['read'],
            service)
        self.value = [55]

    def ReadValue(self, options):
        bme280Read()
        # The D-Bus signature is 'ay': hand back a real byte, not a float.
        self.value = _to_byte_array(humi)
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value


def register_app_cb():
    """Reply handler: the application was registered."""
    print('GATT application registered')


def register_app_error_cb(error):
    """Error handler: report the failure and stop the main loop."""
    print('Failed to register application: ' + str(error))
    mainloop.quit()


def find_adapter(bus):
    """Return the path of the first object exposing GattManager1, or None."""
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for o, props in objects.items():
        if GATT_MANAGER_IFACE in props.keys():
            return o

    return None


def main():
    """Register the GATT application with BlueZ and run the main loop."""
    global mainloop

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    adapter = find_adapter(bus)
    if not adapter:
        print('GattManager1 interface not found')
        return

    service_manager = dbus.Interface(
        bus.get_object(BLUEZ_SERVICE_NAME, adapter),
        GATT_MANAGER_IFACE)

    app = Application(bus)

    mainloop = GObject.MainLoop()

    print('Registering GATT application...')

    service_manager.RegisterApplication(app.get_path(), {},
                                        reply_handler=register_app_cb,
                                        error_handler=register_app_error_cb)

    mainloop.run()


if __name__ == '__main__':
    main()
上記のGatt Serverのプログラムを動かし、