139 lines
4.4 KiB
Java
139 lines
4.4 KiB
Java
package com.erp.mq.consumer;
|
||
|
||
import lombok.extern.slf4j.Slf4j;
|
||
import org.apache.rocketmq.client.consumer.*;
|
||
import org.apache.rocketmq.client.exception.MQClientException;
|
||
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
||
import org.apache.rocketmq.common.message.MessageExt;
|
||
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
|
||
import org.springframework.beans.factory.annotation.Value;
|
||
|
||
import java.nio.charset.StandardCharsets;
|
||
import java.util.List;
|
||
import java.util.concurrent.atomic.AtomicLong;
|
||
|
||
/**
|
||
* RocketMQ 消息消费者基类
|
||
* 提供通用的消息消费能力
|
||
*/
|
||
@Slf4j
|
||
public abstract class BaseRocketMQConsumer {
|
||
|
||
@Value("${rocketmq.namesrv-addr}")
|
||
protected String namesrvAddr;
|
||
|
||
protected DefaultMQPushConsumer consumer;
|
||
|
||
/**
|
||
* 获取Topic名称
|
||
*/
|
||
protected abstract String getTopic();
|
||
|
||
/**
|
||
* 获取消费者组
|
||
*/
|
||
protected abstract String getConsumerGroup();
|
||
|
||
/**
|
||
* 获取Tags(*表示所有标签)
|
||
*/
|
||
protected abstract String getTags();
|
||
|
||
/**
|
||
* 初始化消费者
|
||
*/
|
||
protected void init() throws MQClientException {
|
||
consumer = new DefaultMQPushConsumer(getConsumerGroup());
|
||
consumer.setNamesrvAddr(namesrvAddr);
|
||
|
||
// 设置消费模式:集群消费
|
||
consumer.setMessageModel(MessageModel.CLUSTERING);
|
||
|
||
// 设置消费起始点
|
||
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
|
||
|
||
// 设置并发消费线程数
|
||
consumer.setConsumeThreadMin(10);
|
||
consumer.setConsumeThreadMax(30);
|
||
|
||
// 设置拉取消息间隔
|
||
consumer.setPullInterval(0);
|
||
|
||
// 设置拉取消息数量
|
||
consumer.setPullBatchSize(32);
|
||
|
||
// 设置最大重试次数
|
||
consumer.setMaxReconsumeTimes(16);
|
||
|
||
// 订阅主题
|
||
consumer.subscribe(getTopic(), getTags());
|
||
|
||
// 注册消息监听器
|
||
consumer.registerMessageListener(this::consumeMessage);
|
||
|
||
// 启动消费者
|
||
consumer.start();
|
||
|
||
log.info("RocketMQ消费者启动成功 - Topic: {}, Group: {}", getTopic(), getConsumerGroup());
|
||
}
|
||
|
||
/**
|
||
* 消费消息处理
|
||
*/
|
||
protected ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExtList, ConsumeConcurrentlyContext context) {
|
||
AtomicLong successCount = new AtomicLong(0);
|
||
AtomicLong failCount = new AtomicLong(0);
|
||
|
||
for (MessageExt messageExt : messageExtList) {
|
||
try {
|
||
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
|
||
String msgId = messageExt.getMsgId();
|
||
String tags = messageExt.getTags();
|
||
String keys = messageExt.getKeys();
|
||
|
||
log.info("收到消息 - Topic: {}, MsgId: {}, Tags: {}, Keys: {}",
|
||
getTopic(), msgId, tags, keys);
|
||
|
||
// 调用子类处理逻辑
|
||
boolean success = handleMessage(body, messageExt);
|
||
|
||
if (success) {
|
||
successCount.incrementAndGet();
|
||
log.debug("消息处理成功 - MsgId: {}", msgId);
|
||
} else {
|
||
failCount.incrementAndGet();
|
||
log.warn("消息处理返回失败 - MsgId: {}", msgId);
|
||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||
}
|
||
} catch (Exception e) {
|
||
failCount.incrementAndGet();
|
||
log.error("消息处理异常 - MsgId: {}", messageExt.getMsgId(), e);
|
||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||
}
|
||
}
|
||
|
||
log.info("批次消息处理完成 - Topic: {}, Success: {}, Failed: {}",
|
||
getTopic(), successCount.get(), failCount.get());
|
||
|
||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||
}
|
||
|
||
/**
|
||
* 处理消息 - 子类实现具体逻辑
|
||
* @param messageBody 消息体
|
||
* @param messageExt 消息扩展信息
|
||
* @return true表示处理成功,false表示需要重试
|
||
*/
|
||
protected abstract boolean handleMessage(String messageBody, MessageExt messageExt);
|
||
|
||
/**
|
||
* 关闭消费者
|
||
*/
|
||
public void shutdown() {
|
||
if (consumer != null) {
|
||
consumer.shutdown();
|
||
log.info("RocketMQ消费者已关闭 - Topic: {}, Group: {}", getTopic(), getConsumerGroup());
|
||
}
|
||
}
|
||
}
|