mq消息传送
开启消息发布确认模式
def publish(self, message):
"""发布消息(自动重连)"""
for i in range(3):
try:
message_ = json.dumps(message, ensure_ascii=False)
self.ensure_connection()
# 开启 confirm 模式(Publisher Confirms)
self.channel.confirm_delivery()
self.channel.basic_publish(
exchange=self.queue_exchange,
routing_key=self.queue_routing_key,
body=message_,
properties=pika.BasicProperties(delivery_mode=2),
mandatory=True
) # 持久化消息
logger.info(resp_dic["infoId"] + ":" + str(message))
# print("confirm---",str(confirm))
return
except Exception as e:
logger.error(f"Publish failed: {e}. Reconnecting...{message}")
self.is_connected = False
else:
logger.error("Failed to publish message after 3 attempts.")
删掉消息队列或者对应路由之后,报出输出如下
1 unroutable message(s) returned. Reconnecting...
除了增加confirm_delivery消息确保到路由之外,还应该每次发送消息前检测链接是否正常,增加mq断联重试机制
评论区