132 lines
4.5 KiB
Java
132 lines
4.5 KiB
Java
package com.erp.mq.consumer;
|
|
|
|
import com.erp.mq.producer.InventoryMessageProducer.InventoryMessage;
|
|
import com.erp.mq.producer.InventoryMessageProducer;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
import jakarta.annotation.PostConstruct;
|
|
import jakarta.annotation.PreDestroy;
|
|
|
|
/**
|
|
* 库存消息消费者
|
|
* 负责处理库存相关的消息
|
|
*/
|
|
@Slf4j
|
|
@Component
|
|
public class InventoryMessageConsumer extends BaseRocketMQConsumer {
|
|
|
|
private final ObjectMapper objectMapper = new ObjectMapper();
|
|
|
|
@Value("${erp.rocketmq.topics.inventory}")
|
|
private String inventoryTopic;
|
|
|
|
@Value("${erp.rocketmq.consumer.groups.inventory-group}")
|
|
private String inventoryConsumerGroup;
|
|
|
|
@PostConstruct
|
|
public void init() {
|
|
try {
|
|
super.init();
|
|
log.info("库存消息消费者初始化成功");
|
|
} catch (Exception e) {
|
|
log.error("库存消息消费者初始化失败", e);
|
|
}
|
|
}
|
|
|
|
@Override
|
|
protected String getTopic() {
|
|
return inventoryTopic;
|
|
}
|
|
|
|
@Override
|
|
protected String getConsumerGroup() {
|
|
return inventoryConsumerGroup;
|
|
}
|
|
|
|
@Override
|
|
protected String getTags() {
|
|
return "*";
|
|
}
|
|
|
|
@Override
|
|
protected boolean handleMessage(String messageBody, MessageExt messageExt) {
|
|
try {
|
|
InventoryMessage inventoryMessage = objectMapper.readValue(messageBody, InventoryMessage.class);
|
|
String tags = messageExt.getTags();
|
|
|
|
log.info("处理库存消息 - Tags: {}, SkuId: {}, Quantity: {}",
|
|
tags, inventoryMessage.getSkuId(), inventoryMessage.getQuantity());
|
|
|
|
switch (tags) {
|
|
case InventoryMessageProducer.TAGS_DEDUCT:
|
|
return handleInventoryDeduct(inventoryMessage);
|
|
case InventoryMessageProducer.TAGS_RESTORE:
|
|
return handleInventoryRestore(inventoryMessage);
|
|
case InventoryMessageProducer.TAGS_LOCK:
|
|
return handleInventoryLock(inventoryMessage);
|
|
case InventoryMessageProducer.TAGS_UNLOCK:
|
|
return handleInventoryUnlock(inventoryMessage);
|
|
case InventoryMessageProducer.TAGS_ALERT:
|
|
return handleInventoryAlert(inventoryMessage);
|
|
default:
|
|
log.warn("未知的库存消息标签: {}", tags);
|
|
return true;
|
|
}
|
|
} catch (Exception e) {
|
|
log.error("库存消息处理异常", e);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
private boolean handleInventoryDeduct(InventoryMessage message) {
|
|
log.info("处理库存扣减 - SkuId: {}, Quantity: {}, OrderId: {}",
|
|
message.getSkuId(), message.getQuantity(), message.getOrderId());
|
|
// TODO: 调用库存服务扣减库存
|
|
// 1. 检查库存是否充足
|
|
// 2. 执行库存扣减
|
|
// 3. 记录库存变动日志
|
|
return true;
|
|
}
|
|
|
|
private boolean handleInventoryRestore(InventoryMessage message) {
|
|
log.info("处理库存恢复 - SkuId: {}, Quantity: {}, OrderId: {}",
|
|
message.getSkuId(), message.getQuantity(), message.getOrderId());
|
|
// TODO: 调用库存服务恢复库存
|
|
// 1. 执行库存增加
|
|
// 2. 记录库存变动日志
|
|
return true;
|
|
}
|
|
|
|
private boolean handleInventoryLock(InventoryMessage message) {
|
|
log.info("处理库存锁定 - SkuId: {}, Quantity: {}",
|
|
message.getSkuId(), message.getQuantity());
|
|
// TODO: 调用库存服务锁定库存
|
|
return true;
|
|
}
|
|
|
|
private boolean handleInventoryUnlock(InventoryMessage message) {
|
|
log.info("处理库存解锁 - SkuId: {}, Quantity: {}",
|
|
message.getSkuId(), message.getQuantity());
|
|
// TODO: 调用库存服务解锁库存
|
|
return true;
|
|
}
|
|
|
|
private boolean handleInventoryAlert(InventoryMessage message) {
|
|
log.info("处理库存预警 - SkuId: {}, CurrentStock: {}, Threshold: {}",
|
|
message.getSkuId(), message.getCurrentStock(), message.getThreshold());
|
|
// TODO: 发送库存预警通知
|
|
// 1. 发送邮件/短信通知
|
|
// 2. 生成采购建议
|
|
return true;
|
|
}
|
|
|
|
@PreDestroy
|
|
public void destroy() {
|
|
shutdown();
|
|
}
|
|
}
|