ラズパイで再生中の音量に合わせてインジケータを光らせよう。です。
前回はマイコンのアナログ入力でラズパイのライン出力をモニターしましたが、それだとノイズが乗ってノイズにインジケータが反応するのであまりうれしくない
なので、ラズパイ内でALSAでの再生中の音量を取得し、それをROS2トピック通信でマイコンに送り、マイコンを使ってインジケータを点灯させます。
前提条件;ラズパイ5+ラズパイOSbookwormにROS2 humbleが導入されている+Mbedのクロス開発環境ができている状態まで作成しておくこと
●目次
・構成の紹介
・Mbed側ソフトウエア
・ラズパイ側ソフトウエア
。配布
●構成
以下のようなことをやります。

●Mbed側ソフトウエア
●ラズパイ側ソフトウエア
まず、ご注意。
ここではあえてPIPを強引にインストールしており、今推奨のvenvは使っていません。
なぜか!・・・なんかROS2と相性悪いから。ぶっちゃけ、venvだとうまく動かんがpipを強引にインストールすれば動くんでそれでやっちゃってます
ステップ1)まず、音量をテキストで表示する。
・準備
sudo apt install python3 python3-pip
sudo apt install portaudio19-dev
「メモ」venvとros2の相性がなんかよくないので以下で強引にvenvなしでpipする手もある pip install pyaudio –break-system-packages とか pip install transitions –break-system-packages (ROS2でよく使うやつ)とか。
pip install pyaudio --break-system-packages
pip install numpy --break-system-packages
・プログラム本体(volume_display_py)
import pyaudio
import numpy as np
# PyAudioの設定
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 1024
# PyAudioのインスタンスを作成
audio = pyaudio.PyAudio()
# ストリームを開く
stream = audio.open(format=FORMAT, channels=CHANNELS,
rate=RATE, input=True,
frames_per_buffer=CHUNK)
print("音量を表示します。Ctrl+Cで終了します。")
try:
while True:
# 音声データを読み取る
data = np.frombuffer(stream.read(CHUNK), dtype=np.int16)
# 音量を計算
volume = np.linalg.norm(data) / CHUNK
print(f"音量: {volume:.2f}")
except KeyboardInterrupt:
print("終了します。")
# ストリームを閉じる
stream.stop_stream()
stream.close()
audio.terminate()
で、以下実行
python volume_display_py
ステップ2)ROS2の通信を組み込む
2-1-1)ノード作成
ワークスペース内で
ros2 pkg create vol_disp --build-type ament_python --dependencies rclpy
2-1-2)ワークスペース内/src/vol_disp/setup.py編集
from setuptools import setup
package_name = 'my_python_pkg'
from setuptools import find_packages, setup
package_name = 'vol_disp'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='pi',
maintainer_email='pi@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'voldisp = vol_disp.voldisp:main',
],
},
)
2-1-3)
ワークスペース内/src/vol_disp/vol_disp/voldisp.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile,ReliabilityPolicy,DurabilityPolicy
from mbed2rasppik_msgs.msg import Mbed2rasppik
from rasppi2mbedk_msgs.msg import Rasppi2mbedk
import time
import pyaudio
import numpy as np
import signal
import sys
# PyAudioの設定
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 1024
# PyAudioのインスタンスを作成
audio = pyaudio.PyAudio()
# ストリームを開く
stream = audio.open(format=FORMAT, channels=CHANNELS,
rate=RATE, input=True,
frames_per_buffer=CHUNK)
#----------クラス定義-----------
class Central_Controller(Node):
#コンストラクタ(初期定義)
def __init__(self):
super().__init__("input_control")
self.v_rx_cntr:int = 0
self.v_bluestat1:int = 0
self.v_bluestat_rpi1:int = 0
self.v_volumeamp_rpi:int = 0
self.v_volume:float= 0
self.v_volume_max:float= 0
#通信品質
profile = QoSProfile(depth=10, reliability=rclpy.qos.ReliabilityPolicy.BEST_EFFORT,\
durability=rclpy.qos.DurabilityPolicy.VOLATILE)
#送信設定
self.pub = self.create_publisher(Rasppi2mbedk,'rasppi2mbedk',profile)
#受信設定
self.sub = self.create_subscription(Mbed2rasppik,'mbed2rasppik',self.callback,profile)
print("音量を表示します。Ctrl+Cで終了します。")
#コールバック関数
def callback(self,msg):
msg_pub = Rasppi2mbedk()
#----------------mbed2rasppi購読値の読み込み
self.v_bluestat1 = msg.v_bluestat1
self.v_volumeamp_rpi = 100
#-------------------------演算
self.v_rx_cntr += 1
self.v_bluestat_rpi1 = self.v_bluestat1
# 音声データを読み取る
data = np.frombuffer(stream.read(CHUNK), dtype=np.int16)
# 音量を計算
self.v_volume = np.linalg.norm(data) / CHUNK
print(f"音量: {self.v_volume:.2f}")
self.v_volume_max = max(self.v_volume , self.v_volume_max)
print(f"最大音量: {self.v_volume_max:.2f}")
self.v_volumeamp_rpi = int(min(65535,self.v_volume*65))
#-------------rasppi2mbedへの発行データセット---------------------------
msg_pub.v_bluestat_rpi1 = self.v_bluestat_rpi1
msg_pub.v_volumeamplitude_rpi = self.v_volumeamp_rpi
#----------------コンソールへのデータモニタ------------------
print('rx_counter='+str(self.v_rx_cntr))
print('RX:v_bluestat1='+str(self.v_bluestat1))
print('TX:v_bluestat_rpi1='+str(self.v_bluestat_rpi1),'v_volumeamplitude_rpi='+str(self.v_volumeamp_rpi))
print('-------------------')
self.pub.publish(msg_pub)
""" メイン関数 """
def main():
#RCLの初期化を実行
rclpy.init()
#クラスをコンスタンス化
node = Central_Controller()
#-----CTRL+C時の処理を追加
def signal_handler(sig, frame):
print("CTRL+C 終了します。")
node.destroy_node()
rclpy.shutdown()
stream.stop_stream()
stream.close()
audio.terminate()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
try:
#ループに入りnodeの処理を実行させる
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
if rclpy.ok():
node.destroy_node()
rclpy.shutdown()
stream.stop_stream()
stream.close()
audio.terminate()
if __name__ == '__main__':
main()
2-2-1)ROS2TOPIC・カスタムメッセージファイル「mbed2rasppik」作成
ros2 pkg create --build-type ament_cmake mbed2rasppik
cd ~/ros2_ws/src/mbed2rasppik
mkdir msg
echo "int32 num" >
(ワークスペース内)/src/mbed2rasppik/Mbed2rasppikを以下のように編集
uint8 v_bluestat1
(ワークスペース内)/src/mbed2rasppik/CMakeLists.txt
の、find_packageのところを以下のように変更
find_package(rosidl_default_generators REQUIRED)
set(msg_files
"msg/Mbed2rasppik.msg"
)
rosidl_generate_interfaces(${PROJECT_NAME}
${msg_files}
)
(ワークスペース内)/src/mbed2rasppik/package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>mbed2rasppik_msgs</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="pi@todo.todo">pi</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>
2-3-1)ROS2TOPIC・カスタムメッセージファイル「rasppi2mbedk」作成
cd ~/ros2_ws
colcon build
ステップ3)自動起動
Raspberry Pi 5でRaspberry Pi OSを使用し、OS起動時にPythonで書かれたROS2ノードを複数起動する方法について説明します。
手順
- ROS2ノードの作成:
- 各ノードのPythonスクリプトを作成します。例えば、
node1.py
とnode2.py
というファイルを作成します。
- 各ノードのPythonスクリプトを作成します。例えば、
- システムサービスの作成:
- 各ノードのためにシステムサービスを作成します。以下は、
node1.service
の例です。
- 各ノードのためにシステムサービスを作成します。以下は、
[Unit]
Description=ROS2 Node 1
After=network.target
[Service]
ExecStart=/usr/bin/python3 /path/to/your/node1.py
Restart=always
User=pi
[Install]
WantedBy=multi-user.target
- 同様に、
node2.service
も作成します。
- サービスファイルの配置:
- 作成したサービスファイルを
/etc/systemd/system/
ディレクトリに配置します。
- 作成したサービスファイルを
sudo cp node1.service /etc/systemd/system/
sudo cp node2.service /etc/systemd/system/
4.サービスの有効化と起動:
- サービスを有効化し、起動します。
sudo systemctl enable node1.service
sudo systemctl enable node2.service
sudo systemctl start node1.service
sudo systemctl start node2.service
●配布
準備中