Python - mqtt・redis - Publish


クラウディア 


1. 概要
2. ソース作成
3. 参考サイト

1. 概要

 「Redis」からの「Stream」にキューをいれて「Python」で受け取って、「MQTT」へ「Publish」するプログラムを書きます。

2. ソース作成

 下記のソースを作成します。

import redis
import json
import base64
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM = "mqtt_pub"
GROUP  = "mqtt_workers"
CONSUMER = "worker-1"

while True:
    resp = r.xreadgroup(
        groupname=GROUP,
        consumername=CONSUMER,
        streams={STREAM: ">"},
        count=1,
        block=5000
    )

    if not resp:
        continue

    for stream, messages in resp:
        for msg_id, fields in messages:
            try:
                payload = json.loads(fields['payload'])

                topic   = payload['topic']
                data    = base64.b64decode(payload['data'])

                print("topic:", topic)
                print("data:", data)

                # MQTT Publish 処理をここに書く

                # 処理成功 → ACK
                r.xack(STREAM, GROUP, msg_id)

            except Exception as e:
                print("error:", e)
                # ACKしない → 再処理可能
 ソース名を「sample.py」として、下記で起動します。

python sample.py

3. 参考サイト

 本ページは、「ChatGPT」軍曹を参考にさせていただきました。

EaseUS