94 lines
2.7 KiB
Java
94 lines
2.7 KiB
Java
package com.erp.mq.stream;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.rocketmq.spring.support.RocketMQHeader;
|
|
import org.springframework.cloud.stream.function.StreamBridge;
|
|
import org.springframework.context.annotation.Bean;
|
|
import org.springframework.context.annotation.Configuration;
|
|
import org.springframework.messaging.Message;
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
/**
|
|
* Spring Cloud Stream RocketMQ 消费者
|
|
* 使用函数式编程风格定义消费者
|
|
*/
|
|
@Slf4j
|
|
@Configuration
|
|
public class StreamMessageConsumer {
|
|
|
|
/**
|
|
* 订单消息消费者
|
|
*/
|
|
@Bean
|
|
public Consumer<Message<?>> orderConsumer(StreamBridge streamBridge) {
|
|
return message -> {
|
|
RocketMQHeader header = new RocketMQHeader("");
|
|
String tags = header.getTags();
|
|
Object payload = message.getPayload();
|
|
|
|
log.info("收到订单消息 - Tags: {}, Payload: {}", tags, payload);
|
|
|
|
try {
|
|
// TODO: 处理订单消息
|
|
// 根据不同的tags调用不同的处理方法
|
|
processOrderMessage(payload, tags);
|
|
} catch (Exception e) {
|
|
log.error("处理订单消息失败", e);
|
|
throw e;
|
|
}
|
|
};
|
|
}
|
|
|
|
/**
|
|
* 库存消息消费者
|
|
*/
|
|
@Bean
|
|
public Consumer<Message<?>> inventoryConsumer(StreamBridge streamBridge) {
|
|
return message -> {
|
|
Object payload = message.getPayload();
|
|
log.info("收到库存消息 - Payload: {}", payload);
|
|
|
|
try {
|
|
processInventoryMessage(payload);
|
|
} catch (Exception e) {
|
|
log.error("处理库存消息失败", e);
|
|
throw e;
|
|
}
|
|
};
|
|
}
|
|
|
|
/**
|
|
* 财务消息消费者
|
|
*/
|
|
@Bean
|
|
public Consumer<Message<?>> financeConsumer(StreamBridge streamBridge) {
|
|
return message -> {
|
|
Object payload = message.getPayload();
|
|
log.info("收到财务消息 - Payload: {}", payload);
|
|
|
|
try {
|
|
processFinanceMessage(payload);
|
|
} catch (Exception e) {
|
|
log.error("处理财务消息失败", e);
|
|
throw e;
|
|
}
|
|
};
|
|
}
|
|
|
|
private void processOrderMessage(Object payload, String tags) {
|
|
log.info("处理订单消息 - Tags: {}", tags);
|
|
// TODO: 实现订单处理逻辑
|
|
}
|
|
|
|
private void processInventoryMessage(Object payload) {
|
|
log.info("处理库存消息");
|
|
// TODO: 实现库存处理逻辑
|
|
}
|
|
|
|
private void processFinanceMessage(Object payload) {
|
|
log.info("处理财务消息");
|
|
// TODO: 实现财务处理逻辑
|
|
}
|
|
}
|