微服务拆分之后,一个原本在单体里用本地事务搞定的操作,被拆成了两个服务各管一段。调用方式从本地调用换成了 MQ,问题也跟着来了——本地事务和消息发送,怎么保证一致性?

这篇文章就把这个问题从头到尾捋一遍。


问题是怎么来的

先说一个典型场景。用户下单之后,订单服务需要做两件事:在本地创建订单记录,同时通知库存服务扣减库存。

单体时代,两张表在同一个数据库,一个本地事务搞定,要么一起成功,要么一起回滚。

拆成微服务之后,库存服务有了自己独立的数据库。订单服务改成发 MQ 消息,库存服务监听消息然后扣减自己的库存表。这一改,原来的强一致就没了。

整个链路变成了本地事务 + MQ 异步调用,一致性问题随之出现。大体上有三个层面:本地事务和消息发送的原子性、消息到达 Broker 后消费者能不能可靠收到、消费者重复消费怎么处理。第二、三点算是 MQ 需要做到的事情,前者保证消息不丢,后者靠消费者自己做幂等。

真正麻烦的是第一点:本地事务提交了,消息却没发出去,数据就不一致了。 这才是今天要聊的核心。


代码层面能做到什么程度

先说说不引入额外组件的情况下,代码本身能做什么。

思路不复杂:**本地事务尽量小,别在事务里做 I/O,消息发送放在事务提交之后。**顺序一定不能反。如果先发消息再执行事务,万一事务回滚,消息已经出去了,这个是没法撤回的。

如果用的是声明式事务,Spring 提供了 TransactionSynchronization 接口,可以注册一个事务后置处理器。我们封装了一个工具类:

public class TransactionUtil {

    public static void afterTransactionComplete(PostProcessor postProcessor) {
        if (TransactionSynchronizationManager.isActualTransactionActive()) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionComplete(postProcessor));
        }
    }

    static class TransactionComplete implements TransactionSynchronization {

        private final PostProcessor postProcessor;

        TransactionComplete(PostProcessor postProcessor) {
            this.postProcessor = postProcessor;
        }

        @Override
        public void afterCompletion(int status) {
            if (status == TransactionSynchronization.STATUS_COMMITTED) {
                postProcessor.postProcess();
            }
        }
    }

    @FunctionalInterface
    interface PostProcessor {
        void postProcess();
    }
  
}

用起来这样写:

@Transactional
public void createOrder() {
  
    // 本地事务逻辑:创建订单记录

    TransactionUtil.afterTransactionComplete(() -> {
        // 事务提交后发送 MQ,通知库存服务扣减库存
    });
  
}

afterCompletion 只在 STATUS_COMMITTED 时才触发后置逻辑,确保消息在事务成功提交之后才发。

如果使用的是编程式事务就更直接了:

public void createOrder() {
  
    Boolean success = txTemplate.execute(status -> {
        // 本地事务逻辑:创建订单记录
        return true;
    });
    if (Boolean.TRUE.equals(success)) {
        // 发送 MQ,通知库存服务扣减库存
    }
  
}

但这样就万无一失了吗?并没有。

这些只是在代码结构上做了最优的安排,还有几个绕不开的情况:事务提交成功但发消息之前服务停机了、发消息超时了、MQ Broker 因为 OS PageCache 繁忙返回 system busy/broker busy 消息丢了。这些情况都会导致本地事务提交了、消息却没发出去,回不去了,这样数据就不一致了。

代码层面的优化只能降低概率,解决不了根本问题。


本地消息表:把消息持久化到本地

本地消息表是解决这个问题比较经典的方案,核心思路是:把消息的发送记录和业务数据放在同一个本地事务里,靠定时任务来兜底重试。

流程大致是这样:执行本地业务逻辑的同时,往 biz_msg_delivery 表写一条投递记录,这两步必须在同一个事务里,任何一步失败都回滚。事务提交成功后,立即尝试发送消息,成功了就把投递记录标记为「已完成」。如果这里发送失败了也没关系,系统有定时任务持续扫描状态为「未完成」的投递记录,自动重试。

这里有个细节值得说一下:「标记为已完成」这个操作本身也可能失败。失败了,定时任务下次扫到这条记录还是会再发一次,消息就重复了。所以消费者端的幂等是必须做的,不是可选项。

如果想减轻消费者的幂等压力,可以在定时任务重试前先查询下游业务的状态,确认已经成功处理了,就直接把投递记录推进为已完成,跳过消息发送。另外,重试次数超过阈值就告警转人工,别让定时任务无限循环。

消息投递记录表长这样:

CREATE TABLE `biz_msg_delivery` (
  `id`                    bigint        NOT NULL COMMENT '消息投递表主键',
  `msg_topic`             varchar(255)           COMMENT '消息 topic',
  `msg_tag`               varchar(255)           COMMENT '消息 tag',
  `msg_content`           text                   COMMENT '消息内容',
  `delivery_status`       tinyint                COMMENT '投递状态:1-未投递 2-已完成',
  `max_delivery_retry_time` int                  COMMENT '最大重试次数,超过则人工介入',
  `delivery_retry_time`   int                    COMMENT '当前重试次数',
  `delivery_bean`         varchar(255)           COMMENT '负责投递该消息的 Bean 名称',
  `create_time`           datetime               COMMENT '创建时间',
  `update_time`           datetime               COMMENT '更新时间',
  `is_deleted`            tinyint                COMMENT '逻辑删除',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

delivery_bean 这个字段设计比较实用——定时任务拿到投递记录后,通过这个字段找到对应的 Spring Bean,调用其发送逻辑。不同业务的消息可以共用同一张表和同一个定时任务,扩展起来方便。


RocketMQ 事务消息:另一种思路

除了本地消息表,RocketMQ 原生也支持事务消息,它的核心本质上就是两阶段提交

生产者先发一条 Half Message(半事务消息)到 RocketMQ,这条消息消费者暂时是不可见的。RocketMQ 持久化成功之后返回 Ack,生产者开始执行本地事务。

本地事务执行完毕,根据结果发二次确认:Commit 就把消息投递给消费者,Rollback 就丢掉。如果网络异常或生产者宕机,RocketMQ 超时后会主动向生产者发起消息回查,生产者需要检查本地事务的最终状态再次确认。

说白了就是:用 MQ 来协调本地事务和消息发送之间的顺序与状态,把两者的最终结果绑在一起。


为什么最终选了本地消息表

两个方案都能解决问题,但我个人认为本地消息表方案可能更优一点,原因说起来也直白。

RocketMQ 的事务消息回查机制听起来合理,但实际写起来挺头疼。RocketMQ 在超时后会来问你:这条消息对应的本地事务到底成没成?你得专门实现一个反查接口来回答它。如果业务方法里涉及多张表的写入,回查时就得把这几张表都查一遍,判断整体状态是否成功,逻辑根本不比主流程简单,而且还得单独维护,改了业务逻辑还得跟着改,侵入性很强。

消息监控也是个问题。事务消息在 Commit 之前,Topic 会被替换成内部的 RMQ_SYS_TRANS_HALF_TOPIC,不是你业务定义的那个 Topic。消息真正投递出去之前,控制台上是看不到这条消息流转到业务 Topic 的,出了问题排查链路会断,有监控盲区。

最后一点是技术栈绑定。事务消息是 RocketMQ 特有的功能,本地消息表理论上支持任何 MQ,甚至改成 HTTP 回调也能用,后期迁移成本更低。

相比之下,本地消息表要轻得多:多一张表、一个定时任务,不用改 MQ 的接入方式,不用写反查接口,任何人看了都能理解,出了问题也好排查。


本地消息表也不是银弹

本地消息表也不是万能的,有几个前提需要先想清楚。

消息表和业务表必须在同一个数据库里。整个方案能成立,根本原因是借助本地事务的 ACID 保证两者同生共死,跨库就失去了这个前提,方案直接失效。

消息记录是持久化存储的,数据量会随时间增长,要提前规划好归档策略,否则这张表迟早变成性能瓶颈。定时任务轮询本身也有扫表压力,任务频率、批量大小、索引设计都得认真对待。

最后,由于定时任务重试的存在,消息重复发送不可避免,消费方必须做好幂等,这不是建议,是硬性要求。

这两种方案本质上都不保证强一致,保证的是最终一致性。如果业务是允许短暂不一致、但必须最终达到一致的这类场景,这类方案就够用了。如果 MQ 支持事务消息,可以考虑事务消息;如果不支持,或者有消息持久化、对账需求,本地消息表是更稳的选择。