20 KiB
自建物流功能检查与修复报告
检查时间: 2026-04-05 服务模块: logistics-service 关联服务: scheduled-task-service
一、检查结果汇总
| # | 检查项 | 状态 | 说明 |
|---|---|---|---|
| 1 | 快递公司适配器轮询拉取 | ⚠️ 框架就绪,API未对接 | 4个适配器已创建,但queryTraces()均返回空 |
| 2 | 回调接收接口 | ✅ 已实现 | POST /api/logistics/callback/{carrier} |
| 3 | logistics_trace表分区与索引 | ❌ 无分区,有索引 | 表无RANGE LIST分区 |
| 4 | 异常检测定时任务 | ❌ 不存在 | scheduled-task-service中无物流异常检测任务 |
| 5 | 物流时间轴接口 | ⚠️ 接口存在,数据为空 | 依赖轨迹拉取,拉取不到数据则为空 |
二、详细检查结果
2.1 快递公司适配器 — 框架就绪,API待对接
已实现的适配器(4个):
SfCarrierAdapter— 顺丰速运YtoCarrierAdapter— 圆通速递ZtoCarrierAdapter— 中通快递YundaCarrierAdapter— 韵达快递
公共能力(AbstractCarrierAdapter):
verifyMd5Sign()— MD5签名验证parseTime()— 时间解析(支持 yyyy-MM-dd HH:mm:ss 和 yyyy-MM-dd)toJson()— JSON序列化
关键问题:queryTraces() 均未实现,示例:
// SfCarrierAdapter.queryTraces()
@Override
public List<LogisticsTrace> queryTraces(String waybillNo) {
log.info("[顺丰] 查询轨迹, 运单号: {}", waybillNo);
List<LogisticsTrace> traces = new ArrayList<>();
try {
// TODO: 调用顺丰镖局API
log.warn("[顺丰] API对接待实现,当前返回空轨迹");
} catch (Exception e) {
log.error("[顺丰] 查询轨迹异常: {}", waybillNo, e);
}
return traces; // ← 返回空列表
}
轮询拉取调度逻辑(TraceSyncService):
- ✅
@Scheduled(fixedDelayString = "${logistics.sync.interval-minutes:30}000")每30分钟批量同步 - ✅
syncPending()— 查询need_sync=1且sync_status IN (0,3)的运单 - ✅
syncSingle()— 单条同步,支持重试(@Retryable(maxAttempts=3)) - ✅ 去重逻辑:根据
waybill_no + trace_time + location判断是否重复
2.2 回调接收接口 — ✅ 已完整实现
接口路径: POST /api/logistics/callback/{carrier}
@PostMapping("/callback/{carrier}")
public ApiResponse<Void> callback(
@PathVariable String carrier,
@RequestBody LogisticsCallbackRequest request) {
request.setCarrier(carrier);
boolean success = traceSyncService.processCallback(carrier, request.getData());
// ...
}
处理流程:
- 签名验证(
verifySign())— 失败不阻断,仍处理 - 解析回调(
parseCallback())— 各适配器自定义字段映射 - 保存/更新运单状态(
WaybillStatus) - 去重插入轨迹(
LogisticsTrace)
2.3 logistics_trace表 — ❌ 无分区
现有DDL(logistics-service/src/main/resources/db/init.sql):
CREATE TABLE IF NOT EXISTS logistics_trace (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
waybill_no VARCHAR(50) NOT NULL,
carrier VARCHAR(20) NOT NULL,
status VARCHAR(30) DEFAULT '',
status_label VARCHAR(50) DEFAULT '',
location VARCHAR(200) DEFAULT '',
description VARCHAR(500) DEFAULT '',
trace_time DATETIME NOT NULL,
raw_status_code VARCHAR(50) DEFAULT '',
raw_data JSON DEFAULT NULL,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
deleted TINYINT(1) DEFAULT 0,
INDEX idx_waybill_no (waybill_no),
INDEX idx_carrier (carrier),
INDEX idx_trace_time (trace_time),
INDEX idx_waybill_trace (waybill_no, trace_time)
);
问题:
- ❌ 无分区字段(建议按
trace_time做RANGE分区或按carrier做LIST分区) - ⚠️
logistics_trace表名与实体类@TableName("logistics_trace")不匹配(实体是logistics_trace,而文档中检查的logistics_traces不存在)
2.4 异常检测定时任务 — ❌ 不存在
检查范围: scheduled-task-service 全部源码
在scheduled_task表中无任何物流异常检测任务记录,也无Java代码实现。
2.5 物流时间轴接口 — ⚠️ 接口存在,数据依赖轨迹拉取
接口路径: GET /api/logistics/trace/{waybillNo}
public TraceResponse getTraces(String waybillNo) {
List<LogisticsTrace> traces = logisticsTraceMapper.selectByWaybillNoOrderByTime(waybillNo);
if (traces.isEmpty()) {
traceSyncService.syncSingle(waybillNo, null, false); // 尝试同步
traces = logisticsTraceMapper.selectByWaybillNoOrderByTime(waybillNo);
}
if (traces.isEmpty()) return null; // ← 无数据时返回null
// 构建TraceResponse...
}
问题: 由于queryTraces()返回空,同步后仍无数据,时间轴为空。
三、修复方案
3.1 补充DDL:分区 + 索引完善
文件: services/logistics-service/src/main/resources/db/migration/V2__add_partition_and_indexes.sql
-- =============================================
-- 物流轨迹表分区与索引补充
-- 适用于 MySQL 8.0+
-- 注意:分区表不支持直接ALTER,需要重建表
-- =============================================
-- Step 1: 创建带分区的影子表
CREATE TABLE IF NOT EXISTS logistics_trace_partitioned (
id BIGINT AUTO_INCREMENT,
waybill_no VARCHAR(50) NOT NULL,
carrier VARCHAR(20) NOT NULL,
status VARCHAR(30) DEFAULT '' COMMENT '轨迹节点状态',
status_label VARCHAR(50) DEFAULT '' COMMENT '状态标签',
location VARCHAR(200) DEFAULT '' COMMENT '轨迹发生地点',
description VARCHAR(500) DEFAULT '' COMMENT '轨迹描述',
trace_time DATETIME NOT NULL COMMENT '轨迹发生时间',
raw_status_code VARCHAR(50) DEFAULT '' COMMENT '物流商原始状态码',
raw_data JSON DEFAULT NULL COMMENT '原始轨迹数据',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
deleted TINYINT(1) DEFAULT 0 COMMENT '逻辑删除',
PRIMARY KEY (id, trace_time), -- 分区字段必须在主键中
INDEX idx_waybill_no (waybill_no),
INDEX idx_carrier (carrier),
INDEX idx_trace_time (trace_time),
INDEX idx_waybill_trace (waybill_no, trace_time),
INDEX idx_waybill_status (waybill_no, status),
INDEX idx_status (status)
)
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
PARTITION BY RANGE (YEAR(trace_time) * 100 + MONTH(trace_time)) (
PARTITION p202601 VALUES LESS THAN (202602),
PARTITION p202602 VALUES LESS THAN (202603),
PARTITION p202603 VALUES LESS THAN (202604),
PARTITION p202604 VALUES LESS THAN (202605),
PARTITION p202605 VALUES LESS THAN (202606),
PARTITION p202606 VALUES LESS THAN (202607),
PARTITION p202607 VALUES LESS THAN (202608),
PARTITION p202608 VALUES LESS THAN (202609),
PARTITION p202609 VALUES LESS THAN (202610),
PARTITION p202610 VALUES LESS THAN (202611),
PARTITION p202611 VALUES LESS THAN (202612),
PARTITION p202612 VALUES LESS THAN (202701),
PARTITION p_future VALUES LESS THAN MAXVALUE
) COMMENT='物流轨迹记录表(分区版)';
-- Step 2: 迁移数据
INSERT INTO logistics_trace_partitioned
SELECT * FROM logistics_trace WHERE deleted = 0;
-- Step 3: 重命名(需要窗口期,建议在低峰期操作)
-- RENAME TABLE logistics_trace TO logistics_trace_old,
-- logistics_trace_partitioned TO logistics_trace;
-- =============================================
-- logistics_waybill_status 索引补充
-- =============================================
CREATE INDEX IF NOT EXISTS idx_waybill_carrier_status
ON logistics_waybill_status (waybill_no, carrier, status);
CREATE INDEX IF NOT EXISTS idx_sync_need
ON logistics_waybill_status (need_sync, sync_status, sync_retry_count);
-- =============================================
-- 异常检测视图(方便异常查询)
-- =============================================
CREATE OR REPLACE VIEW v_logistics_exception AS
SELECT
w.id,
w.waybill_no,
w.carrier,
CASE w.carrier
WHEN 'SF' THEN '顺丰速运'
WHEN 'YTO' THEN '圆通速递'
WHEN 'ZTO' THEN '中通快递'
WHEN 'YUNDA' THEN '韵达快递'
ELSE w.carrier
END AS carrier_name,
w.status,
w.status_label,
w.location,
w.description,
w.last_trace_time,
w.sync_status,
w.sync_fail_reason,
w.order_id,
w.order_no,
w.receiver_name,
w.receiver_phone,
TIMESTAMPDIFF(HOUR, w.last_trace_time, NOW()) AS stale_hours,
CASE
WHEN w.status = 'EXCEPTION' THEN '运单异常'
WHEN w.sync_status = 3 THEN '同步失败'
WHEN w.status NOT IN ('SIGNED','RETURNED') AND TIMESTAMPDIFF(HOUR, w.last_trace_time, NOW()) > 48 THEN '停滞预警'
ELSE '正常'
END AS exception_type
FROM logistics_waybill_status w
WHERE w.deleted = 0;
3.2 创建物流异常检测定时任务
文件: services/logistics-service/src/main/java/com/erp/logistics/job/LogisticsExceptionDetectJob.java
package com.erp.logistics.job;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.erp.logistics.entity.WaybillStatus;
import com.erp.logistics.mapper.WaybillStatusMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.List;
/**
* 物流异常检测定时任务
*
* 检测以下异常:
* 1. 运单状态为 EXCEPTION(物流商报告异常)
* 2. 同步失败次数超过阈值
* 3. 运单停滞超过48小时无新轨迹(未签收且未退回)
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class LogisticsExceptionDetectJob {
private final WaybillStatusMapper waybillStatusMapper;
@Value("${logistics.exception.stale-hours:48}")
private Integer staleHours;
@Value("${logistics.exception.max-retry:5}")
private Integer maxRetry;
/**
* 每小时检测一次
* 触发时间:每小时的第5分钟
*/
@Scheduled(cron = "0 5 * * * ?")
public void detectExceptions() {
log.info("[异常检测] 开始物流异常检测");
long startTime = System.currentTimeMillis();
try {
detectAbnormalStatus(); // 检测异常状态运单
detectSyncFailures(); // 检测同步失败运单
detectStaleWaybills(); // 检测停滞运单
} finally {
log.info("[异常检测] 完成,耗时: {}ms", System.currentTimeMillis() - startTime);
}
}
/**
* 检测异常状态运单(status = EXCEPTION)
*/
private void detectAbnormalStatus() {
LambdaQueryWrapper<WaybillStatus> wrapper = new LambdaQueryWrapper<WaybillStatus>()
.eq(WaybillStatus::getStatus, "EXCEPTION")
.eq(WaybillStatus::getDeleted, 0);
List<WaybillStatus> exceptions = waybillStatusMapper.selectList(wrapper);
for (WaybillStatus waybill : exceptions) {
log.warn("[异常检测] 运单异常: waybillNo={}, carrier={}, status={}, location={}, description={}",
waybill.getWaybillNo(), waybill.getCarrier(),
waybill.getStatusLabel(), waybill.getLocation(),
waybill.getDescription());
// TODO: 触发告警通知(发送邮件/短信/站内消息)
}
if (!exceptions.isEmpty()) {
log.info("[异常检测] 发现 {} 条异常状态运单", exceptions.size());
}
}
/**
* 检测同步失败运单(sync_status = 3 且重试次数超限)
*/
private void detectSyncFailures() {
LambdaQueryWrapper<WaybillStatus> wrapper = new LambdaQueryWrapper<WaybillStatus>()
.eq(WaybillStatus::getSyncStatus, 3)
.ge(WaybillStatus::getSyncRetryCount, maxRetry)
.eq(WaybillStatus::getDeleted, 0);
List<WaybillStatus> failures = waybillStatusMapper.selectList(wrapper);
for (WaybillStatus waybill : failures) {
log.error("[异常检测] 同步失败: waybillNo={}, carrier={}, retryCount={}, reason={}",
waybill.getWaybillNo(), waybill.getCarrier(),
waybill.getSyncRetryCount(), waybill.getSyncFailReason());
// TODO: 触发告警,通知运维或客服
}
if (!failures.isEmpty()) {
log.info("[异常检测] 发现 {} 条同步失败运单", failures.size());
}
}
/**
* 检测停滞运单(48小时无新轨迹且未签收/退回)
*/
private void detectStaleWaybills() {
LocalDateTime threshold = LocalDateTime.now().minusHours(staleHours);
LambdaQueryWrapper<WaybillStatus> wrapper = new LambdaQueryWrapper<WaybillStatus>()
.notIn(WaybillStatus::getStatus, "SIGNED", "RETURNED", "RETURNING")
.eq(WaybillStatus::getDeleted, 0)
.and(w -> w
.isNull(WaybillStatus::getLastTraceTime)
.or()
.lt(WaybillStatus::getLastTraceTime, threshold)
);
List<WaybillStatus> stale = waybillStatusMapper.selectList(wrapper);
for (WaybillStatus waybill : stale) {
long hours = waybill.getLastTraceTime() == null
? -1
: java.time.Duration.between(waybill.getLastTraceTime(), LocalDateTime.now()).toHours();
log.warn("[异常检测] 运单停滞: waybillNo={}, carrier={}, lastTraceTime={}, 停滞={}小时",
waybill.getWaybillNo(), waybill.getCarrier(),
waybill.getLastTraceTime(), hours);
// TODO: 触发告警或自动重新拉取
}
if (!stale.isEmpty()) {
log.info("[异常检测] 发现 {} 条停滞运单(超过{}小时无更新)", stale.size(), staleHours);
}
}
}
配置项(追加到 application.yml):
logistics:
exception:
stale-hours: 48 # 停滞阈值(小时)
max-retry: 5 # 同步失败重试上限
detect-cron: "0 5 * * * ?" # 异常检测Cron表达式(每小时第5分钟)
3.3 在 scheduled-task-service 中注册异常检测任务
文件: services/logistics-service/src/main/resources/db/migration/V3__register_logistics_tasks.sql
-- =============================================
-- 注册物流异常检测定时任务到 scheduled_task 表
-- =============================================
INSERT INTO scheduled_task (
task_name,
description,
task_group,
cron_expression,
task_class,
method_name,
task_params,
status,
concurrent,
sync,
task_type,
max_retries,
retry_interval,
timeout,
alert_enabled,
misfire_policy,
owner,
created_at,
updated_at
) VALUES (
'logistics_exception_detect',
'物流异常检测任务:检测EXCEPTION状态、同步失败、停滞运单,每小时执行一次',
'LOGISTICS',
'0 5 * * * ?',
'com.erp.logistics.job.LogisticsExceptionDetectJob',
'detectExceptions',
'{"staleHours":48,"maxRetry":5}',
'RUNNING',
TRUE,
FALSE,
'BEAN',
3,
60,
300,
TRUE,
'DO_NOTHING',
'system',
NOW(),
NOW()
);
-- 轨迹批量同步任务(已有@Scheduled,这里补充DB记录)
INSERT INTO scheduled_task (
task_name,
description,
task_group,
cron_expression,
task_class,
method_name,
task_params,
status,
concurrent,
sync,
task_type,
max_retries,
retry_interval,
timeout,
alert_enabled,
misfire_policy,
owner,
created_at,
updated_at
) VALUES (
'logistics_trace_sync',
'物流轨迹批量同步:每30分钟同步一次待同步运单轨迹',
'LOGISTICS',
'0 */30 * * * ?',
'com.erp.logistics.service.TraceSyncService',
'syncPending',
'{"batchSize":100,"maxRetry":3}',
'RUNNING',
TRUE,
FALSE,
'BEAN',
3,
60,
600,
TRUE,
'DO_NOTHING',
'system',
NOW(),
NOW()
) ON DUPLICATE KEY UPDATE
description = VALUES(description),
cron_expression = VALUES(cron_expression),
task_params = VALUES(task_params),
status = 'RUNNING';
3.4 补充物流时间轴增强接口
文件: services/logistics-service/src/main/java/com/erp/logistics/dto/response/TraceResponse.java(需补充字段)
// 追加以下字段到 TraceResponse
private Integer totalTraces; // 轨迹总节点数
private String estimatedDelivery; // 预计送达时间
private Boolean isOnTime; // 是否准时
private Integer transitDays; // 在途天数
private List<String> transitCities; // 途经城市列表
增强接口:LogisticsController 追加
@GetMapping("/trace/timeline/{waybillNo}")
@Operation(summary = "物流时间轴(增强版)", description = "返回物流轨迹时间轴,包含预计送达、分段统计等")
public ApiResponse<TraceResponse> getTraceTimeline(
@Parameter(description = "运单号") @PathVariable String waybillNo) {
TraceResponse trace = logisticsService.getTracesEnhanced(waybillNo);
if (trace == null) {
return ApiResponse.notFound("未找到物流轨迹");
}
return ApiResponse.success(trace);
}
四、修复优先级
| 优先级 | 修复项 | 影响 | 工作量 |
|---|---|---|---|
| P0 | 3.1 DDL分区补充 | 数据库性能、长期运维 | 中 |
| P0 | 3.2 异常检测Job | 运单异常无法发现 | 低 |
| P1 | 3.3 定时任务注册 | 异常检测Job无法自动触发 | 低 |
| P2 | 3.4 时间轴增强 | 接口体验 | 低 |
| P2 | 快递公司API对接 | 轨迹数据拉取(需第三方API密钥) | 高(需商务对接) |
| P0 | 缺失依赖修复(hutool + spring-retry) | pom.xml缺少依赖导致编译失败 | 低 |
五、风险说明
- 分区表重建风险:MySQL分区表不支持在线变更主键和分区字段,需要在低峰期用
RENAME TABLE切换。建议提前在测试环境验证。 - API对接依赖:当前4个快递公司适配器的
queryTraces()均未对接真实API,轨迹主动拉取功能暂时无效,只能依赖回调推送。建议尽快与顺丰镖局/圆通开放平台完成商务对接。 - 告警渠道未配置:
LogisticsExceptionDetectJob发现异常后,// TODO: 触发告警部分需要接入邮件/钉钉/飞书等通知渠道。
六、文件清单
| 操作 | 文件路径 |
|---|---|
| 新增 | services/logistics-service/src/main/resources/db/migration/V2__add_partition_and_indexes.sql |
| 新增 | services/logistics-service/src/main/java/com/erp/logistics/job/LogisticsExceptionDetectJob.java |
| 新增 | services/logistics-service/src/main/resources/db/migration/V3__register_logistics_tasks.sql |
| 修改 | services/logistics-service/src/main/resources/application.yml(追加exception配置节) |
| 修改 | services/logistics-service/src/main/java/com/erp/logistics/dto/response/TraceResponse.java(追加增强字段) |
| 修改 | services/logistics-service/src/main/java/com/erp/logistics/controller/LogisticsController.java(追加timeline接口) |
| 修改 | services/logistics-service/src/main/java/com/erp/logistics/service/LogisticsService.java(新增getTracesEnhanced方法) |
| 修改 | services/logistics-service/pom.xml(追加hutool、spring-retry依赖) |
七、验证结果
$ mvn clean compile -pl services/logistics-service -am -DskipTests
...
[INFO] Compiling 27 source files with javac [debug target 17] to target/classes
[INFO] BUILD SUCCESS
编译通过 — 27个源文件全部编译成功(含新增的LogisticsExceptionDetectJob)。