181 lines
6.1 KiB
Java
181 lines
6.1 KiB
Java
package com.erp.mq.producer;
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
|
|
|
|
/**
|
|
* RocketMQ 消息生产者基类
|
|
* 提供通用的消息发送能力
|
|
*/
|
|
@Slf4j
|
|
@Component
|
|
@RequiredArgsConstructor
|
|
public abstract class BaseRocketMQProducer {
|
|
|
|
protected final ObjectMapper objectMapper = new ObjectMapper();
|
|
|
|
@Value("${rocketmq.namesrv-addr}")
|
|
private String namesrvAddr;
|
|
|
|
@Value("${rocketmq.producer.timeout:3000}")
|
|
private int timeout;
|
|
|
|
private final ConcurrentHashMap<String, TransactionMQProducer> transactionProducers = new ConcurrentHashMap<>();
|
|
|
|
/**
|
|
* 获取Topic名称
|
|
*/
|
|
protected abstract String getTopic();
|
|
|
|
/**
|
|
* 创建普通消息生产者
|
|
*/
|
|
protected DefaultMQProducer createProducer(String producerGroup) {
|
|
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
|
|
producer.setNamesrvAddr(namesrvAddr);
|
|
producer.setMaxMessageSize(6291456);
|
|
producer.setRetryTimesWhenSendFailed(3);
|
|
producer.setRetryTimesWhenSendAsyncFailed(2);
|
|
producer.setSendMessageTimeout(timeout);
|
|
producer.setCompressMsgBodyOverHowmuch(4096);
|
|
// 开启消息追踪
|
|
producer.setEnableMsgTrace(true);
|
|
producer.setCustomizedTraceTopic("RMQ_SYS_TRACE_TOPIC");
|
|
return producer;
|
|
}
|
|
|
|
/**
|
|
* 创建事务消息生产者
|
|
*/
|
|
protected TransactionMQProducer createTransactionProducer(String producerGroup) {
|
|
return transactionProducers.computeIfAbsent(producerGroup, group -> {
|
|
TransactionMQProducer producer = new TransactionMQProducer(group);
|
|
producer.setNamesrvAddr(namesrvAddr);
|
|
producer.setMaxMessageSize(6291456);
|
|
producer.setSendMessageTimeout(timeout);
|
|
producer.setExecutorService(java.util.concurrent.Executors.newFixedThreadPool(10));
|
|
producer.setTransactionCheckListener(this::checkTransactionState);
|
|
return producer;
|
|
});
|
|
}
|
|
|
|
/**
|
|
* 发送同步消息
|
|
*/
|
|
protected SendResult sendMessage(Object message, String tags) {
|
|
return sendMessage(message, tags, UUID.randomUUID().toString());
|
|
}
|
|
|
|
protected SendResult sendMessage(Object message, String tags, String keys) {
|
|
try {
|
|
DefaultMQProducer producer = createProducer(getProducerGroup());
|
|
producer.start();
|
|
|
|
String body = objectMapper.writeValueAsString(message);
|
|
Message msg = new Message(
|
|
getTopic(),
|
|
tags,
|
|
keys,
|
|
body.getBytes(StandardCharsets.UTF_8)
|
|
);
|
|
|
|
SendResult result = producer.send(msg, new SendCallback() {
|
|
@Override
|
|
public void onSuccess(SendResult sendResult) {
|
|
log.info("消息发送成功 - Topic: {}, Tags: {}, Keys: {}, MsgId: {}",
|
|
getTopic(), tags, keys, sendResult.getMsgId());
|
|
}
|
|
|
|
@Override
|
|
public void onException(Throwable e) {
|
|
log.error("消息发送失败 - Topic: {}, Tags: {}, Keys: {}",
|
|
getTopic(), tags, keys, e);
|
|
}
|
|
});
|
|
|
|
producer.shutdown();
|
|
return result;
|
|
} catch (Exception e) {
|
|
log.error("发送消息异常", e);
|
|
throw new RuntimeException("消息发送失败", e);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 发送异步消息
|
|
*/
|
|
protected void sendMessageAsync(Object message, String tags, AsyncCallback callback) {
|
|
try {
|
|
DefaultMQProducer producer = createProducer(getProducerGroup());
|
|
producer.start();
|
|
|
|
String body = objectMapper.writeValueAsString(message);
|
|
Message msg = new Message(getTopic(), tags, body.getBytes(StandardCharsets.UTF_8));
|
|
|
|
producer.send(msg, new SendCallback() {
|
|
@Override
|
|
public void onSuccess(SendResult sendResult) {
|
|
callback.onSuccess(sendResult);
|
|
}
|
|
|
|
@Override
|
|
public void onException(Throwable e) {
|
|
callback.onException(e);
|
|
}
|
|
});
|
|
} catch (Exception e) {
|
|
log.error("异步发送消息异常", e);
|
|
throw new RuntimeException("消息发送失败", e);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 发送事务消息
|
|
*/
|
|
protected TransactionSendResult sendMessageInTransaction(Object message, String tags) {
|
|
try {
|
|
TransactionMQProducer producer = createTransactionProducer(getProducerGroup());
|
|
producer.start();
|
|
|
|
String body = objectMapper.writeValueAsString(message);
|
|
Message msg = new Message(getTopic(), tags, body.getBytes(StandardCharsets.UTF_8));
|
|
|
|
TransactionSendResult result = producer.sendMessageInTransaction(msg, (msg1, arg) -> {
|
|
log.info("事务消息本地事务执行成功");
|
|
return LocalTransactionState.COMMIT_MESSAGE;
|
|
}, null);
|
|
|
|
return result;
|
|
} catch (Exception e) {
|
|
log.error("发送事务消息异常", e);
|
|
throw new RuntimeException("事务消息发送失败", e);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 事务状态检查回调
|
|
*/
|
|
private LocalTransactionState checkTransactionState(String msgId, Object arg) {
|
|
log.info("检查事务状态 - MsgId: {}", msgId);
|
|
// 根据本地事务执行结果返回状态
|
|
// COMMIT_MESSAGE | ROLLBACK_MESSAGE | UNKNOWN
|
|
return LocalTransactionState.COMMIT_MESSAGE;
|
|
}
|
|
|
|
protected abstract String getProducerGroup();
|
|
|
|
public interface AsyncCallback {
|
|
void onSuccess(SendResult result);
|
|
void onException(Throwable e);
|
|
}
|
|
}
|