ラズパイで再生中の音量に合わせてインジケータを光らせよう。です。

前回はマイコンのアナログ入力でラズパイのライン出力をモニターしましたが、それだとノイズが乗ってノイズにインジケータが反応するのであまりうれしくない

なので、ラズパイ内でALSAでの再生中の音量を取得し、それをROS2トピック通信でマイコンに送り、マイコンを使ってインジケータを点灯させます。

前提条件;ラズパイ5+ラズパイOSbookwormにROS2 humbleが導入されているMbedのクロス開発環境できている状態まで作成しておくこと

●目次

・構成の紹介

・Mbed側ソフトウエア

・ラズパイ側ソフトウエア

。配布

●構成

以下のようなことをやります。

●Mbed側ソフトウエア

●ラズパイ側ソフトウエア

まず、ご注意。

ここではあえてPIPを強引にインストールしており、今推奨のvenvは使っていません。

なぜか!・・・なんかROS2と相性悪いから。ぶっちゃけ、venvだとうまく動かんがpipを強引にインストールすれば動くんでそれでやっちゃってます

ステップ1)まず、音量をテキストで表示する。

・準備

sudo apt install python3 python3-pip
sudo apt install portaudio19-dev

「メモ」venvとros2の相性がなんかよくないので以下で強引にvenvなしでpipする手もある pip install pyaudio –break-system-packages とか pip install transitions –break-system-packages (ROS2でよく使うやつ)とか。

pip install pyaudio --break-system-packages
pip install numpy --break-system-packages

・プログラム本体(volume_display_py)

import pyaudio
import numpy as np

# PyAudioの設定
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 1024

# PyAudioのインスタンスを作成
audio = pyaudio.PyAudio()

# ストリームを開く
stream = audio.open(format=FORMAT, channels=CHANNELS,
                    rate=RATE, input=True,
                    frames_per_buffer=CHUNK)

print("音量を表示します。Ctrl+Cで終了します。")

try:
    while True:
        # 音声データを読み取る
        data = np.frombuffer(stream.read(CHUNK), dtype=np.int16)
        # 音量を計算
        volume = np.linalg.norm(data) / CHUNK
        print(f"音量: {volume:.2f}")
except KeyboardInterrupt:
    print("終了します。")

# ストリームを閉じる
stream.stop_stream()
stream.close()
audio.terminate()

で、以下実行

python volume_display_py

ステップ2)ROS2の通信を組み込む

2-1-1)ノード作成

ワークスペース内で

ros2 pkg create vol_disp --build-type ament_python --dependencies rclpy

2-1-2)ワークスペース内/src/vol_disp/setup.py編集

from setuptools import setup

package_name = 'my_python_pkg'

from setuptools import find_packages, setup

package_name = 'vol_disp'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='pi',
    maintainer_email='pi@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'voldisp = vol_disp.voldisp:main',
        ],
    },
)

2-1-3)

ワークスペース内/src/vol_disp/vol_disp/voldisp.py

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile,ReliabilityPolicy,DurabilityPolicy
from mbed2rasppik_msgs.msg import Mbed2rasppik
from rasppi2mbedk_msgs.msg import Rasppi2mbedk

import time

import pyaudio
import numpy as np

import signal
import sys

# PyAudioの設定
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 1024

# PyAudioのインスタンスを作成
audio = pyaudio.PyAudio()

# ストリームを開く
stream = audio.open(format=FORMAT, channels=CHANNELS,
                    rate=RATE, input=True,
                    frames_per_buffer=CHUNK)


#----------クラス定義-----------
class Central_Controller(Node):
    
    #コンストラクタ(初期定義)
    def __init__(self):
        
        super().__init__("input_control")
        
        self.v_rx_cntr:int = 0
        self.v_bluestat1:int = 0
        self.v_bluestat_rpi1:int = 0
        self.v_volumeamp_rpi:int = 0

        self.v_volume:float= 0
        self.v_volume_max:float= 0

        #通信品質
        profile = QoSProfile(depth=10, reliability=rclpy.qos.ReliabilityPolicy.BEST_EFFORT,\
                    durability=rclpy.qos.DurabilityPolicy.VOLATILE)
        
        #送信設定
        self.pub = self.create_publisher(Rasppi2mbedk,'rasppi2mbedk',profile)
        
        #受信設定
        self.sub = self.create_subscription(Mbed2rasppik,'mbed2rasppik',self.callback,profile) 

        print("音量を表示します。Ctrl+Cで終了します。")


    #コールバック関数
    def callback(self,msg):
       
        msg_pub = Rasppi2mbedk() 
        #----------------mbed2rasppi購読値の読み込み
        self.v_bluestat1 = msg.v_bluestat1
        self.v_volumeamp_rpi = 100

        #-------------------------演算
        self.v_rx_cntr += 1
        self.v_bluestat_rpi1 = self.v_bluestat1

        # 音声データを読み取る
        data = np.frombuffer(stream.read(CHUNK), dtype=np.int16)
        # 音量を計算
        self.v_volume = np.linalg.norm(data) / CHUNK
        print(f"音量: {self.v_volume:.2f}")
        self.v_volume_max = max(self.v_volume , self.v_volume_max)
        print(f"最大音量: {self.v_volume_max:.2f}")
        self.v_volumeamp_rpi = int(min(65535,self.v_volume*65))

        #-------------rasppi2mbedへの発行データセット---------------------------       
        msg_pub.v_bluestat_rpi1 = self.v_bluestat_rpi1
        msg_pub.v_volumeamplitude_rpi = self.v_volumeamp_rpi

        #----------------コンソールへのデータモニタ------------------
        print('rx_counter='+str(self.v_rx_cntr))
        print('RX:v_bluestat1='+str(self.v_bluestat1))
        print('TX:v_bluestat_rpi1='+str(self.v_bluestat_rpi1),'v_volumeamplitude_rpi='+str(self.v_volumeamp_rpi))
        print('-------------------')
                    
        self.pub.publish(msg_pub)
        
""" メイン関数 """
def main():
    
    #RCLの初期化を実行
    rclpy.init()
    
    #クラスをコンスタンス化
    node = Central_Controller()

    #-----CTRL+C時の処理を追加
    def signal_handler(sig, frame):
        print("CTRL+C 終了します。")
        node.destroy_node()
        rclpy.shutdown()
        stream.stop_stream()
        stream.close()
        audio.terminate()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    
    try:
        #ループに入りnodeの処理を実行させる
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        if rclpy.ok():
            node.destroy_node()
            rclpy.shutdown()
        stream.stop_stream()
        stream.close()
        audio.terminate()
    
if __name__ == '__main__':
    main()

2-2-1)ROS2TOPIC・カスタムメッセージファイル「mbed2rasppik」作成

ros2 pkg create --build-type ament_cmake mbed2rasppik
cd ~/ros2_ws/src/mbed2rasppik
mkdir msg
echo "int32 num" > 

(ワークスペース内)/src/mbed2rasppik/Mbed2rasppikを以下のように編集

uint8 v_bluestat1

(ワークスペース内)/src/mbed2rasppik/CMakeLists.txt

の、find_packageのところを以下のように変更

find_package(rosidl_default_generators REQUIRED)

set(msg_files
  "msg/Mbed2rasppik.msg"
)
rosidl_generate_interfaces(${PROJECT_NAME}
  ${msg_files}
)

(ワークスペース内)/src/mbed2rasppik/package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>mbed2rasppik_msgs</name>
  <version>0.0.0</version>
  <description>TODO: Package description</description>
  <maintainer email="pi@todo.todo">pi</maintainer>
  <license>TODO: License declaration</license>

  <buildtool_depend>ament_cmake</buildtool_depend>

  <test_depend>ament_lint_auto</test_depend>
  <test_depend>ament_lint_common</test_depend>

  <build_depend>rosidl_default_generators</build_depend>
  <exec_depend>rosidl_default_runtime</exec_depend>
  <member_of_group>rosidl_interface_packages</member_of_group>

  <export>
    <build_type>ament_cmake</build_type>
  </export>
</package>

2-3-1)ROS2TOPIC・カスタムメッセージファイル「rasppi2mbedk」作成

cd ~/ros2_ws
colcon build

ステップ3)自動起動

Raspberry Pi 5でRaspberry Pi OSを使用し、OS起動時にPythonで書かれたROS2ノードを複数起動する方法について説明します。

手順

  1. ROS2ノードの作成:
    • 各ノードのPythonスクリプトを作成します。例えば、node1.pynode2.pyというファイルを作成します。
  2. システムサービスの作成:
    • 各ノードのためにシステムサービスを作成します。以下は、node1.serviceの例です。
[Unit]
Description=ROS2 Node 1
After=network.target

[Service]
ExecStart=/usr/bin/python3 /path/to/your/node1.py
Restart=always
User=pi

[Install]
WantedBy=multi-user.target
  • 同様に、node2.serviceも作成します。
  1. サービスファイルの配置:
    • 作成したサービスファイルを/etc/systemd/system/ディレクトリに配置します。
sudo cp node1.service /etc/systemd/system/
sudo cp node2.service /etc/systemd/system/

4.サービスの有効化と起動:

  • サービスを有効化し、起動します。
sudo systemctl enable node1.service
sudo systemctl enable node2.service
sudo systemctl start node1.service
sudo systemctl start node2.service

●配布

準備中