Raspberry Pi Zero Wで温湿度・気圧モニタリング
Raspberry Pi(ラズベリーパイ、ラズパイ)という小型コンピュータがあります。普通のパソコンと違うのは、小さくて安くて、モーターやセンサーに接続するための拡張が前提となっているところ。教育用途を狙って製造されたようです。岩場や山に出かけた際、温度や湿度をモニタリングできれば面白いなと思い、少し試してみました。深淵なるラズパイ沼に立ち入ってしまったかもしれません。
機材
- Raspberry Pi Zero W
- 温湿度・気圧センサ AE-BME280
- AQM0802使用LCDモジュール
- モバイルバッテリー
- ケースやコネクタ類
予備知識
ブルーバックス「ラズパイ4対応 カラー図解 最新 Raspberry Piで学ぶ電子工作」の内容をひととおり読んで、Raspberry Pi 4 Bにキーボード、マウス、モニタに接続して使い、本に載っているサンプルを8割くらい真似て、Raspberry Piの基本をつかみました。
初期設定
Raspberry Pi 4はパソコン程の性能がある分、消費電力も大きいのでモバイル用途には向かなさそうなので、さらに小型なRaspberry Pi Zero Wを使用します。Raspberry Pi OSをmicro SDカードにコピーし、Wifi接続とSSHの初期設定を済ませて起動します。すると自宅のWifiにつながるので、MacのターミナルからSSH接続して各種設定を行います[1]外付けディスプレイなしでラズパイ4セットアップ。
配線
Raspberry Pi Zero Wの電源をいったん切ってから、温湿度・気圧センサ、LCD表示版を取付けます。I2C接続を多用するので、配線が集中して、はんだづけが難しかったです。
どうやって配線したかよく覚えていませんが、SDA1とSCL1、あとは3.3Vの電源とGNDだったかと思います。
なお、Raspberry Pi ZeroにはWとWHの2種類があり、WHにはコネクタを直接させるピンヘッダがついているようなので、はんだづけせずに済むようです。ただ、ピンヘッダがない分、Raspberry Pi Zero Wの方がコンパクトです。
制御ソフト
センサ類を制御して情報を保存、LCDに表示するためのプログラムをPythonで作成します。といっても、公開されている情報の切り貼りです[2]第39回「ラズベリーパイで温度・湿度・気圧をまとめて取得!AE-BME280でIC2通信」。センサ用の下準備が少々助長なので、モジュールに分割し、以下の構成にしました。
wloger.py ←本体
bme280.py ←温湿度・気圧センサ制御用
st7032.py ←LCD制御用
これら3つのファイルを同一ディレクトリに保存すると動作します。ここで、温湿度・気圧センサのアドレスは「76」、LCDのアドレスは「3e」です。
# -*- coding: utf-8 -*-
import bme280 # 自作モジュール呼び出し
import st7032 # 自作モジュール呼び出し
import datetime
import os
dir_path = '/home/pi/bme280-data'
now = datetime.datetime.now()
filename = now.strftime('%Y%m%d')
label = now.strftime('%H:%M')
csv = bme280.readData()
display_strings = str(csv).split(',')
# csvファイルに保存
if not os.path.exists('/home/pi/bme280-data'):
os.makedirs('/home/pi/bme280-data')
f = open('/home/pi/bme280-data/'+filename+'.csv','a')
f.write("'"+label+"',"+csv+"\n")
f.close()
# LCDに温度と湿度を表示
st7032.write_string(display_strings[0]+'C') # 気温
st7032.newline()
st7032.write_string(display_strings[1]+'%') # 湿度
st7032.write_string(display_strings[2][-5:-2]) # 気圧(3文字のみ)
# -*- coding: utf-8 -*-
# BME280センサを取り扱う
import smbus
import time
bus_number = 1
i2c_address = 0x76
bus = smbus.SMBus(bus_number)
digT = []
digP = []
digH = []
t_fine = 0.0
def writeReg(reg_address, data):
bus.write_byte_data(i2c_address,reg_address,data)
def get_calib_param():
calib = []
for i in range (0x88,0x88+24):
calib.append(bus.read_byte_data(i2c_address,i))
calib.append(bus.read_byte_data(i2c_address,0xA1))
for i in range (0xE1,0xE1+7):
calib.append(bus.read_byte_data(i2c_address,i))
digT.append((calib[1] << 8) | calib[0])
digT.append((calib[3] << 8) | calib[2])
digT.append((calib[5] << 8) | calib[4])
digP.append((calib[7] << 8) | calib[6])
digP.append((calib[9] << 8) | calib[8])
digP.append((calib[11]<< 8) | calib[10])
digP.append((calib[13]<< 8) | calib[12])
digP.append((calib[15]<< 8) | calib[14])
digP.append((calib[17]<< 8) | calib[16])
digP.append((calib[19]<< 8) | calib[18])
digP.append((calib[21]<< 8) | calib[20])
digP.append((calib[23]<< 8) | calib[22])
digH.append( calib[24] )
digH.append((calib[26]<< 8) | calib[25])
digH.append( calib[27] )
digH.append((calib[28]<< 4) | (0x0F & calib[29]))
digH.append((calib[30]<< 4) | ((calib[29] >> 4) & 0x0F))
digH.append( calib[31] )
for i in range(1,2):
if digT[i] & 0x8000:
digT[i] = (-digT[i] ^ 0xFFFF) + 1
for i in range(1,8):
if digP[i] & 0x8000:
digP[i] = (-digP[i] ^ 0xFFFF) + 1
for i in range(0,6):
if digH[i] & 0x8000:
digH[i] = (-digH[i] ^ 0xFFFF) + 1
def readData():
data = []
for i in range (0xF7, 0xF7+8):
data.append(bus.read_byte_data(i2c_address,i))
pres_raw = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
temp_raw = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
hum_raw = (data[6] << 8) | data[7]
#compensate_T(temp_raw)
#compensate_P(pres_raw)
#compensate_H(hum_raw)
t = compensate_T(temp_raw)
h = compensate_H(hum_raw)
p = compensate_P(pres_raw)
return t + "," + h + "," + p
def compensate_P(adc_P):
global t_fine
pressure = 0.0
v1 = (t_fine / 2.0) - 64000.0
v2 = (((v1 / 4.0) * (v1 / 4.0)) / 2048) * digP[5]
v2 = v2 + ((v1 * digP[4]) * 2.0)
v2 = (v2 / 4.0) + (digP[3] * 65536.0)
v1 = (((digP[2] * (((v1 / 4.0) * (v1 / 4.0)) / 8192)) / 8) + ((digP[1] * v1) / 2.0)) / 262144
v1 = ((32768 + v1) * digP[0]) / 32768
if v1 == 0:
return 0
pressure = ((1048576 - adc_P) - (v2 / 4096)) * 3125
if pressure < 0x80000000:
pressure = (pressure * 2.0) / v1
else:
pressure = (pressure / v1) * 2
v1 = (digP[8] * (((pressure / 8.0) * (pressure / 8.0)) / 8192.0)) / 4096
v2 = ((pressure / 4.0) * digP[7]) / 8192.0
pressure = pressure + ((v1 + v2 + digP[6]) / 16.0)
#print "pressure : %7.2f hPa" % (pressure/100)
return "%.1f" % (pressure/100)
def compensate_T(adc_T):
global t_fine
v1 = (adc_T / 16384.0 - digT[0] / 1024.0) * digT[1]
v2 = (adc_T / 131072.0 - digT[0] / 8192.0) * (adc_T / 131072.0 - digT[0] / 8192.0) * digT[2]
t_fine = v1 + v2
temperature = t_fine / 5120.0
#print "temp : %-6.2f ℃" % (temperature)
return "%.1f" % (temperature)
def compensate_H(adc_H):
global t_fine
var_h = t_fine - 76800.0
if var_h != 0:
var_h = (adc_H - (digH[3] * 64.0 + digH[4]/16384.0 * var_h)) * (digH[1] / 65536.0 * (1.0 + digH[5] / 67108864.0 * var_h * (1.0 + digH[2] / 67108864.0 * var_h)))
else:
return 0
var_h = var_h * (1.0 - digH[0] * var_h / 524288.0)
if var_h > 100.0:
var_h = 100.0
elif var_h < 0.0:
var_h = 0.0
#print "hum : %6.2f %" % (var_h)
return "%.1f" % (var_h)
def setup():
osrs_t = 1 #Temperature oversampling x 1
osrs_p = 1 #Pressure oversampling x 1
osrs_h = 1 #Humidity oversampling x 1
mode = 3 #Normal mode
t_sb = 5 #Tstandby 1000ms
filter = 0 #Filter off
spi3w_en = 0 #3-wire SPI Disable
ctrl_meas_reg = (osrs_t << 5) | (osrs_p << 2) | mode
config_reg = (t_sb << 5) | (filter << 2) | spi3w_en
ctrl_hum_reg = osrs_h
writeReg(0xF2,ctrl_hum_reg)
writeReg(0xF4,ctrl_meas_reg)
writeReg(0xF5,config_reg)
setup()
get_calib_param()
if __name__ == '__main__':
try:
readData()
except KeyboardInterrupt:
pass
# -*- coding: utf-8 -*-
# ST7032のLCDを取り扱う
import smbus
import sys
from time import sleep
def setup_st7032():
trials = 5
for i in range(trials):
try:
c_lower = (contrast & 0xf)
c_upper = (contrast & 0x30)>>4
bus.write_i2c_block_data(address_st7032, register_setting, [0x38, 0x39, 0x14, 0x70|c_lower, 0x54|c_upper, 0x6c])
sleep(0.2)
bus.write_i2c_block_data(address_st7032, register_setting, [0x38, 0x0d, 0x01])
sleep(0.001)
break
except IOError:
if i==trials-1:
sys.exit()
def clear():
global position
global line
position = 0
line = 0
bus.write_byte_data(address_st7032, register_setting, 0x01)
sleep(0.001)
def newline():
global position
global line
if line == display_lines-1:
clear()
else:
line += 1
position = chars_per_line*line
bus.write_byte_data(address_st7032, register_setting, 0xc0)
sleep(0.001)
def write_string(s):
for c in list(s):
write_char(ord(c))
def write_char(c):
global position
byte_data = check_writable(c)
if position == display_chars:
clear()
elif position == chars_per_line*(line+1):
newline()
bus.write_byte_data(address_st7032, register_display, byte_data)
position += 1
def check_writable(c):
if c >= 0x06 and c <= 0xff :
return c
else:
return 0x20 # 空白文字
bus = smbus.SMBus(1)
address_st7032 = 0x3e
register_setting = 0x00
register_display = 0x40
contrast = 32 # 0から63のコントラスト。30から40程度を推奨
chars_per_line = 8 # LCDの横方向の文字数
display_lines = 2 # LCDの行数
display_chars = chars_per_line*display_lines
position = 0
line = 0
setup_st7032()
if __name__ == '__main__':
if len(sys.argv)==1:
# アルファベットと記号は「''」でくくってそのまま表示可能
write_string('Hello')
else:
write_string(sys.argv[1])
定期的に自動実行
wloger.pyを実行すると、温度、湿度、気圧を取得してディスプレイに表示しつつ、csvファイルに保存できるようになりました。一定時間間隔で自動実行するよう、cronの設定をします。
$ sudo crontab -e
でcrontabを編集し、最後の行に
*/5 * * * * python3 /home/pi/wloger.py
と追記します。冒頭の「*/5」が5分毎にという指定です。これで、ラズパイZero Wが起動している間、5分毎にwloger.pyが実行され、csvファイルにデータが追記されます。
References