package com.erp.mq.stream; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.support.RocketMQHeader; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.Message; import java.util.function.Consumer; /** * Spring Cloud Stream RocketMQ 消费者 * 使用函数式编程风格定义消费者 */ @Slf4j @Configuration public class StreamMessageConsumer { /** * 订单消息消费者 */ @Bean public Consumer> orderConsumer(StreamBridge streamBridge) { return message -> { RocketMQHeader header = new RocketMQHeader(""); String tags = header.getTags(); Object payload = message.getPayload(); log.info("收到订单消息 - Tags: {}, Payload: {}", tags, payload); try { // TODO: 处理订单消息 // 根据不同的tags调用不同的处理方法 processOrderMessage(payload, tags); } catch (Exception e) { log.error("处理订单消息失败", e); throw e; } }; } /** * 库存消息消费者 */ @Bean public Consumer> inventoryConsumer(StreamBridge streamBridge) { return message -> { Object payload = message.getPayload(); log.info("收到库存消息 - Payload: {}", payload); try { processInventoryMessage(payload); } catch (Exception e) { log.error("处理库存消息失败", e); throw e; } }; } /** * 财务消息消费者 */ @Bean public Consumer> financeConsumer(StreamBridge streamBridge) { return message -> { Object payload = message.getPayload(); log.info("收到财务消息 - Payload: {}", payload); try { processFinanceMessage(payload); } catch (Exception e) { log.error("处理财务消息失败", e); throw e; } }; } private void processOrderMessage(Object payload, String tags) { log.info("处理订单消息 - Tags: {}", tags); // TODO: 实现订单处理逻辑 } private void processInventoryMessage(Object payload) { log.info("处理库存消息"); // TODO: 实现库存处理逻辑 } private void processFinanceMessage(Object payload) { log.info("处理财务消息"); // TODO: 实现财务处理逻辑 } }