初めてのBLE (11) Raspberry Piでペリフェラル③BME280に対応
前回、example-gatt-serverのBattery Serviceを一部変更して、ラズパイのCPUの温度を送る変更をしました。ここでは、不要な部分を取り除いてスリムにします。
ラズパイで動いているBlueZを使うには、Bluetooth protocol stack for Linuxのサンプルを利用します。
Bluetooth protocol stack for Linuxのtestディレクトリ
(参考文献)BlueZのAPI/サンプルコードのメモ
●example-gatt-serverをベースにBME280に対応させる
MEMSセンサのBME280は、温度、気圧、湿度が測れます。ラズパイとBME280ボードを次のように接続します。BME280はI2CとSPIのどちらのインターフェースでもつなげられますが、アマゾンで入手したボードはI2C専用になっていました。
次のプログラムで動作を確認しました。
import smbus, time, bme280
# Stand-alone check that the BME280 answers on I2C bus 1 at address 0x76.
bus = smbus.SMBus(1)
# Keep the calibration data returned by the library and pass it to sample()
# instead of discarding it.
calibration_params = bme280.load_calibration_params(bus, 0x76)
s = bme280.sample(bus, 0x76, calibration_params)

# Use the numeric fields of the reading rather than cutting a fixed four
# characters out of its text form, which depends on the exact print layout
# and on every value being four characters wide.
temp = s.temperature
print('temperature ', round(temp, 1))
press = s.pressure
print('pressure ', round(press, 1))
humi = s.humidity
print('humidity ', round(humi, 1))
実行結果です。
●example-gatt-serverから利用した部分
example-gatt-serverにはサービスが何点か用意されています。その中からテスト用のTestServiceを使いました。キャラクタリスティックは、セキュアな通信などの事例は使わず、標準的なTestCharacteristic()をもとに三つ用意し、それぞれインデックス番号とUUIDが異なるようにしました。もともとあった、キャラクタリスティックの細かな設定用のTestDescriptorとCharacteristicUserDescriptionDescriptorは削除しました。
16~40行がBME280関連です。
228~291行が修正した部分です。ほかは、もともとあったプログラムです。
import dbus
import dbus.exceptions
import dbus.mainloop.glib
import dbus.service
import array
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
import sys
from random import randint
import smbus, bme280
# I2C bus 1, sensor at address 0x76.
bus = smbus.SMBus(1)
# Keep the calibration data so every sample() call can reuse it instead of
# the result being thrown away.
calibration_params = bme280.load_calibration_params(bus, 0x76)

# Latest readings, refreshed by bme280Read(); None until the first read.
temp = None
press = None
humi = None


def bme280Read():
    """Take one BME280 sample and publish it in the module globals.

    Updates ``temp``, ``press`` and ``humi`` with the temperature, pressure
    and humidity of a fresh sample and prints each of them.

    The values are stored as floats taken from the reading's numeric
    attributes. They were previously four-character strings sliced out of
    the reading's text form; the consumers in this file convert them with
    ``float()``, which accepts both.
    """
    global temp, press, humi

    s = bme280.sample(bus, 0x76, calibration_params)

    temp = s.temperature
    print('temperature ', round(temp, 1))
    press = s.pressure
    print('pressure ', round(press, 1))
    humi = s.humidity
    print('humidity ', round(humi, 1))
# Main loop object; assigned in main() and used by register_app_error_cb()
# to stop the program when registration fails.
mainloop = None
# D-Bus name under which the BlueZ objects are looked up.
BLUEZ_SERVICE_NAME = 'org.bluez'
# Interface used to register this GATT application with an adapter.
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
# Standard D-Bus interfaces for object enumeration and property access.
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
# Interface names under which services, characteristics and descriptors
# publish their properties.
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
class InvalidArgsException(dbus.exceptions.DBusException):
    """Error named org.freedesktop.DBus.Error.InvalidArgs.

    Raised by the GetAll handlers when asked for an interface they do not
    implement.
    """
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
    """Error named org.bluez.Error.NotSupported.

    Raised by the default Characteristic handlers that a subclass has not
    overridden.
    """
    _dbus_error_name = 'org.bluez.Error.NotSupported'
class NotPermittedException(dbus.exceptions.DBusException):
    """Error named org.bluez.Error.NotPermitted."""
    _dbus_error_name = 'org.bluez.Error.NotPermitted'
class InvalidValueLengthException(dbus.exceptions.DBusException):
    """Error named org.bluez.Error.InvalidValueLength."""
    _dbus_error_name = 'org.bluez.Error.InvalidValueLength'
class FailedException(dbus.exceptions.DBusException):
    """Error named org.bluez.Error.Failed."""
    _dbus_error_name = 'org.bluez.Error.Failed'
class Application(dbus.service.Object):
    """
    Root object of the GATT application (org.bluez.GattApplication1).

    It is exported at '/', holds the list of services and reports the whole
    service / characteristic / descriptor tree via GetManagedObjects.
    """

    def __init__(self, bus):
        self.path = '/'
        self.services = []
        dbus.service.Object.__init__(self, bus, self.path)
        # The application consists of the single test service.
        self.add_service(TestService(bus, 0))

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Append *service* to the list of exported services."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return a mapping of object path to properties for every
        service, characteristic and descriptor of the application."""
        print('GetManagedObjects')

        managed = {}
        for svc in self.services:
            managed[svc.get_path()] = svc.get_properties()
            for chrc in svc.get_characteristics():
                managed[chrc.get_path()] = chrc.get_properties()
                managed.update(
                    (desc.get_path(), desc.get_properties())
                    for desc in chrc.get_descriptors())
        return managed
class Service(dbus.service.Object):
    """
    Base class implementing org.bluez.GattService1.

    A service is exported at PATH_BASE followed by its index and keeps the
    list of characteristics that belong to it.
    """
    PATH_BASE = '/org/bluez/example/service'

    def __init__(self, bus, index, uuid, primary):
        self.path = self.PATH_BASE + str(index)
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the service properties keyed by the GattService1 interface."""
        chrc_paths = dbus.Array(self.get_characteristic_paths(),
                                signature='o')
        service_props = {
            'UUID': self.uuid,
            'Primary': self.primary,
            'Characteristics': chrc_paths,
        }
        return {GATT_SERVICE_IFACE: service_props}

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        """Append *characteristic* to this service."""
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        """Return the object paths of all characteristics, in insertion order."""
        return [chrc.get_path() for chrc in self.characteristics]

    def get_characteristics(self):
        """Return the list of characteristics of this service."""
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the GattService1 properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface == GATT_SERVICE_IFACE:
            return self.get_properties()[GATT_SERVICE_IFACE]
        raise InvalidArgsException()
class Characteristic(dbus.service.Object):
    """
    Base class implementing org.bluez.GattCharacteristic1.

    Subclasses override ReadValue / WriteValue / StartNotify / StopNotify as
    needed; the defaults defined here print a message and raise
    NotSupportedException.
    """

    def __init__(self, bus, index, uuid, flags, service):
        # The characteristic lives below its service: <service>/char<index>.
        self.path = service.path + '/char' + str(index)
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.descriptors = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the properties keyed by the GattCharacteristic1 interface."""
        desc_paths = dbus.Array(self.get_descriptor_paths(), signature='o')
        chrc_props = {
            'Service': self.service.get_path(),
            'UUID': self.uuid,
            'Flags': self.flags,
            'Descriptors': desc_paths,
        }
        return {GATT_CHRC_IFACE: chrc_props}

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        """Append *descriptor* to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return the object paths of all descriptors, in insertion order."""
        return [desc.get_path() for desc in self.descriptors]

    def get_descriptors(self):
        """Return the list of descriptors of this characteristic."""
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return the GattCharacteristic1 properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface == GATT_CHRC_IFACE:
            return self.get_properties()[GATT_CHRC_IFACE]
        raise InvalidArgsException()

    @dbus.service.method(GATT_CHRC_IFACE,
                         in_signature='a{sv}',
                         out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises NotSupportedException."""
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises NotSupportedException."""
        print('Default WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """Signal declaration; the body is intentionally empty."""
        pass
class TestService(Service):
    """
    Primary service carrying the three BME280 characteristics, registered
    with indices 0, 1 and 2.
    """
    TEST_SVC_UUID = '12345678-1234-5678-1234-56789abcdef0'

    def __init__(self, bus, index):
        Service.__init__(self, bus, index, self.TEST_SVC_UUID, True)
        chrc_classes = (TestCharacteristic,
                        TestCharacteristic1,
                        TestCharacteristic2)
        for chrc_index, chrc_class in enumerate(chrc_classes):
            self.add_characteristic(chrc_class(bus, chrc_index, self))
class TestCharacteristic(Characteristic):
    """
    Read-only characteristic exposing the BME280 temperature.

    The value is a single byte holding the temperature multiplied by ten,
    so only readings from 0.0 to 25.5 can be represented.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef1'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['read'],
                service)
        # Placeholder; replaced on every read.
        self.value = [5]

    def ReadValue(self, options):
        """Sample the sensor and return the temperature as one byte.

        Returns a one-element list containing ``dbus.Byte(temp * 10)``.

        Raises FailedException when the scaled value does not fit into a
        single byte (below 0.0 or above 25.5).
        """
        bme280Read()
        tenths = int(round(float(temp) * 10))
        # The reply is declared as a byte array ('ay'), so hand over an
        # integer byte rather than a float and report out-of-range readings
        # explicitly instead of failing while the reply is marshalled.
        if not 0 <= tenths <= 255:
            raise FailedException(
                'temperature %r does not fit into one byte' % (temp,))
        self.value = [dbus.Byte(tenths)]
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value
class TestCharacteristic1(Characteristic):
    """
    Read-only characteristic exposing the BME280 pressure.

    The value is a single byte holding the pressure minus 1000, so only
    readings from 1000 to 1255 can be represented.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef7'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['read'],
                service)
        # Placeholder; replaced on every read.
        self.value = [50]

    def ReadValue(self, options):
        """Sample the sensor and return the pressure offset as one byte.

        Returns a one-element list containing ``dbus.Byte(press - 1000)``.

        Raises FailedException when the offset does not fit into a single
        byte (pressure below 1000 or above 1255).
        """
        bme280Read()
        offset = int(round(float(press) - 1000))
        # The reply is declared as a byte array ('ay'), so hand over an
        # integer byte rather than a float and report out-of-range readings
        # explicitly instead of failing while the reply is marshalled.
        if not 0 <= offset <= 255:
            raise FailedException(
                'pressure %r does not fit into one byte' % (press,))
        self.value = [dbus.Byte(offset)]
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value
class TestCharacteristic2(Characteristic):
    """
    Read-only characteristic exposing the BME280 humidity.

    The value is a single byte holding the humidity rounded to an integer.
    """
    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef8'

    def __init__(self, bus, index, service):
        Characteristic.__init__(
                self, bus, index,
                self.TEST_CHRC_UUID,
                ['read'],
                service)
        # Placeholder; replaced on every read.
        self.value = [55]

    def ReadValue(self, options):
        """Sample the sensor and return the humidity as one byte.

        Returns a one-element list containing ``dbus.Byte(humi)``.

        Raises FailedException when the rounded value does not fit into a
        single byte.
        """
        bme280Read()
        percent = int(round(float(humi)))
        # The reply is declared as a byte array ('ay'), so hand over an
        # integer byte rather than a float.
        if not 0 <= percent <= 255:
            raise FailedException(
                'humidity %r does not fit into one byte' % (humi,))
        self.value = [dbus.Byte(percent)]
        print('TestCharacteristic Read: ' + repr(self.value))
        return self.value
def register_app_cb():
    """Reply handler for RegisterApplication: report success on stdout."""
    print('GATT application registered')
def register_app_error_cb(error):
    """Error handler for RegisterApplication.

    Prints the failure reason and stops the main loop so the program exits.
    """
    message = 'Failed to register application: ' + str(error)
    print(message)
    mainloop.quit()
def find_adapter(bus):
    """Return the path of the first BlueZ object offering GattManager1.

    *bus* is the D-Bus connection used to query the BlueZ object manager.
    Returns None when no managed object exposes the interface.
    """
    object_manager = dbus.Interface(
        bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
    managed = object_manager.GetManagedObjects()

    # First match in iteration order wins, None when nothing matches.
    return next(
        (path for path, ifaces in managed.items()
         if GATT_MANAGER_IFACE in ifaces),
        None)
def main():
    """Register the GATT application with BlueZ and run the main loop.

    Prints a message and returns early when no object with the
    GattManager1 interface is found.
    """
    global mainloop

    # Must be installed before the bus connection is created.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    system_bus = dbus.SystemBus()

    adapter_path = find_adapter(system_bus)
    if not adapter_path:
        print('GattManager1 interface not found')
        return

    adapter_object = system_bus.get_object(BLUEZ_SERVICE_NAME, adapter_path)
    gatt_manager = dbus.Interface(adapter_object, GATT_MANAGER_IFACE)

    application = Application(system_bus)
    mainloop = GObject.MainLoop()

    print('Registering GATT application...')
    # Asynchronous call: the outcome arrives through the two callbacks once
    # the main loop is running.
    gatt_manager.RegisterApplication(
        application.get_path(), {},
        reply_handler=register_app_cb,
        error_handler=register_app_error_cb)

    mainloop.run()


if __name__ == '__main__':
    main()
上記のGatt Serverのプログラムを動かし、