package com.erp.mq.producer;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Base class for RocketMQ message producers.
 *
 * <p>Provides synchronous, asynchronous and transactional send helpers for the topic and
 * producer group supplied by the subclass. Message payloads are serialized to JSON with
 * {@link #objectMapper} and encoded as UTF-8.
 *
 * <p>Producers are started lazily on first use, cached per producer group, and reused for
 * every subsequent send. They are shut down in {@link #destroy()}, which Spring invokes when
 * the concrete bean is disposed. The caches are per instance: two beans that return the same
 * producer group each start their own producer.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public abstract class BaseRocketMQProducer implements DisposableBean {

    /** Maximum message size accepted by the client, in bytes (6 MiB = 6291456). */
    private static final int MAX_MESSAGE_SIZE_BYTES = 6 * 1024 * 1024;

    /** Topic that message-trace data is written to. */
    private static final String TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";

    /** Size of the thread pool handed to each transaction producer. */
    private static final int TRANSACTION_CHECK_THREADS = 10;

    /** JSON serializer for message payloads; created locally with default settings. */
    protected final ObjectMapper objectMapper = new ObjectMapper();

    @Value("${rocketmq.namesrv-addr}")
    private String namesrvAddr;

    /** Send timeout in milliseconds; defaults to 3000 when the property is absent. */
    @Value("${rocketmq.producer.timeout:3000}")
    private int timeout;

    /** Started plain producers, keyed by producer group. */
    private final ConcurrentHashMap<String, DefaultMQProducer> producers = new ConcurrentHashMap<>();

    /** Transaction producers handed out by {@link #createTransactionProducer}; not necessarily started. */
    private final ConcurrentHashMap<String, TransactionMQProducer> transactionProducers =
            new ConcurrentHashMap<>();

    /** Transaction producers that this class has started, keyed by producer group. */
    private final ConcurrentHashMap<String, TransactionMQProducer> startedTransactionProducers =
            new ConcurrentHashMap<>();

    /**
     * Returns the topic that every message sent through this producer is published to.
     *
     * @return the topic name
     */
    protected abstract String getTopic();

    /**
     * Returns the producer group used for all producers created by this instance.
     *
     * @return the producer group name
     */
    protected abstract String getProducerGroup();

    /**
     * Creates a new, <em>unstarted</em> plain producer configured with the name server address,
     * size limit, retry counts, send timeout, compression threshold and message tracing.
     *
     * <p>Every call returns a fresh instance; the caller owns its lifecycle. The send methods of
     * this class call it once per producer group and cache the started result.
     *
     * @param producerGroup the producer group to register under
     * @return a configured producer on which {@code start()} has not been called
     */
    protected DefaultMQProducer createProducer(String producerGroup) {
        // Tracing is switched on through the constructor; it also names the trace topic.
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup, true, TRACE_TOPIC);
        producer.setNamesrvAddr(namesrvAddr);
        producer.setMaxMessageSize(MAX_MESSAGE_SIZE_BYTES);
        producer.setRetryTimesWhenSendFailed(3);
        producer.setRetryTimesWhenSendAsyncFailed(2);
        producer.setSendMsgTimeout(timeout);
        producer.setCompressMsgBodyOverHowmuch(4096);
        return producer;
    }

    /**
     * Returns the transaction producer for the given group, creating and configuring it on first
     * request. The same instance is returned for later calls with the same group.
     *
     * <p>The returned producer is not started by this method. Its transaction callbacks delegate
     * to {@link #runLocalTransaction(Message, Object)} and {@link #checkTransactionState(MessageExt)}.
     *
     * @param producerGroup the producer group to register under
     * @return the cached transaction producer for {@code producerGroup}
     */
    protected TransactionMQProducer createTransactionProducer(String producerGroup) {
        return transactionProducers.computeIfAbsent(producerGroup, group -> {
            TransactionMQProducer producer = new TransactionMQProducer(group);
            producer.setNamesrvAddr(namesrvAddr);
            // A producer group can only be registered once per client instance. A distinct
            // instance name gives this producer its own client instance so it can run next
            // to the plain producer of the same group.
            producer.setInstanceName("tx-" + UUID.randomUUID());
            producer.setMaxMessageSize(MAX_MESSAGE_SIZE_BYTES);
            producer.setSendMsgTimeout(timeout);
            producer.setExecutorService(Executors.newFixedThreadPool(TRANSACTION_CHECK_THREADS));
            producer.setTransactionListener(new TransactionListener() {
                @Override
                public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                    return runLocalTransaction(msg, arg);
                }

                @Override
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    return checkTransactionState(msg);
                }
            });
            return producer;
        });
    }

    /**
     * Sends a message synchronously, using a random UUID as the message key.
     *
     * @param message the payload; serialized to JSON
     * @param tags    the message tags
     * @return the broker's send result
     * @throws RuntimeException if serialization, producer start-up or the send fails
     */
    protected SendResult sendMessage(Object message, String tags) {
        return sendMessage(message, tags, UUID.randomUUID().toString());
    }

    /**
     * Sends a message synchronously and blocks until the broker answers.
     *
     * @param message the payload; serialized to JSON
     * @param tags    the message tags
     * @param keys    the message key(s)
     * @return the broker's send result
     * @throws RuntimeException if serialization, producer start-up or the send fails; the
     *                          original exception is attached as the cause
     */
    protected SendResult sendMessage(Object message, String tags, String keys) {
        try {
            byte[] body = objectMapper.writeValueAsString(message).getBytes(StandardCharsets.UTF_8);
            Message msg = new Message(getTopic(), tags, keys, body);

            SendResult result = startedProducer(getProducerGroup()).send(msg);
            log.info("消息发送成功 - Topic: {}, Tags: {}, Keys: {}, MsgId: {}",
                    getTopic(), tags, keys, result.getMsgId());
            return result;
        } catch (Exception e) {
            restoreInterrupt(e);
            log.error("发送消息异常", e);
            throw new RuntimeException("消息发送失败", e);
        }
    }

    /**
     * Sends a message asynchronously. The outcome is reported to {@code callback} by the
     * producer once the broker answers or the send fails.
     *
     * @param message  the payload; serialized to JSON
     * @param tags     the message tags
     * @param callback receives the send result or the failure; must not be {@code null}
     * @throws NullPointerException if {@code callback} is {@code null}
     * @throws RuntimeException     if serialization, producer start-up or submitting the send
     *                              fails; failures after submission go to {@code callback}
     */
    protected void sendMessageAsync(Object message, String tags, AsyncCallback callback) {
        Objects.requireNonNull(callback, "callback");
        try {
            byte[] body = objectMapper.writeValueAsString(message).getBytes(StandardCharsets.UTF_8);
            Message msg = new Message(getTopic(), tags, body);

            startedProducer(getProducerGroup()).send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    callback.onSuccess(sendResult);
                }

                @Override
                public void onException(Throwable e) {
                    callback.onException(e);
                }
            });
        } catch (Exception e) {
            restoreInterrupt(e);
            log.error("异步发送消息异常", e);
            throw new RuntimeException("消息发送失败", e);
        }
    }

    /**
     * Sends a transactional message. The local transaction is executed through
     * {@link #runLocalTransaction(Message, Object)} with a {@code null} argument.
     *
     * @param message the payload; serialized to JSON
     * @param tags    the message tags
     * @return the transactional send result
     * @throws RuntimeException if serialization, producer start-up or the send fails
     */
    protected TransactionSendResult sendMessageInTransaction(Object message, String tags) {
        try {
            byte[] body = objectMapper.writeValueAsString(message).getBytes(StandardCharsets.UTF_8);
            Message msg = new Message(getTopic(), tags, body);

            return startedTransactionProducer(getProducerGroup()).sendMessageInTransaction(msg, null);
        } catch (Exception e) {
            restoreInterrupt(e);
            log.error("发送事务消息异常", e);
            throw new RuntimeException("事务消息发送失败", e);
        }
    }

    /**
     * Executes the local transaction that belongs to a transactional message.
     *
     * <p>The default implementation performs no work and always commits. Subclasses that have a
     * real local transaction should override this and return
     * {@code LocalTransactionState.ROLLBACK_MESSAGE} or {@code LocalTransactionState.UNKNOW}
     * when the transaction fails or its outcome is not yet known.
     *
     * @param msg the message being sent
     * @param arg the argument passed to the transactional send ({@code null} from this class)
     * @return the local transaction outcome
     */
    protected LocalTransactionState runLocalTransaction(Message msg, Object arg) {
        log.info("事务消息本地事务执行成功");
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * Answers a transaction status check from the broker.
     *
     * <p>The default implementation always commits, which is only correct while
     * {@link #runLocalTransaction(Message, Object)} also always commits. Subclasses that override
     * the local transaction should override this too and look up the real outcome, returning
     * {@code COMMIT_MESSAGE}, {@code ROLLBACK_MESSAGE} or {@code UNKNOW}.
     *
     * @param msg the message whose transaction state is being checked
     * @return the state of the local transaction
     */
    protected LocalTransactionState checkTransactionState(MessageExt msg) {
        log.info("检查事务状态 - MsgId: {}", msg.getMsgId());
        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * Shuts down every producer created through this instance and empties the caches.
     * Producers are recreated on demand if a send method is called afterwards.
     */
    @Override
    public void destroy() {
        producers.forEach(this::shutdownQuietly);
        producers.clear();
        // transactionProducers is a superset of startedTransactionProducers.
        transactionProducers.forEach(this::shutdownQuietly);
        transactionProducers.clear();
        startedTransactionProducers.clear();
    }

    /** Returns the started plain producer for the group, creating and starting it on first use. */
    private DefaultMQProducer startedProducer(String producerGroup) {
        return producers.computeIfAbsent(producerGroup, group -> {
            DefaultMQProducer producer = createProducer(group);
            try {
                producer.start();
            } catch (MQClientException e) {
                shutdownQuietly(group, producer);
                throw new IllegalStateException("生产者启动失败 - Group: " + group, e);
            }
            return producer;
        });
    }

    /** Returns the started transaction producer for the group, starting it exactly once. */
    private TransactionMQProducer startedTransactionProducer(String producerGroup) {
        return startedTransactionProducers.computeIfAbsent(producerGroup, group -> {
            TransactionMQProducer producer = createTransactionProducer(group);
            try {
                producer.start();
            } catch (MQClientException e) {
                // Drop the failed instance so the next attempt builds a fresh one.
                transactionProducers.remove(group, producer);
                shutdownQuietly(group, producer);
                throw new IllegalStateException("事务生产者启动失败 - Group: " + group, e);
            }
            return producer;
        });
    }

    /** Shuts a producer (and a transaction producer's thread pool) down without propagating failures. */
    private void shutdownQuietly(String group, DefaultMQProducer producer) {
        try {
            producer.shutdown();
        } catch (RuntimeException e) {
            log.warn("关闭生产者失败 - Group: {}", group, e);
        }
        if (producer instanceof TransactionMQProducer) {
            ExecutorService executor = ((TransactionMQProducer) producer).getExecutorService();
            if (executor != null) {
                // Harmless if the producer already shut the pool down itself.
                executor.shutdown();
            }
        }
    }

    /** Re-asserts the thread's interrupt flag when the caught exception was an interruption. */
    private static void restoreInterrupt(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Receives the outcome of an asynchronous send.
     */
    public interface AsyncCallback {

        /**
         * Called when the broker accepted the message.
         *
         * @param result the send result
         */
        void onSuccess(SendResult result);

        /**
         * Called when the send failed.
         *
         * @param e the failure
         */
        void onException(Throwable e);
    }
}