package com.erp.mq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.*;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Base class for RocketMQ push consumers.
 *
 * <p>Subclasses supply the topic, consumer group and tag filter, and implement
 * {@link #handleMessage(String, MessageExt)}. This class builds and starts a
 * {@link DefaultMQPushConsumer}, decodes each message body as UTF-8, and maps the
 * handler's outcome to a {@link ConsumeConcurrentlyStatus}.
 *
 * <p>Nothing in this class invokes {@link #init()} or {@link #shutdown()} automatically;
 * the subclass (or its container wiring) is responsible for calling them.
 */
@Slf4j
public abstract class BaseRocketMQConsumer {

    /** Name server address, injected from the {@code rocketmq.namesrv-addr} property. */
    @Value("${rocketmq.namesrv-addr}")
    protected String namesrvAddr;

    /** The underlying push consumer; {@code null} until {@link #init()} has been called. */
    protected DefaultMQPushConsumer consumer;

    /**
     * Returns the name of the topic to subscribe to.
     *
     * @return the topic name
     */
    protected abstract String getTopic();

    /**
     * Returns the consumer group this consumer belongs to.
     *
     * @return the consumer group name
     */
    protected abstract String getConsumerGroup();

    /**
     * Returns the tag filter expression used for the subscription
     * ({@code *} means all tags).
     *
     * @return the tag expression
     */
    protected abstract String getTags();

    /**
     * Creates, configures and starts the push consumer.
     *
     * <p>The consumer is configured for clustering mode, starts from the first offset,
     * uses 10 to 30 consume threads, pulls without delay in batches of 32, and allows
     * up to 16 reconsume attempts per message.
     *
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    protected void init() throws MQClientException {
        consumer = new DefaultMQPushConsumer(getConsumerGroup());
        consumer.setNamesrvAddr(namesrvAddr);

        // Message model: clustering.
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // Where to start consuming from.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Bounds of the concurrent consume thread pool.
        consumer.setConsumeThreadMin(10);
        consumer.setConsumeThreadMax(30);

        // Pull interval (0) and number of messages requested per pull.
        consumer.setPullInterval(0);
        consumer.setPullBatchSize(32);

        // Maximum number of reconsume attempts.
        consumer.setMaxReconsumeTimes(16);

        // Subscribe and route deliveries to consumeMessage.
        consumer.subscribe(getTopic(), getTags());
        consumer.registerMessageListener(this::consumeMessage);

        consumer.start();
        log.info("RocketMQ消费者启动成功 - Topic: {}, Group: {}", getTopic(), getConsumerGroup());
    }

    /**
     * Listener callback: processes a batch of delivered messages in order.
     *
     * <p>Each body is decoded as UTF-8 and passed to
     * {@link #handleMessage(String, MessageExt)}. Processing stops at the first message
     * whose handler returns {@code false} or throws, and
     * {@link ConsumeConcurrentlyStatus#RECONSUME_LATER} is returned for the batch.
     * NOTE(review): a batch-level retry status presumably causes messages that were
     * already handled successfully in the same batch to be delivered again, so handlers
     * should be idempotent - confirm against the RocketMQ listener contract.
     *
     * @param messageExtList the delivered messages
     * @param context        the consume context supplied by the client (not used here)
     * @return {@code CONSUME_SUCCESS} if every message was handled successfully,
     *         otherwise {@code RECONSUME_LATER}
     */
    protected ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList,
                                                       ConsumeConcurrentlyContext context) {
        int successCount = 0;

        for (MessageExt messageExt : messageExtList) {
            try {
                String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                String msgId = messageExt.getMsgId();
                String tags = messageExt.getTags();
                String keys = messageExt.getKeys();

                log.info("收到消息 - Topic: {}, MsgId: {}, Tags: {}, Keys: {}", getTopic(), msgId, tags, keys);

                // Delegate to the subclass; false means "retry later".
                if (!handleMessage(body, messageExt)) {
                    log.warn("消息处理返回失败 - MsgId: {}", msgId);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

                successCount++;
                log.debug("消息处理成功 - MsgId: {}", msgId);
            } catch (Exception e) {
                // Boundary catch: any handler (or decoding) failure is turned into a retry.
                log.error("消息处理异常 - MsgId: {}", messageExt.getMsgId(), e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }

        // Any failure returns early above, so the failure count here is always zero.
        log.info("批次消息处理完成 - Topic: {}, Success: {}, Failed: {}", getTopic(), successCount, 0);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Handles a single message; implemented by subclasses.
     *
     * @param messageBody the message body decoded as UTF-8
     * @param messageExt  the full message, including id, tags and keys
     * @return {@code true} if the message was processed successfully,
     *         {@code false} if it should be retried
     */
    protected abstract boolean handleMessage(String messageBody, MessageExt messageExt);

    /**
     * Shuts the consumer down. Does nothing if {@link #init()} was never called.
     */
    public void shutdown() {
        if (consumer != null) {
            consumer.shutdown();
            log.info("RocketMQ消费者已关闭 - Topic: {}, Group: {}", getTopic(), getConsumerGroup());
        }
    }
}