Raspberry Pi Zero Wで温湿度・気圧モニタリング

最終更新日

Raspberry Pi(ラズベリーパイ、ラズパイ)という小型コンピュータがあります。普通のパソコンと違うのは、小さくて安くて、モーターやセンサーに接続するための拡張が前提となっているところ。教育用途を狙って製造されたようです。岩場や山に出かけた際、温度や湿度をモニタリングできれば面白いなと思い、少し試してみました。深淵なるラズパイ沼に立ち入ってしまったかもしれません。

機材

予備知識

ブルーバックス「ラズパイ4対応 カラー図解 最新 Raspberry Piで学ぶ電子工作」の内容をひととおり読んで、Raspberry Pi 4 Bにキーボード、マウス、モニタに接続して使い、本に載っているサンプルを8割くらい真似て、Raspberry Piの基本をつかみました。

初期設定

Raspberry Pi 4はパソコン程の性能がある分、消費電力も大きいのでモバイル用途には向かなさそうなので、さらに小型なRaspberry Pi Zero Wを使用します。Raspberry Pi OSをmicro SDカードにコピーし、Wifi接続とSSHの初期設定を済ませて起動します。すると自宅のWifiにつながるので、MacのターミナルからSSH接続して各種設定を行います[1]外付けディスプレイなしでラズパイ4セットアップ

配線

Raspberry Pi Zero Wの電源をいったん切ってから、温湿度・気圧センサ、LCD表示版を取付けます。I2C接続を多用するので、配線が集中して、はんだづけが難しかったです。

どうやって配線したかよく覚えていませんが、SDA1とSCL1、あとは3.3Vの電源とGNDだったかと思います。

なお、Raspberry Pi ZeroにはWとWHの2種類があり、WHにはコネクタを直接させるピンヘッダがついているようなので、はんだづけせずに済むようです。ただ、ピンヘッダがない分、Raspberry Pi Zero Wの方がコンパクトです。

▲AE-BME280の説明書をよく読んで配線を確認
▲なんとか配線したラズパイZero Wとセンサ

制御ソフト

センサ類を制御して情報を保存、LCDに表示するためのプログラムをPythonで作成します。といっても、公開されている情報の切り貼りです[2]第39回「ラズベリーパイで温度・湿度・気圧をまとめて取得!AE-BME280でIC2通信」。センサ用の下準備が少々助長なので、モジュールに分割し、以下の構成にしました。

wloger.py ←本体
bme280.py ←温湿度・気圧センサ制御用
st7032.py ←LCD制御用

これら3つのファイルを同一ディレクトリに保存すると動作します。ここで、温湿度・気圧センサのアドレスは「76」、LCDのアドレスは「3e」です。

# -*- coding: utf-8 -*-
import bme280  # 自作モジュール呼び出し
import st7032  # 自作モジュール呼び出し
import datetime
import os
 
dir_path = '/home/pi/bme280-data'

now = datetime.datetime.now()
filename = now.strftime('%Y%m%d')
label = now.strftime('%H:%M')
csv = bme280.readData()
display_strings = str(csv).split(',')

# csvファイルに保存
if not os.path.exists('/home/pi/bme280-data'):
    os.makedirs('/home/pi/bme280-data')
f = open('/home/pi/bme280-data/'+filename+'.csv','a')
f.write("'"+label+"',"+csv+"\n")
f.close()

# LCDに温度と湿度を表示
st7032.write_string(display_strings[0]+'C')  # 気温
st7032.newline()
st7032.write_string(display_strings[1]+'%')  # 湿度
st7032.write_string(display_strings[2][-5:-2]) # 気圧(3文字のみ)
# -*- coding: utf-8 -*-
# BME280センサを取り扱う
 
import smbus
import time
 
bus_number  = 1
i2c_address = 0x76
 
bus = smbus.SMBus(bus_number)
 
digT = []
digP = []
digH = []
 
t_fine = 0.0

def writeReg(reg_address, data):
    bus.write_byte_data(i2c_address,reg_address,data)

def get_calib_param():
    calib = []
     
    for i in range (0x88,0x88+24):
        calib.append(bus.read_byte_data(i2c_address,i))
    calib.append(bus.read_byte_data(i2c_address,0xA1))
    for i in range (0xE1,0xE1+7):
        calib.append(bus.read_byte_data(i2c_address,i))
 
    digT.append((calib[1] << 8) | calib[0])
    digT.append((calib[3] << 8) | calib[2])
    digT.append((calib[5] << 8) | calib[4])
    digP.append((calib[7] << 8) | calib[6])
    digP.append((calib[9] << 8) | calib[8])
    digP.append((calib[11]<< 8) | calib[10])
    digP.append((calib[13]<< 8) | calib[12])
    digP.append((calib[15]<< 8) | calib[14])
    digP.append((calib[17]<< 8) | calib[16])
    digP.append((calib[19]<< 8) | calib[18])
    digP.append((calib[21]<< 8) | calib[20])
    digP.append((calib[23]<< 8) | calib[22])
    digH.append( calib[24] )
    digH.append((calib[26]<< 8) | calib[25])
    digH.append( calib[27] )
    digH.append((calib[28]<< 4) | (0x0F & calib[29]))
    digH.append((calib[30]<< 4) | ((calib[29] >> 4) & 0x0F))
    digH.append( calib[31] )
     
    for i in range(1,2):
        if digT[i] & 0x8000:
            digT[i] = (-digT[i] ^ 0xFFFF) + 1
 
    for i in range(1,8):
        if digP[i] & 0x8000:
            digP[i] = (-digP[i] ^ 0xFFFF) + 1
 
    for i in range(0,6):
        if digH[i] & 0x8000:
            digH[i] = (-digH[i] ^ 0xFFFF) + 1 

def readData():
    data = []
    for i in range (0xF7, 0xF7+8):
        data.append(bus.read_byte_data(i2c_address,i))
    pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
    temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
    hum_raw  = (data[6] << 8)  |  data[7]
     
    #compensate_T(temp_raw)
    #compensate_P(pres_raw)
    #compensate_H(hum_raw)
    t = compensate_T(temp_raw)
    h = compensate_H(hum_raw)
    p = compensate_P(pres_raw)
    return t + "," + h + "," + p

def compensate_P(adc_P):
    global  t_fine
    pressure = 0.0
     
    v1 = (t_fine / 2.0) - 64000.0
    v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * digP[5]
    v2 = v2 + ((v1 * digP[4]) * 2.0)
    v2 = (v2 / 4.0) + (digP[3] * 65536.0)
    v1 = (((digP[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8)  + ((digP[1] * v1) / 2.0)) / 262144
    v1 = ((32768 + v1) * digP[0]) / 32768
     
    if v1 == 0:
        return 0
    pressure = ((1048576 - adc_P) - (v2 / 4096)) * 3125
    if pressure < 0x80000000:
        pressure = (pressure * 2.0) / v1
    else:
        pressure = (pressure / v1) * 2
    v1 = (digP[8] * (((pressure / 8.0) * (pressure / 8.0)) / 8192.0)) / 4096
    v2 = ((pressure / 4.0) * digP[7]) / 8192.0
    pressure = pressure + ((v1 + v2 + digP[6]) / 16.0)  
 
    #print "pressure : %7.2f hPa" % (pressure/100)
    return "%.1f" % (pressure/100)

def compensate_T(adc_T):
    global t_fine
    v1 = (adc_T / 16384.0 - digT[0] / 1024.0) * digT[1]
    v2 = (adc_T / 131072.0 - digT[0] / 8192.0) * (adc_T / 131072.0 - digT[0] / 8192.0) * digT[2]
    t_fine = v1 + v2
    temperature = t_fine / 5120.0
    #print "temp : %-6.2f ℃" % (temperature) 
    return "%.1f" % (temperature) 

def compensate_H(adc_H):
    global t_fine
    var_h = t_fine - 76800.0
    if var_h != 0:
        var_h = (adc_H - (digH[3] * 64.0 + digH[4]/16384.0 * var_h)) * (digH[1] / 65536.0 * (1.0 + digH[5] / 67108864.0 * var_h * (1.0 + digH[2] / 67108864.0 * var_h)))
    else:
        return 0
    var_h = var_h * (1.0 - digH[0] * var_h / 524288.0)
    if var_h > 100.0:
        var_h = 100.0
    elif var_h < 0.0:
        var_h = 0.0
    #print "hum : %6.2f %" % (var_h)
    return "%.1f" % (var_h)

def setup():
    osrs_t = 1            #Temperature oversampling x 1
    osrs_p = 1            #Pressure oversampling x 1
    osrs_h = 1            #Humidity oversampling x 1
    mode   = 3            #Normal mode
    t_sb   = 5            #Tstandby 1000ms
    filter = 0            #Filter off
    spi3w_en = 0            #3-wire SPI Disable
 
    ctrl_meas_reg = (osrs_t << 5) | (osrs_p << 2) | mode
    config_reg    = (t_sb << 5) | (filter << 2) | spi3w_en
    ctrl_hum_reg  = osrs_h
 
    writeReg(0xF2,ctrl_hum_reg)
    writeReg(0xF4,ctrl_meas_reg)
    writeReg(0xF5,config_reg)

setup()
get_calib_param()

if __name__ == '__main__':
    try:
        readData()
    except KeyboardInterrupt:
        pass
# -*- coding: utf-8 -*-
# ST7032のLCDを取り扱う

import smbus
import sys
from time import sleep

def setup_st7032():
    trials = 5
    for i in range(trials):
        try:
            c_lower = (contrast & 0xf)
            c_upper = (contrast & 0x30)>>4
            bus.write_i2c_block_data(address_st7032, register_setting, [0x38, 0x39, 0x14, 0x70|c_lower, 0x54|c_upper, 0x6c])
            sleep(0.2)
            bus.write_i2c_block_data(address_st7032, register_setting, [0x38, 0x0d, 0x01])
            sleep(0.001)
            break
        except IOError:
            if i==trials-1:
                sys.exit()

def clear():
    global position
    global line
    position = 0
    line = 0
    bus.write_byte_data(address_st7032, register_setting, 0x01)
    sleep(0.001)

def newline():
    global position
    global line
    if line == display_lines-1:
        clear()
    else:
        line += 1
        position = chars_per_line*line
        bus.write_byte_data(address_st7032, register_setting, 0xc0)
        sleep(0.001)

def write_string(s):
    for c in list(s):
        write_char(ord(c))

def write_char(c):
    global position
    byte_data = check_writable(c)
    if position == display_chars:
        clear()
    elif position == chars_per_line*(line+1):
        newline()
    bus.write_byte_data(address_st7032, register_display, byte_data)
    position += 1

def check_writable(c):
    if c >= 0x06 and c <= 0xff :
        return c
    else:
        return 0x20 # 空白文字

bus = smbus.SMBus(1)
address_st7032 = 0x3e
register_setting = 0x00
register_display = 0x40

contrast = 32 # 0から63のコントラスト。30から40程度を推奨
chars_per_line = 8  # LCDの横方向の文字数
display_lines = 2   # LCDの行数

display_chars = chars_per_line*display_lines

position = 0
line = 0

setup_st7032()

if __name__ == '__main__':
    if len(sys.argv)==1:
        # アルファベットと記号は「''」でくくってそのまま表示可能
        write_string('Hello')
    else:
        write_string(sys.argv[1])

定期的に自動実行

wloger.pyを実行すると、温度、湿度、気圧を取得してディスプレイに表示しつつ、csvファイルに保存できるようになりました。一定時間間隔で自動実行するよう、cronの設定をします。

$ sudo crontab -e

でcrontabを編集し、最後の行に

*/5 * * * * python3 /home/pi/wloger.py

と追記します。冒頭の「*/5」が5分毎にという指定です。これで、ラズパイZero Wが起動している間、5分毎にwloger.pyが実行され、csvファイルにデータが追記されます。

▲適当なケースに梱包して完成。通気孔を開けています
▲保存されたcsvファイルをグラフ化するテスト(ラズパイ内部でmatplotlibを動かして画像化する方法を検討しましたが、処理が重いので不採用)