- 1. 概要
- 2. ソース作成
- 3. 参考サイト
1. 概要
「Redis」からの「Stream」にキューをいれて「Python」で受け取って、「MQTT」へ「Publish」するプログラムを書きます。
2. ソース作成
下記のソースを作成します。
import redis
import json
import base64
import time
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
STREAM = "mqtt_pub"
GROUP = "mqtt_workers"
CONSUMER = "worker-1"
while True:
resp = r.xreadgroup(
groupname=GROUP,
consumername=CONSUMER,
streams={STREAM: ">"},
count=1,
block=5000
)
if not resp:
continue
for stream, messages in resp:
for msg_id, fields in messages:
try:
payload = json.loads(fields['payload'])
topic = payload['topic']
data = base64.b64decode(payload['data'])
print("topic:", topic)
print("data:", data)
# MQTT Publish 処理をここに書く
# 処理成功 → ACK
r.xack(STREAM, GROUP, msg_id)
except Exception as e:
print("error:", e)
# ACKしない → 再処理可能
ソース名を「sample.py」として、下記で起動します。
python sample.py
3. 参考サイト
本ページは、「ChatGPT」軍曹を参考にさせていただきました。
|
|