erp-java/rocketmq/consumer/BaseRocketMQConsumer.java

139 lines
4.4 KiB
Java
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package com.erp.mq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
/**
* RocketMQ 消息消费者基类
* 提供通用的消息消费能力
*/
@Slf4j
public abstract class BaseRocketMQConsumer {
@Value("${rocketmq.namesrv-addr}")
protected String namesrvAddr;
protected DefaultMQPushConsumer consumer;
/**
* 获取Topic名称
*/
protected abstract String getTopic();
/**
* 获取消费者组
*/
protected abstract String getConsumerGroup();
/**
* 获取Tags*表示所有标签)
*/
protected abstract String getTags();
/**
* 初始化消费者
*/
protected void init() throws MQClientException {
consumer = new DefaultMQPushConsumer(getConsumerGroup());
consumer.setNamesrvAddr(namesrvAddr);
// 设置消费模式:集群消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置消费起始点
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 设置并发消费线程数
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(30);
// 设置拉取消息间隔
consumer.setPullInterval(0);
// 设置拉取消息数量
consumer.setPullBatchSize(32);
// 设置最大重试次数
consumer.setMaxReconsumeTimes(16);
// 订阅主题
consumer.subscribe(getTopic(), getTags());
// 注册消息监听器
consumer.registerMessageListener(this::consumeMessage);
// 启动消费者
consumer.start();
log.info("RocketMQ消费者启动成功 - Topic: {}, Group: {}", getTopic(), getConsumerGroup());
}
/**
* 消费消息处理
*/
protected ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
AtomicLong successCount = new AtomicLong(0);
AtomicLong failCount = new AtomicLong(0);
for (MessageExt messageExt : messageExtList) {
try {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
String msgId = messageExt.getMsgId();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
log.info("收到消息 - Topic: {}, MsgId: {}, Tags: {}, Keys: {}",
getTopic(), msgId, tags, keys);
// 调用子类处理逻辑
boolean success = handleMessage(body, messageExt);
if (success) {
successCount.incrementAndGet();
log.debug("消息处理成功 - MsgId: {}", msgId);
} else {
failCount.incrementAndGet();
log.warn("消息处理返回失败 - MsgId: {}", msgId);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
} catch (Exception e) {
failCount.incrementAndGet();
log.error("消息处理异常 - MsgId: {}", messageExt.getMsgId(), e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
log.info("批次消息处理完成 - Topic: {}, Success: {}, Failed: {}",
getTopic(), successCount.get(), failCount.get());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* 处理消息 - 子类实现具体逻辑
* @param messageBody 消息体
* @param messageExt 消息扩展信息
* @return true表示处理成功false表示需要重试
*/
protected abstract boolean handleMessage(String messageBody, MessageExt messageExt);
/**
* 关闭消费者
*/
public void shutdown() {
if (consumer != null) {
consumer.shutdown();
log.info("RocketMQ消费者已关闭 - Topic: {}, Group: {}", getTopic(), getConsumerGroup());
}
}
}