| .. | ||
| config | ||
| consumer | ||
| monitoring | ||
| producer | ||
| spring-cloud-stream | ||
| docker-compose.yml | ||
| pom.xml | ||
| README.md | ||
| start.sh | ||
| status.sh | ||
| stop.sh | ||
RocketMQ 消息队列服务配置
ERP系统的RocketMQ消息队列集群配置,支持高可用、消息持久化和分布式事务。
架构概览
┌─────────────────────────────────────────────────────────────────┐
│ RocketMQ 集群 │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ NameServer │ │ NameServer │ │
│ │ 9876 │◄────────────►│ 9877 │ │
│ └─────────────┘ └─────────────┘ │
│ ▲ │
│ │ │
│ ┌──────┴──────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Master-1 │◄──────►│ Slave-1 │ │ │
│ │ │ 10911 │ 同步 │ 10931 │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Master-2 │◄──────►│ Slave-2 │ │ │
│ │ │ 10921 │ 同步 │ 10941 │ │ │
│ │ └─────────────┘ └─────────────┘ │ │
│ └──────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
目录结构
rocketmq/
├── docker-compose.yml # Docker Compose配置
├── start.sh # 启动脚本
├── stop.sh # 停止脚本
├── status.sh # 状态检查脚本
├── README.md # 本文档
├── config/
│ ├── namesrv-1.conf # NameServer-1配置
│ ├── namesrv-2.conf # NameServer-2配置
│ ├── broker-master-1.conf # Broker Master-1配置
│ ├── broker-master-2.conf # Broker Master-2配置
│ ├── broker-slave-1.conf # Broker Slave-1配置
│ ├── broker-slave-2.conf # Broker Slave-2配置
│ ├── setup-topics.sh # Topic创建脚本
│ ├── rocketmq-base.yml # 基础配置(各服务通用)
│ ├── order-service.yml # 订单服务配置
│ ├── inventory-service.yml # 库存服务配置
│ └── finance-service.yml # 财务服务配置
├── producer/
│ ├── BaseRocketMQProducer.java # 生产者基类
│ ├── OrderMessageProducer.java # 订单消息生产者
│ ├── InventoryMessageProducer.java # 库存消息生产者
│ └── FinanceMessageProducer.java # 财务消息生产者
├── consumer/
│ ├── BaseRocketMQConsumer.java # 消费者基类
│ ├── OrderMessageConsumer.java # 订单消息消费者
│ ├── InventoryMessageConsumer.java # 库存消息消费者
│ └── FinanceMessageConsumer.java # 财务消息消费者
└── monitoring/
├── docker-compose-monitoring.yml # 监控配置
├── prometheus.yml # Prometheus配置
├── rocketmq_rules.yml # 告警规则
└── grafana-dashboard.json # Grafana看板
快速开始
1. 启动集群
cd /root/.openclaw/workspace/erp-java-backend/rocketmq
chmod +x start.sh stop.sh status.sh config/setup-topics.sh
./start.sh
2. 检查状态
./status.sh
3. 访问Dashboard
- RocketMQ Dashboard: http://localhost:8080
- Prometheus: http://localhost:9090
- Grafana: http://localhost:3000 (admin/admin123)
4. 停止集群
./stop.sh
消息主题
| 主题名称 | 用途 | 队列数 |
|---|---|---|
| order-topic | 订单消息 | 8 |
| inventory-topic | 库存消息 | 8 |
| finance-topic | 财务消息 | 8 |
| notification-topic | 通知消息 | 4 |
| payment-topic | 支付消息 | 8 |
| warehouse-topic | 仓库消息 | 4 |
| customer-topic | 客户消息 | 4 |
| report-topic | 报表消息 | 2 |
微服务配置
环境变量
export ROCKETMQ_NAMESRV_ADDR=localhost:9876;localhost:9877
订单服务 (ERP Order Service)
rocketmq:
namesrv-addr: ${ROCKETMQ_NAMESRV_ADDR}
producer:
group: erp-order-producer-group
consumer:
group: erp-order-consumer-group
库存服务 (ERP Inventory Service)
rocketmq:
namesrv-addr: ${ROCKETMQ_NAMESRV_ADDR}
producer:
group: erp-inventory-producer-group
consumer:
group: erp-inventory-consumer-group
财务服务 (ERP Finance Service)
rocketmq:
namesrv-addr: ${ROCKETMQ_NAMESRV_ADDR}
producer:
group: erp-finance-producer-group
consumer:
group: erp-finance-consumer-group
消息格式
订单消息 (OrderMessage)
{
"orderId": "ORD202604040001",
"customerId": "CUST001",
"totalAmount": 999.00,
"status": "PAID",
"paymentMethod": "WECHAT",
"timestamp": 1712208000000
}
库存消息 (InventoryMessage)
{
"skuId": "SKU001",
"productName": "商品名称",
"quantity": 10,
"currentStock": 100,
"threshold": 20,
"warehouseId": "WH001",
"orderId": "ORD202604040001",
"operationType": "DEDUCT",
"timestamp": 1712208000000
}
财务消息 (FinanceMessage)
{
"invoiceId": "INV202604040001",
"orderId": "ORD202604040001",
"amount": 999.00,
"customerId": "CUST001",
"customerName": "客户名称",
"taxNumber": "91310000XXXXXXXX",
"invoiceType": "VAT_SPECIAL",
"status": "CREATED",
"timestamp": 1712208000000
}
高可用配置
主从同步
- Master节点处理读写请求
- Slave节点实时同步Master数据
- 主节点故障时自动切换到从节点
消息持久化
- 消息存储在本地文件系统
- 异步刷盘策略(ASYNC_FLUSH)
- 消息保留时间:72小时
负载均衡
- 多个Consumer组成ConsumerGroup
- 消息在Group内负载均衡
- 支持顺序消息和事务消息
监控告警
监控指标
- TPS(每秒发送/消费消息数)
- 消费堆积量
- 发送失败率
- 内存/磁盘使用率
- NameServer/Broker存活状态
告警规则
| 告警名称 | 条件 | 严重级别 |
|---|---|---|
| NameServerDown | NameServer不可用 | Critical |
| BrokerDown | Broker不可用 | Critical |
| ConsumerLag | 消费堆积 > 1000 | Warning |
| ProducerSendFailed | 发送失败率 > 10/s | Warning |
| MemoryUsageHigh | 内存使用率 > 85% | Warning |
| DiskUsageHigh | 磁盘使用率 > 80% | Warning |
常见问题
1. 启动失败
检查Docker是否运行:
docker info
查看容器日志:
docker-compose logs -f
2. 消息发送失败
检查NameServer地址配置是否正确:
echo $ROCKETMQ_NAMESRV_ADDR
3. 消费堆积
查看消费者状态:
docker exec rocketmq-broker-master-1 sh mqadmin consumerStatus -n namesrv-1:9876 -g erp-order-consumer-group
技术支持
- RocketMQ官方文档: https://rocketmq.apache.org/docs/
- Dashboard使用指南: https://github.com/apache/rocketmq-dashboard