RocketMQ事务消息分析:TCC、客户端流程、服务端流程、万字长文细细聊 …

导语

事务消息是RocketMQ提供的高级特性之一,它为分布式系统事务最终一致性提供了一种解决方案。

相较于传统的XA分布式事务解决方案,事务消息性能更高,使用更简洁。

内容范围

本文基于RocketMQ 5,对事务消息进行分析,4.*版本相关逻辑,在源码分析环节补充。

在RocketMQ中使用事务消息,对于消费者而言,处理事务消息与普通消息没有区别。

对于生产者而言,需要做如下特殊配置:

  1. 消息Topic类型设置为 TRANSACTION
  2. 实现TransactionChecker接口的check函数,检查本地事务状态,返回COMMIT,ROLLBACK,UNKNOWN
  3. 开启和提交事务

    Use Case

内容提要

编程范式

分布式事务

什么是事务?

事务通常指的是一系列的操作或事件,它们作为一个整体被视为单个单元。在数据库管理中,事务是指数据库操作的一个逻辑单元,包含了一个或多个数据库操作,例如插入、更新、删除等。事务具有以下四个特性:

  1. 原子性(Atomicity):事务被视为不可分割的单位,要么全部执行,要么全部不执行。
  2. 一致性(Consistency):事务的执行使数据库从一个一致性状态转变为另一个一致性状态,数据库在事务开始和结束时保持一致性。
  3. 隔离性(Isolation):事务的执行在逻辑上是独立的,即使多个事务同时执行,它们也不应该相互影响。
  4. 持久性(Durability):一旦事务被提交,对数据库的修改将是永久性的,即使系统发生故障,修改的数据也不会丢失。

通过这些特性,事务确保了数据库的完整性和可靠性,从而保证了数据操作的有效性。

什么是分布式事务?

分布式系统是指将原来单体应用加数据库的应用工程拆分成多个子系统/服务,各个服务有自己独立的数据库。

分布式事务是指涉及多个独立的数据源或处理单元的事务操作。在分布式系统中,当涉及到多个数据库或服务时,需要确保跨这些不同单元的事务能够保持一致性、原子性、隔离性和持久性,这就是分布式事务的挑战所在。

传统的事务处理是在单个数据库中执行,可以很容易地保持事务的 ACID 特性(原子性、一致性、隔离性、持久性)。然而,当涉及到跨多个数据库或服务时,确保这些 ACID 特性就变得更加复杂。

分布式事务处理通常使用不同的技术和协议来确保事务的一致性,如两阶段提交(2PC)、三阶段提交(3PC)、TCC、补偿事务等。这些方法尝试解决分布式环境下数据一致性的问题,但每种方法都有其优缺点和适用场景。

分布式事务需要考虑网络延迟、节点故障、并发操作等问题,因此需要仔细设计和权衡来选择适合特定场景的解决方案。

人话

分布式事务就是确保多个地方的操作要么都成功,要么都失败,不会出现数据混乱不一致的情况。

应用场景

下面是一个示例,说明如何在分布式环境下执行转账操作:

假设有两个不同的银行(Bank A 和 Bank B),用户 A 在 Bank A 拥有账户,想要向用户 B 在 Bank B 的账户转账一定金额。

  1. 开始分布式事务
  • 用户 A 发起转账请求,Bank A 和 Bank B 启动一个分布式事务来管理整个转账过程。

    1. 验证和授权
  • Bank A 验证用户 A 的身份和转账金额的合法性。

  • Bank B 验证用户 B 的账户信息和转账请求的有效性。

    1. 扣款
  • Bank A 扣除用户 A 的转账金额。这个步骤需要在 Bank A 关联的数据库或服务中执行。

    1. 向另一银行存款
  • Bank B 向用户 B 的账户存入转账金额。这也可能需要与 Bank B 关联的数据库或服务进行交互。

    1. 更新转账状态
  • 在分布式事务中,记录转账操作的状态和详细信息,确保转账信息的可追溯和一致性。

    1. 确认和提交分布式转账事务
  • 确认扣款和存款操作都已成功完成,确保资金从一个银行转移到另一个银行。

  • 提交分布式事务,使得转账过程中的所有更改永久性地记录在系统中。

    1. 处理异常情况
  • 如果在转账过程中发生错误(例如某个银行服务不可用),必须回滚事务,撤销之前的操作,确保账户余额和转账状态的一致性。

在分布式环境中执行转账操作需要保证事务的一致性和可靠性。

在这个例子中,必须保证A账户扣款成功和B账户存款成功,或者失败后A、B账户余额不变,确保一致性和完整性是分布式事务需要解决的问题。

实现方式

二阶段提交

二阶段提交(Two-Phase Commit,2PC)是一种分布式系统中用于实现事务的协议。它保证了跨多个节点或系统的分布式事务要么全部提交成功,要么全部回滚失败,以确保数据的一致性。

这个协议包含两个阶段:

  1. 准备阶段:在这个阶段,协调者节点(Coordinator)询问所有参与者节点(Participants)是否可以提交事务。参与者节点会进行准备工作,并向协调者发送准备完成的消息。如果所有节点都准备就绪,协调者会进入第二个阶段。
  2. 提交阶段:在这个阶段,协调者节点根据参与者节点的反馈,决定是提交事务还是回滚事务。如果所有参与者都准备就绪,则协调者发送提交的消息,所有节点进行最终的操作;如果有任何一个参与者未准备好或出现问题,则协调者发送回滚的消息,所有节点回滚之前的操作。

二阶段提交协议确保了在分布式环境中的事务操作是原子性的,即要么全部成功,要么全部失败,以维持数据的一致性。

然而,它也存在一些问题

  1. 阻塞问题:在第二阶段的提交过程中,如果协调者节点出现故障或者长时间不可用,参与者节点将会一直等待,导致资源无法释放,造成系统性能下降或死锁等问题。
  2. 单点故障:协调者节点是这个协议的中心,如果协调者发生故障,整个协议可能会中断,导致无法进行提交或回滚。
  3. 数据不一致:即使在第二阶段中,协调者节点发送了提交的消息,但在消息发送后协调者宕机,这可能导致一些参与者节点收到了提交消息但无法得知最终结果,数据状态可能处于不一致状态。
  4. 性能问题:在协议的过程中,需要网络通信和多个节点之间的协调,这些额外的通信开销和等待时间可能会影响系统性能。

    TCC

TCC(Try-Confirm/Cancel)是一种分布式事务处理模式,相比传统的二阶段提交(2PC)等协议,它解决了一些分布式事务处理中的问题:

  1. 减少阻塞时间:相对于2PC,TCC在执行事务前会进行“尝试”阶段,这允许参与者节点在尝试阶段预留资源,而不是等待协调者节点,从而减少了整个事务期间的阻塞时间。
  2. 降低单点故障风险:TCC模式中,没有单个中心节点充当协调者,每个参与者都有自己的逻辑来执行事务,并在确认和取消阶段处理自己的资源状态,因此减少了单点故障的风险。
  3. 灵活性和可控性:TCC模式允许开发者更灵活地定义事务逻辑,可以在尝试阶段检查和准备资源,确保事务执行前的一致性,同时在确认或取消阶段根据业务需要进行相应的操作,提供了更高的可控性。
  4. 部分可用性:即使在部分节点或者服务不可用的情况下,TCC也可以使得系统部分可用,因为每个参与者负责自己的事务逻辑和资源释放,部分故障不会影响整个事务流程。

总体来说,TCC模式相对于传统的2PC协议,更适用于一些需要灵活性、降低单点故障风险、减少阻塞时间以及部分可用性的分布式事务场景。

  • 尝试(Try)阶段:在这个阶段,事务首先尝试执行所需的各项操作,但不对外部资源进行实际改变。它主要用于检查各项操作是否可以成功完成,预留必要的资源。
  • 确认(Confirm)阶段:如果尝试阶段成功,系统会执行确认阶段,此时将对外部资源进行实际的改变和操作。这确保了事务在执行后,对外部环境所做的操作是确定性的。
  • 取消(Cancel)阶段:如果在尝试阶段或确认阶段出现失败或者异常,系统会执行取消阶段,将之前预留的资源释放,确保不会对系统产生不一致性或者错误的影响。

TCC事务通过这三个阶段的协作,保证在分布式环境下的事务操作能够以可靠的方式执行,并能够有效地处理因各种异常情况而导致的事务失败或中断。

尽管TCC模式在处理分布式事务时提供了一些优势,但也存在一些问题和挑战:

  1. 实现复杂性:TCC需要开发者自行实现事务的尝试、确认和取消逻辑。这种自定义逻辑增加了系统的复杂性和开发成本,特别是在涉及多个参与者和复杂业务逻辑时更加显著。
  2. 数据一致性保障:在TCC模式中,需要开发者自行确保尝试阶段的资源预留和确认/取消阶段的一致性,可能存在一些问题,如部分资源释放或者阶段性的不一致性。
  3. 资源占用和超时处理:如果在尝试阶段预留资源后,确认或取消阶段长时间不执行,可能导致资源长时间被占用或者无法及时释放。因此,超时处理和资源管理是TCC模式需要考虑的重要问题。
  4. 分布式事务处理的复杂性:尽管TCC减少了阻塞时间和单点故障的风险,但在处理分布式事务时,仍需要处理网络分区、故障恢复、消息传递等方面的复杂性。

RocketMQ基于可靠消息的TCC事务实现

RocketMQ的事务实现,本质是一种基于可靠消息的最终一致性实现。

  • 尝试(Try/Prepare)阶段:在这个阶段,事务发起者向RocketMQ(充当协调者)发出Prepare消息(Haf-Message)以后,做本地数据变更,提交本地数据库事务,在这个阶段事务参与者无法感知。
  • 确认(Confirm/Commit)阶段:如果尝试阶段成功(Prepare消息成功+本地事务成功),事务发起者向RocketMQ发送commit命令提交消息,此时事务参与者将通过consume消息的形式感知到,进行相关的数据改变和操作,达到事务一致性。
  • 回滚(Cancel/Rollback)阶段:如果在尝试阶段失败或者异常,意味着发起者的本地事务已回滚,RocketMQ也不会提交消息,事务参与者由于无法consume消息,因此也不会对系统产生不一致性或者错误的影响。
  • 检查(Check)阶段:如果在Commit/Rollback阶段出现任何异常造成RocketMQ服务端没有收到Commit/Rollback命令,RocketMQ服务端向事务发起者传达Check命令,询问事务发起者的本地事务执行状态,事务发起者再次commit或者rollback消息,确保事务最终一致。

使用RocketMQ实现分布式系统需要业务上能接受两个缺陷:

  1. 子系统数据状态“短暂”的不一致,即事务发起者Prepare阶段至事务参与者consume消息之前的状态不一致。
  2. 事务发起者commit消息后,事务参与者必须确保自身的本地事务执行成功,或者通过其它补偿方式确保数据一致。总之无法在本次事务中撤消事务发起者的数据变更。

    实现原理

同步处理阶段

Prepare

  1. 消费者发送事务消息到服务端,此时的消息对于消费者而言是「不可见」状态。

这个过程被定义为prepare阶段。

消息被定义为Half-Message,或者Prepare-Message。

  1. Prepare-Message发送成功。

    Local-Transcation

  2. 消费者执行本地事务,比如一个数据库CRUD操作,本地事务依赖数据库确保一致性。

    Commit/Rollback

  3. 如果本地事务执行成功,消费者调用commit函数以提交上述Half-Message,此时的消息对于消费者而言变为「可见」状态。消费者处理子系统的本地事务

如果本地事务执行失败,消费者调用rollback函数回滚上述Half-Message,此时服务端将删除该消息,

消费者也不会消费到。

异步检查阶段

检查策略

在使用事务消息的过程中,可能会遇到如下异常场景,造成RocketMQ未收到commit或者rollback请求,RocketMQ不确定应该如何处理事务消息。

因此,RocketMQ通过调用消费者实现TransactionChecker的check函数,获取本地事务的确切状态,来决定消息是否让消费者「可见」。

阶段影响 生产者影响 消费者影响 处理策略
Prepare失败 本地事务未执行 消息不可见 * 无需处理
Local-Transcation失败 本地事务被回滚 消息不可见 RocketMQ回查事务状态 删除Half-Messsage
Commit异常,实际执行成功 本地事务已执行成功 消息可见 * 使消息可见
Commit异常,实际执行失败 本地事务已执行成功 消息暂时不可见 RocketMQ回查事务状态 使消息可见
Rollback异常,实际执行成功 本地事务已执行失败 消息不可见 * 删除消息
Rollback异常,实际执行失败 本地事务已执行失败 消息不可见 RocketMQ回查事务状态 删除消息

检查流程

  1. RocketMQ服务端后台线程不断的扫描「未确认」的Half-Message,远程请求Producer客户端,问询事务状态。
  2. Producer客户端调用TransactionChecker的check函数,获取TransactionResolution:COMMIT,ROLLBACK,check函数由事务消息发送者实现,供Producder调用,check函数应该如实地反馈本地事务执行的真实状态,以确保整个业务的一致性。
  3. Producer重新发送commit或者rollback请求。

代码示例

public static void main(String[] args) throws ClientException { 
    ClientServiceProvider provider = new ClientServiceProvider(); 
    MessageBuilder messageBuilder = new MessageBuilderImpl(); 
    //构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。 
    Producer producer = provider.newProducerBuilder() 
            .setTransactionChecker(checker) 
            .build(); 

    //开启事务分支。 
    final Transaction transaction; 
    try { 
        transaction = producer.beginTransaction(); 
    } catch (ClientException e) { 
        e.printStackTrace(); 
        //事务分支开启失败,直接退出。 
        return; 
    } 

    Message message = messageBuilder.setTopic("topic") 
            //设置消息索引键,可根据关键字精确查找某条消息。 
            .setKeys("messageKey") 
            //设置消息Tag,用于消费端根据指定Tag过滤消息。 
            .setTag("messageTag") 
            //一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。 
            .addProperty("OrderId", "xxx") 
            //消息体。 
            .setBody("messageBody".getBytes()) 
            .build(); 
    //发送半事务消息 
    final SendReceipt sendReceipt; 
    try { 
        sendReceipt = producer.send(message, transaction); 
    } catch (ClientException e) { 
        //半事务消息发送失败,事务可以直接退出并回滚。 
        return; 
    } 
    /** 
     * 执行本地事务,并确定本地事务结果。 
     * 1. 如果本地事务提交成功,则提交消息事务。 
     * 2. 如果本地事务提交失败,则回滚消息事务。 
     * 3. 如果本地事务未知异常,则不处理,等待事务消息回查。 
     * 
     */ 
    boolean localTransactionOk = doLocalTransaction(); 
    if (localTransactionOk) { 
        try { 
            transaction.commit(); 
        } catch (ClientException e) { 
            // 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。 
            e.printStackTrace(); 
        } 
    } else { 
        try { 
            transaction.rollback(); 
        } catch (ClientException e) { 
            // 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。 
            e.printStackTrace(); 
        } 
    } 
} 
//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。 
private static boolean checkOrderById(String orderId) { 
    return true; 
} 
//演示demo,模拟本地事务的执行结果。 
private static boolean doLocalTransaction() { 
    return true; 
} 
//演示demo,本地事务检查器,它将由RocketMQ服务端触发回调执行 
private static TransactionChecker checker = new TransactionChecker() { 
    @Override 
    public TransactionResolution check(MessageView messageView) { 
        /** 
         * 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。 
         * 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。 
         */ 
        final String orderId = messageView.getProperties().get("OrderId"); 
        if (Strings.isNullOrEmpty(orderId)) { 
            // 错误的消息,直接返回Rollback。 
            return TransactionResolution.ROLLBACK; 
        } 
        return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK; 
    } 
};

客户端流程

核心对象分析

生产者

TransactionChcker

本地事务状态回查接口,由生产者实现它的唯一函数check。

职责是向Producer返回本地事务状态。

ProducerBuilderImpl

生产者对象构造器。

职责是构造、配置生产者。

对于事务消息而已,需要关注的函数是setTransactionChecker(TransactionChecker checker),通过该函数为Producer设置事务状态回查实现类。

ProducerImpl

生产者实现类。

职责是发送消息,本文只关注发送事务消息。

通过调用生产者的beginTransaction开启事务,获得Transaction对象。

onRecoverOrphanedTransactionCommand处理来自服务端的消息回查远程请求,它将会调用客户自定义实现的TransactionChcker的check函数获取本地事务结果后,调用endTransaction提交本地事务COMMIT/ROLLBACK

消费者

消费者消费事务消息与普通消息没有区别。

本文不作详细说明。

核心流程分析

开启事务

消息生产者调用Producer的beginTransaction函数开启事务。

该函数主要的功能是创建一个新的TransactionImpl对象,关联Producer自己。

@Override 
public Transaction beginTransaction() { 
    checkNotNull(checker, "Transaction checker should not be null"); 
    if (!this.isRunning()) { 
        log.error("Unable to begin a transaction because producer is not running, state={}, clientId={}", 
            this.state(), clientId); 
        throw new IllegalStateException("Producer is not running now"); 
    } 
    return new TransactionImpl(this); 
}

TransactionImpl实现了Transaction接口。

TransactionImpl和ProducerImpl是一对一的关系。

class TransactionImpl implements Transaction { 
    //当前的实现一个事务只支持发送一条消息 
  //但是,为什么messages用Set保存呢? 
    private static final int MAX_MESSAGE_NUM = 1; 
    private final ProducerImpl producerImpl; 
    @GuardedBy("messagesLock") 
    private final Set<PublishingMessageImpl> messages; 
    private final ReadWriteLock messagesLock; 
    private final ConcurrentMap<PublishingMessageImpl, SendReceiptImpl> messageSendReceiptMap; 
    public TransactionImpl(ProducerImpl producerImpl) { 
        this.producerImpl = producerImpl; 
        this.messages = new HashSet<>(); 
        this.messagesLock = new ReentrantReadWriteLock(); 
        this.messageSendReceiptMap = new ConcurrentHashMap<>(); 
    } 
}

发送半消息

ProducedrImpl的send重载函数提供发送事务消息的功能。

发送事务消息时,需要带上刚才创建的transaction对象。

在发送半消息前,把消息放入transaction对象的Set保存。

在发送半消息后,把消息和Receipt关联,待后续commit or rollback时使用。

Half-Message发送成功后,消费者还无法消费到该消息,因为它还没有被确认。

后续commit流程和服务端流程会讲到。

@Override 
public SendReceipt send(Message message, Transaction transaction) throws ClientException { 
    if (!(transaction instanceof TransactionImpl)) { 
        throw new IllegalArgumentException("Failed downcasting for transaction"); 
    } 
    TransactionImpl transactionImpl = (TransactionImpl) transaction; 
    final PublishingMessageImpl publishingMessage; 
    try { 
        publishingMessage = transactionImpl.tryAddMessage(message); 
    } catch (Throwable t) { 
        throw new ClientException(t); 
    } 
    final ListenableFuture<List<SendReceiptImpl>> future = send(Collections.singletonList(publishingMessage), true); 
    final List<SendReceiptImpl> receipts = handleClientFuture(future); 
    final SendReceiptImpl sendReceipt = receipts.iterator().next(); 
    ((TransactionImpl) transaction).tryAddReceipt(publishingMessage, sendReceipt); 
    return sendReceipt; 
}

通过读写锁安全的把消息放入Set集合

public PublishingMessageImpl tryAddMessage(Message message) throws IOException { 
    messagesLock.readLock().lock(); 
    try { 
        if (messages.size() >= MAX_MESSAGE_NUM) { 
            throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " + 
                MAX_MESSAGE_NUM); 
        } 
    } finally { 
        messagesLock.readLock().unlock(); 
    } 
    messagesLock.writeLock().lock(); 
    try { 
        if (messages.size() >= MAX_MESSAGE_NUM) { 
            throw new IllegalArgumentException("Message in transaction has exceeded the threshold: " + 
                MAX_MESSAGE_NUM); 
        } 
        final PublishingMessageImpl publishingMessage = new PublishingMessageImpl(message, 
            producerImpl.publishingSettings, true); 
        messages.add(publishingMessage); 
        return publishingMessage; 
    } finally { 
        messagesLock.writeLock().unlock(); 
    } 
}

把消息和Receipt放入Map关联,待后续commit or rollback时使用。

public void tryAddReceipt(PublishingMessageImpl publishingMessage, SendReceiptImpl sendReceipt) { 
    messagesLock.readLock().lock(); 
    try { 
        if (!messages.contains(publishingMessage)) { 
            throw new IllegalArgumentException("Message not in transaction"); 
        } 
        messageSendReceiptMap.put(publishingMessage, sendReceipt); 
    } finally { 
        messagesLock.readLock().unlock(); 
    } 
}

commit消息

只有在生产者应用程序的本地事务执行成功时,才应该显示的调用commit函数提交上述Half-Message,将消息的状态变为「可见」,当commit成功后,消费者们可以消费到消息。

commit成功后,服务端是如何让消息变为可见的,后续服务端流程会讲到。

commit函数通过调用producerImpl的endTransaction函数提交本地事务状态:COMMIT。

@Override 
public void commit() throws ClientException { 
    if (messageSendReceiptMap.isEmpty()) { 
        throw new IllegalStateException("Transactional message has not been sent yet"); 
    } 
    for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) { 
        final PublishingMessageImpl publishingMessage = entry.getKey(); 
        final SendReceiptImpl sendReceipt = entry.getValue(); 
        producerImpl.endTransaction(sendReceipt.getEndpoints(), new GeneralMessageImpl(publishingMessage), 
            sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.COMMIT); 
    } 
}

endTransaction通过api向服务端发送消息确认请求。

public void endTransaction(Endpoints endpoints, GeneralMessage generalMessage, MessageId messageId, 
    String transactionId, final TransactionResolution resolution) throws ClientException { 
    final EndTransactionRequest.Builder builder = EndTransactionRequest.newBuilder() 
        .setMessageId(messageId.toString()).setTransactionId(transactionId) 
        .setTopic(apache.rocketmq.v2.Resource.newBuilder().setName(generalMessage.getTopic()).build()); 
    switch (resolution) { 
        case COMMIT: 
            builder.setResolution(apache.rocketmq.v2.TransactionResolution.COMMIT); 
            break; 
        case ROLLBACK: 
        default: 
            builder.setResolution(apache.rocketmq.v2.TransactionResolution.ROLLBACK); 
    } 
    final Duration requestTimeout = clientConfiguration.getRequestTimeout(); 
    final EndTransactionRequest request = builder.build(); 
    final List<GeneralMessage> generalMessages = Collections.singletonList(generalMessage); 
    MessageHookPoints messageHookPoints = TransactionResolution.COMMIT.equals(resolution) ? 
        MessageHookPoints.COMMIT_TRANSACTION : MessageHookPoints.ROLLBACK_TRANSACTION; 
    final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(messageHookPoints); 
    doBefore(context, generalMessages); 
    final RpcFuture<EndTransactionRequest, EndTransactionResponse> future = 
        this.getClientManager().endTransaction(endpoints, request, requestTimeout); 
    Futures.addCallback(future, new FutureCallback<EndTransactionResponse>() { 
        @Override 
        public void onSuccess(EndTransactionResponse response) { 
            final Status status = response.getStatus(); 
            final Code code = status.getCode(); 
            MessageHookPointsStatus hookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK : 
                MessageHookPointsStatus.ERROR; 
            final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context, 
                hookPointsStatus); 
            doAfter(context0, generalMessages); 
        } 
        @Override 
        public void onFailure(Throwable t) { 
            final MessageInterceptorContextImpl context0 = new MessageInterceptorContextImpl(context, 
                MessageHookPointsStatus.ERROR); 
            doAfter(context0, generalMessages); 
        } 
    }, MoreExecutors.directExecutor()); 
    final EndTransactionResponse response = handleClientFuture(future); 
    final Status status = response.getStatus(); 
    final Code code = status.getCode(); 
    if (!Code.OK.equals(code)) { 
        throw new ClientException(code.getNumber(), future.getContext().getRequestId(), status.getMessage()); 
    } 
}

rollback消息

只有在生产者应用程序的本地事务执行失败时,才应该显示的调用rollback函数回滚上述Half-Message。

rollback后的消息,对消费而言依然是不可见的。

rollback后的Half-Message在服务端会被标记为已删除,便于服务端更快速的找到状态待确认的Half-Message,向客户端发送事务状态检查请求。

rollback函数与commit在逻辑上类似,区别是状态为ROLLBACK

@Override 
public void rollback() throws ClientException { 
    if (messageSendReceiptMap.isEmpty()) { 
        throw new IllegalStateException("Transactional message has not been sent yet"); 
    } 
    for (Map.Entry<PublishingMessageImpl, SendReceiptImpl> entry : messageSendReceiptMap.entrySet()) { 
        final PublishingMessageImpl publishingMessage = entry.getKey(); 
        final SendReceiptImpl sendReceipt = entry.getValue(); 
        producerImpl.endTransaction(sendReceipt.getEndpoints(), new GeneralMessageImpl(publishingMessage), 
            sendReceipt.getMessageId(), sendReceipt.getTransactionId(), TransactionResolution.ROLLBACK); 
    } 
}

事务回查

为什么要做事务回查?

因为服务端因为各种原因并没有收到commit/rollback确认请求,比如:网络错误,程序Bug等因素。

此时服务端不知道应该如何处置前述Half-Message。

因此需要服务端主动向生产者发起「事务回查」远程请求,用来获取本地事务的状态,从而决定是投递消息,还是删除消息。

示意图

如上图所示,事务回查由服务端发起rpc请求生产者客户端,由Producer接收回查请求,Producer再调用客户自定义实现的TransactionCheker.check获取事务状态。

值得注意的是,事务回查请求不会同步返回事务处理状态,而是由Producer发起新的rpc请求,报告事务状态,其提交逻辑如同客户主动触发commit/rollback一样,这种方式使得服务端职责更清晰——回查请求仅仅是触发回查,事务状态提交还是复用此前逻辑。

ProducerImpl处理事务回查命令。

public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command) { 
    final String transactionId = command.getTransactionId(); 
    final String messageId = command.getMessage().getSystemProperties().getMessageId(); 
    if (null == checker) { 
        log.error("No transaction checker registered, ignore it, messageId={}, transactionId={}, endpoints={}," 
            + " clientId={}", messageId, transactionId, endpoints, clientId); 
        return; 
    } 
    MessageViewImpl messageView; 
    try { 
        messageView = MessageViewImpl.fromProtobuf(command.getMessage()); 
    } catch (Throwable t) { 
        log.error("[Bug] Failed to decode message during orphaned transaction message recovery, messageId={}, " 
            + "transactionId={}, endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t); 
        return; 
    } 
    ListenableFuture<TransactionResolution> future; 
    try { 
        final ListeningExecutorService service = MoreExecutors.listeningDecorator(telemetryCommandExecutor); 
        //调用TransactionChecker.check查询本地事务状态.        
        final Callable<TransactionResolution> task = () -> checker.check(messageView); 
        future = service.submit(task); 
    } catch (Throwable t) { 
        final SettableFuture<TransactionResolution> future0 = SettableFuture.create(); 
        future0.setException(t); 
        future = future0; 
    } 
    Futures.addCallback(future, new FutureCallback<TransactionResolution>() { 
        @Override 
        public void onSuccess(TransactionResolution resolution) { 
            try { 
                if (null == resolution || TransactionResolution.UNKNOWN.equals(resolution)) { 
                    return; 
                } 
                final GeneralMessage generalMessage = new GeneralMessageImpl(messageView); 
                //提交事务状态resolution(COMMIT/ROLLBACK) 
                endTransaction(endpoints, generalMessage, messageView.getMessageId(), 
                    transactionId, resolution); 
            } catch (Throwable t) { 
                log.error("Exception raised while ending the transaction, messageId={}, transactionId={}, " 
                    + "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t); 
            } 
        } 
        @Override 
        public void onFailure(Throwable t) { 
            log.error("Exception raised while checking the transaction, messageId={}, transactionId={}, " 
                + "endpoints={}, clientId={}", messageId, transactionId, endpoints, clientId, t); 
        } 
    }, MoreExecutors.directExecutor()); 
}

服务端流程

核心对象分析

Proxy启动服务

Proxy向客户端提供两种远程服务:兼容4.0版本的Netty服务,Grpc服务。

最终都会汇聚依赖MessageProcessor接口处理。

ProxyStartup

Proxy模块的启动类,main函数所在。

RemotingProtocolServer

Netty远程服务,兼容4.0版本的客户端。

由SendMessageActivity、TransactionActivity等消息处理类。

本文重点关注事务消息相关的活动,故其它Activity不在此列出,后文不再赘述。

SendMessageActivity

发送消息。有两个同名类,分别在grpc和remoting包下有各自的实现。

负责消息协议适配后,通过MessageProcessor接口转发。

TransactionActivity

处理事务提交。在remoting包下,兼容4.0版本的请求

EndTransactionActivity

处理事务提交。在grpc包下,功能与TransactionActivity相同。

后文将以Grpc实现分析,Netty实现的逻辑类似,不再重复说明。

Proxy转发消息

MessageProcessor

Proxy消息处理接口

DefaultMessagingProcessor

MessageProcessor接口的实现类

ProducerProcessor

处理send请求

TransactionProcessor

处理endTransaction请求

MessageService接口

Proxy和Broker消息交互服务接口,有两个实现类:LocalMessageService、ClusterMessageService

LocalMessageService

Proxy和Broker部署在同一个进程时使用Local实现类,Proxy同进程调用Broker。

ClusterMessageService

Proxy和Broker分开部署时使用Cluster实现类,Proxy远程调用Broker。

ProxyClientRemotingProcessor

处理服务端向客户端发送事务状态检查rpc请求。

GrpcClientChannel

继承了ProxyChannel,负责向GrpcClient发送事务回查指令。

Broker处理消息

SendMessageProcessor

处理发送消息请求,当消息类型是事务消息类型时,调用TransactionService的prepareMessage函数,临时保存为不可见的Half-Message或者叫Prepare-Message。

EndTransactionProcessor

处理commit或rollback请求已结束事务。

处理完毕后,上述Half-Message将变为对消费者可见(commit)或者被删除(rollback)。

TransactionService

负责处理事务消息。提供:保存临时消息、提交消息、回滚消息、删除临时消息、事务检查等能力。

TransactionMessageCheckService

事务检查后台线程。

默认每隔30秒触发扫描、检查任务。

它负责找出未确认(commit/rollback)的消息,向生产者发送rpc事务检查请求。

具体如何筛选出未确认的消息,后文会详细说明。

AbstractTransactionMessageCheckListener

通过线程池异步发起回查任务。

DefaultTransactionMessageCheckListener

处理当回查次数达到最大值后,如何处置即将丢弃的消息。

TransactionalOpBatchService

核心流程分析

Proxy服务端

职责分析

proxy的核心职责是转发事务消息。

源码解读

转发Half-Message

SendMessageActivity接收客户端的grpc请求,对消息属性做一次转换后发送到Broker处理。

它解决了Broker层如何识别这是一条事务消息的问题。

public CompletableFuture<SendMessageResponse> sendMessage(ProxyContext ctx, SendMessageRequest request) { 
    CompletableFuture<SendMessageResponse> future = new CompletableFuture<>(); 
    try { 
        if (request.getMessagesCount() <= 0) { 
            throw new GrpcProxyException(Code.MESSAGE_CORRUPTED, "no message to send"); 
        } 
        List<apache.rocketmq.v2.Message> messageList = request.getMessagesList(); 
        apache.rocketmq.v2.Message message = messageList.get(0); 
        Resource topic = message.getTopic(); 
        validateTopic(topic); 
        future = this.messagingProcessor.sendMessage( 
            ctx, 
            new SendMessageQueueSelector(request), 
            GrpcConverter.getInstance().wrapResourceWithNamespace(topic), 
            buildSysFlag(message), 
            //构建Broker能识别的消息,协议转换 
            buildMessage(ctx, request.getMessagesList(), topic) 
        ).thenApply(result -> convertToSendMessageResponse(ctx, request, result)); 
    } catch (Throwable t) { 
        future.completeExceptionally(t); 
    } 
    return future; 
}

当MessageType=TRANSACTION,

添加MessageConst.PROPERTY_TRANSACTION_PREPARED属性true。

protected Message buildMessage(ProxyContext context, apache.rocketmq.v2.Message protoMessage, String producerGroup) { 
    String topicName = GrpcConverter.getInstance().wrapResourceWithNamespace(protoMessage.getTopic()); 
    validateMessageBodySize(protoMessage.getBody()); 
    Message messageExt = new Message(); 
    messageExt.setTopic(topicName); 
    messageExt.setBody(protoMessage.getBody().toByteArray()); 
    Map<String, String> messageProperty = this.buildMessageProperty(context, protoMessage, producerGroup); 
    MessageAccessor.setProperties(messageExt, messageProperty); 
    return messageExt; 
} 
protected Map<String, String> buildMessageProperty(ProxyContext context, apache.rocketmq.v2.Message message, String producerGroup) { 
    long userPropertySize = 0; 
    ProxyConfig config = ConfigurationManager.getProxyConfig(); 
    org.apache.rocketmq.common.message.Message messageWithHeader = new org.apache.rocketmq.common.message.Message(); 
    // set user properties 
    Map<String, String> userProperties = message.getUserPropertiesMap(); 
    if (userProperties.size() > config.getUserPropertyMaxNum()) { 
        throw new GrpcProxyException(Code.MESSAGE_PROPERTIES_TOO_LARGE, "too many user properties, max is " + config.getUserPropertyMaxNum()); 
    } 

  //... 
    //...省略N行不需要关注的代码... 
    //... 

    // set transaction property 
    MessageType messageType = message.getSystemProperties().getMessageType(); 
    if (messageType.equals(MessageType.TRANSACTION)) { 
        //如果消息类型是事务消息 
        //设置消息头PROPERTY_TRANSACTION_PREPARED属性,Broker根据该属于值判定是事务消息 
        MessageAccessor.putProperty(messageWithHeader, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); 
        if (message.getSystemProperties().hasOrphanedTransactionRecoveryDuration()) { 
            long transactionRecoverySecond = Durations.toSeconds(message.getSystemProperties().getOrphanedTransactionRecoveryDuration()); 
            validateTransactionRecoverySecond(transactionRecoverySecond); 
            MessageAccessor.putProperty(messageWithHeader, MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, 
                String.valueOf(transactionRecoverySecond)); 
        } 
    } 
    return messageWithHeader.getProperties(); 
} 
protected int buildSysFlag(apache.rocketmq.v2.Message protoMessage) { 
    // sysFlag (body encoding & message type) 
    int sysFlag = 0; 
    Encoding bodyEncoding = protoMessage.getSystemProperties().getBodyEncoding(); 
    if (bodyEncoding.equals(Encoding.GZIP)) { 
        sysFlag |= MessageSysFlag.COMPRESSED_FLAG; 
    } 
    // transaction 
    MessageType messageType = protoMessage.getSystemProperties().getMessageType(); 
    if (messageType.equals(MessageType.TRANSACTION)) { 
        //sysFlag设置为TRANSACTION_PREPARED_TYPE 
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; 
    } 
    return sysFlag; 
}
转发endTransaction

通过请求头识别commit或者rollback,调用commit或者rollback接口

@Override 
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { 
    final RemotingCommand response = RemotingCommand.createResponseCommand(null); 
    final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); 
    LOGGER.debug("Transaction request:{}", requestHeader); 
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) { 
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); 
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. "); 
        return response; 
    } 
    if (!checkTransactionState(ctx, request, requestHeader)) { 
        return null; 
    } 
    OperationResult result = new OperationResult(); 
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) { 
        //处理commit请求 
        return processCommitRequest(requestHeader, response); 
    } 
    if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) { 
        //处理rollback请求 
        return processRollbackRequest(requestHeader, response); 
    } 
    return response.setCodeAndRemark(result.getResponseCode(), result.getResponseRemark()); 
}
转发checkTransactionState

ProxyClientRemotingProcessor从producerManager找出生产者网络channel,调用了channel的writeAndFlush发出rpc请求。

@Override 
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, 
    RemotingCommand request) throws RemotingCommandException { 
    final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody()); 
    final MessageExt messageExt = MessageDecoder.decode(byteBuffer, true, false, false); 
    if (messageExt != null) { 
        final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP); 
        if (group != null) { 
            CheckTransactionStateRequestHeader requestHeader = 
                (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class); 
            request.writeCustomHeader(requestHeader); 
            request.addExtField(ProxyUtils.BROKER_ADDR, NetworkUtil.socketAddress2String(ctx.channel().remoteAddress())); 
            Channel channel = this.producerManager.getAvailableChannel(group); 
            if (channel != null) { 
                channel.writeAndFlush(request); 
            } else { 
                log.warn("check transaction failed, channel is empty. groupId={}, requestHeader:{}", group, requestHeader); 
            } 
        } 
    } 
    return null; 
}

ProxyChannel writeAndFlush做了回查指令兼容

public ChannelFuture writeAndFlush(Object msg) { 
    CompletableFuture<Void> processFuture = new CompletableFuture<>(); 
  switch (command.getCode()) { 
        //...     
        //此处省略若干行与事务消息无关的代码 
        //... 

        case RequestCode.CHECK_TRANSACTION_STATE: { 
            CheckTransactionStateRequestHeader header = (CheckTransactionStateRequestHeader) command.readCustomHeader(); 
            MessageExt messageExt = MessageDecoder.decode(ByteBuffer.wrap(command.getBody()), true, false, false); 
            RelayData<TransactionData, Void> relayData = this.proxyRelayService.processCheckTransactionState(context, command, header, messageExt); 
            processFuture = this.processCheckTransaction(header, messageExt, relayData.getProcessResult(), relayData.getRelayFuture()); 
            break; 
        } 
        default: 
            break;    
    } 
}

GrpcClientChannel

设置RecoverOrphanedTransactionCommand。

与Producer.onRecoverOrphanedTransactionCommand相对应。

@Override 
protected CompletableFuture<Void> processCheckTransaction(CheckTransactionStateRequestHeader header, 
    MessageExt messageExt, TransactionData transactionData, CompletableFuture<ProxyRelayResult<Void>> responseFuture) { 
    CompletableFuture<Void> writeFuture = new CompletableFuture<>(); 
    try { 
        this.writeTelemetryCommand(TelemetryCommand.newBuilder() 
            //设置事务回查指令,5.0版本的指令名称与4.0不同。 
            //与生产者客户端的onRecoverOrphanedTransactionCommand对应                     
            .setRecoverOrphanedTransactionCommand(RecoverOrphanedTransactionCommand.newBuilder() 
                .setTransactionId(transactionData.getTransactionId()) 
                .setMessage(GrpcConverter.getInstance().buildMessage(messageExt)) 
                .build()) 
            .build()); 
        responseFuture.complete(null); 
        writeFuture.complete(null); 
    } catch (Throwable t) { 
        responseFuture.completeExceptionally(t); 
        writeFuture.completeExceptionally(t); 
    } 
    return writeFuture; 
}

Broker服务端

设计思想

Brokder在处理事务消息时,需要解决如下三个问题?

  1. 如何确保消息在commit之前对消费者不可见?
  2. 未确认的Half-Message如何处理?
  3. 怎么找到未确认的Half-Message?
    prepare(half-message)处理

事务消息发送到Broker后,Broker在保存前修改消息的Topic名称为:HALF_TOPIC。

为消息添加属性:REAL_TOPIC,指向它原本的Topic。

消息仅被保存到系统内部:HALF_TOPIC。

如下图所示:

commit处理

当收到生产者发来的commit请求时:

从HALF_TOPIC取出Message。

写入一份消息到REAL_TOPIC,此时消息可被消费了。

写入一份消息到OP_HALF_TOPIC,标记消息已被处理了。

写入OP_HALF_TOPIC操作是异步的,批量的(示意图)。

每处理4096次commit/rollback请求才会写入一条OP_Message,或者每隔3秒也会写入一条OP_Message。

每条OP_Message的body中记录了多条Message的offset,以逗号分隔。

每条OP_Message会打上REMOVE_TAG。

下图所示的Op_Message此时可能还没有put进OP_HALF_TOPIC,需要累积更多条commit才会执行put。

为什么要把处理过的消息offset写入OP_HALF_TOPIC ?

因为RocketMQ顺序追回写入,消息不允许修改、删除,那样就成了随机写,性能差。

但是,后台线程发起事务回查时,需要识别出哪些消息未被commit/rollback,因此当消息被commit/rollback时,重新写入一份到OP_HALF_TOPIC(Op_Message仅保存Message的offset)。

rollback处理

当收到生产者发来的rollback请求时:

rollback时不把消息放回BIZ_TOPIC_A,保持不可见,标记删除。

消息检查

后台事务回查服务开始扫描,找出需要回查的消息,异步发起事务回查rpc请求客户端。

对于达到最大事务回查次数,依然未确认的消息,放入TRANS_CHECK_MAX_TIME_TOPIC

事务消息生命周期

状态对比表
阶段 内部存储状态 对消费者状态
Prepared HALF_TOPIC 不可消费
Commited HALF_TOPIC OP_HALF_TOPIC 可消费
Rollbacked HALF_TOPIC OP_HALF_TOPIC 不可消费
Discarded HALF_TOPIC CHECK_MAX_TIME_TOPIC 不可消费
状态变迁

源码解读

消费时检查,不消费系统内部Topic

RocketMQ使用内部Topic「RMQ_SYS_TRANS_HALF_TOPIC」存储生产者发送的Half-Message。

PushConsumerImpl在启动时扫描调用Proxy的queryAssignment时,会对请求的Topic检验,不允许通过常规方式访问系统内部Topic。

public void validateTopic(String topicName) { 
    try { 
        Validators.checkTopic(topicName); 
    } catch (MQClientException mqClientException) { 
        throw new GrpcProxyException(Code.ILLEGAL_TOPIC, mqClientException.getErrorMessage()); 
    } 
    if (TopicValidator.isSystemTopic(topicName)) { 
        throw new GrpcProxyException(Code.ILLEGAL_TOPIC, "cannot access system topic"); 
    } 
}
保存消息时,不构建ConsumeQueue

RocketMQ文件系统回顾:

  • CommitLog

全局一个文件,所有消息顺序写入CommitLog文件

  • ConsumeQueue

同一个Topic,多个ConsumeQueue文件,每条记录包含CommitLog的offset信息。通过offset可直接在CommitLog找到原始消息。

  • IndexFile

提供按Key和时间查找消息的能力。

保存消息时先通过DefaultMessageStore顺序写CommitLog文件。

ReputMessageService异步构建ConsumeQueue时,不会为事务消息构建ConsumeQueue。

值得注意的是,需要为事务消息构建IndexFile,否则系统内部也无法按时间、Topic快速查找Half-Message。

ReputMessageService异步构建ConsumeQueue代码。

//后台线程,一直运行 
public void run() { 
    while (!this.isStopped()) { 
        try { 
            TimeUnit.MILLISECONDS.sleep(1); 
            this.doReput(); 
        } catch (Exception e) { 
            LOGGER.warn(this.getServiceName() + " service has exception. ", e); 
        } 
    } 
} 
private boolean doReput(boolean doNext, SelectMappedBufferResult result) { 
    //... 
    //忽略若干行与事务消息分析无关的代码 
    //... 
    for (int readSize = 0; readSize < result.getSize() && reputFromOffset < messageStore.getConfirmOffset() && doNext; ) {   
        DispatchRequest dispatchRequest = messageStore.getCommitLog().checkMessageAndReturnSize(result.getByteBuffer(), false, false, false);       
        if (dispatchRequest.isSuccess()) { 
            //1. 处理消息分发 
            readSize = handleSuccessDispatchRequest(readSize, size, result, dispatchRequest); 
        } 
    } 
} 
private int handleSuccessDispatchRequest(int readSize, int size, SelectMappedBufferResult result, DispatchRequest dispatchRequest) throws RocksDBException { 
    if (size < 0) { 
        return readSize; 
    } 
    if (size == 0) { 
        this.reputFromOffset = messageStore.getCommitLog().rollNextFile(this.reputFromOffset); 
        readSize = result.getSize(); 
        return readSize; 
    } 
    //2. 调用messageStore的doDispatch函数 
    messageStore.doDispatch(dispatchRequest); 

    invokeArrivingListener(dispatchRequest); 
    this.reputFromOffset += size; 
    readSize += size; 
    addDispatchCount(dispatchRequest); 
    return readSize; 
}

ReputMessageService后台线程调用DefaultMessageStore.doDispatch函数

public void doDispatch(DispatchRequest req) throws RocksDBException { 
    //循环触发dispatch 
    for (CommitLogDispatcher dispatcher : this.dispatcherList) { 
        dispatcher.dispatch(req); 
    } 
}

CommitLogDispatcherBuildConsumeQueue负责构建ConsumeQueue,当消费类型是Prepared、Rollback时忽略,不为它们构建消费队列,消费者自然也无从消费了。

@Override 
public void dispatch(DispatchRequest request) throws RocksDBException { 
    final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag()); 
    switch (tranType) { 
        case MessageSysFlag.TRANSACTION_NOT_TYPE: 
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE: 
            consumeQueueStore.putMessagePositionInfoWrapper(request); 
            break; 
        //忽略Prepared,Rollback     
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE: 
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 
            break; 
    } 
}

CommitLogDispatcherBuildIndex构建消息Index。

@Override 
public void dispatch(DispatchRequest request) { 
    if (messageStore.getMessageStoreConfig().isMessageIndexEnable()) { 
        messageStore.getIndexService().buildIndex(request); 
    } 
}

IndexService为事务消息构建索引,便于系统内部查询。

public void buildIndex(DispatchRequest req) { 
    IndexFile indexFile = retryGetAndCreateIndexFile(); 
    if (indexFile == null) { 
        LOGGER.error("build index error, stop building index"); 
        return; 
    } 
    long endPhyOffset = indexFile.getEndPhyOffset(); 
    DispatchRequest msg = req; 
    String topic = msg.getTopic(); 
    String keys = msg.getKeys(); 
    if (msg.getCommitLogOffset() < endPhyOffset) { 
        return; 
    } 
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); 
    switch (tranType) { 
        //注意,PREPARED_TYPE消息必须也要构建索引,否则内部也无法查询     
        case MessageSysFlag.TRANSACTION_NOT_TYPE: 
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE: 
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE: 
            break; 
        //只忽略rollback类型的消息     
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: 
            return; 
    } 
  //... 
  //忽略若干与事务消息分析无关代码 
  //.... 
  //构建和保存index 
}
处理commit
  • 取出Half_Message
  • 校验
  • 还原消息属性,存入原业务Topic
  • 删除Half_Message(通过写入到OP_HALF_TOPIC的方式做删除标识)

    private RemotingCommand processCommitRequest(EndTransactionRequestHeader requestHeader, RemotingCommand response) { 
    
    //取出Half_Message 
    OperationResult result = this.brokerController.getBrokerMessageService() 
                  .getTransactionalMessageService() 
    //commitMessage命名不恰当,其实只是get消息 
                  .commitMessage(requestHeader); 
    
    if (result.getResponseCode() != ResponseCode.SUCCESS) { 
        return response.setCodeAndRemark(result.getResponseCode(), result.getResponseRemark()); 
    } 
    if (rejectCommitOrRollback(requestHeader, result.getPrepareMessage())) { 
        response.setCode(ResponseCode.ILLEGAL_OPERATION); 
        LOGGER.warn("Message commit fail [producer end]. currentTimeMillis - bornTime > checkImmunityTime, msgId={},commitLogOffset={}, wait check", 
            requestHeader.getMsgId(), requestHeader.getCommitLogOffset()); 
        return response; 
    } 
    RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); 
    if (res.getCode() != ResponseCode.SUCCESS) { 
        return res; 
    } 
    //消息还原,Topic改为实际的业务Topic,清除一些临时属性 
    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage()); 
    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback())); 
    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset()); 
    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset()); 
    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp()); 
    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED); 
    //消息放回实际的Topic 
    RemotingCommand sendResult = sendFinalMessage(msgInner); 
    
    if (sendResult.getCode() == ResponseCode.SUCCESS) { 
        //消息放入OP_HALF_TOPIC 
        this.brokerController.getBrokerMessageService() 
                  .getTransactionalMessageService() 
        /* 
            delete语义有待商榷 
            RocketMQ无法删除消息,实际是放入OP_HALF_TOPIC做标识 
            便于事务回查时排除已commit的消息 
            可以理解成逻辑上删除PrepareMessage 
        */ 
    .deletePrepareMessage(result.getPrepareMessage()); 
    } 
    return sendResult; 
    }
    处理rollback

逻辑与commit类似,只是少了把消息放回原Topic的操作,rollback嘛,不能让消息可见。

private RemotingCommand processRollbackRequest(EndTransactionRequestHeader requestHeader, RemotingCommand response) { 

    //取出Half_Message 
    OperationResult result = this.brokerController.getBrokerMessageService() 
                                .getTransactionalMessageService() 
    //rollbackMessage命名不恰当,其实只是get消息 
                                .rollbackMessage(requestHeader); 
    if (result.getResponseCode() != ResponseCode.SUCCESS) { 
        return response.setCodeAndRemark(result.getResponseCode(), result.getResponseRemark()); 
    } 
    if (rejectCommitOrRollback(requestHeader, result.getPrepareMessage())) { 
        response.setCode(ResponseCode.ILLEGAL_OPERATION); 
        LOGGER.warn("Message rollback fail [producer end]. currentTimeMillis - bornTime > checkImmunityTime, msgId={},commitLogOffset={}, wait check", 
            requestHeader.getMsgId(), requestHeader.getCommitLogOffset()); 
        return response; 
    } 
    RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader); 
    if (res.getCode() == ResponseCode.SUCCESS) { 
       //消息放入OP_HALF_TOPIC 
        this.brokerController.getBrokerMessageService() 
                  .getTransactionalMessageService() 
        /* 
            delete语义有待商榷 
            RocketMQ无法删除消息,实际是放入OP_HALF_TOPIC做标识 
            便于事务回查时排除已commit的消息 
            可以理解成逻辑上删除PrepareMessage 
        */ 
                  .deletePrepareMessage(result.getPrepareMessage()); 
    } 
    return res; 
}
批量标识删除Half_Message

deletePrepareMessage实际是异步的,只是把需要标识删除的消息offset放入内存LinkedBlockingQueue。

当数量达到4096个时,唤醒后台线程,把4096个Half_Message的offset保存为一条OP_Message。

@Override 
public boolean deletePrepareMessage(MessageExt messageExt) { 
    Integer queueId = messageExt.getQueueId(); 
    MessageQueueOpContext mqContext = deleteContext.get(queueId); 
    if (mqContext == null) { 
        mqContext = new MessageQueueOpContext(System.currentTimeMillis(), 20000); 
        MessageQueueOpContext old = deleteContext.putIfAbsent(queueId, mqContext); 
        if (old != null) { 
            mqContext = old; 
        } 
    } 
    /* 
        OP_Message的body是Half_Message的offset 
        每条Half_Message批量保存多个offset,以逗号分隔 
        默认4096个offset 
     */ 
    String data = messageExt.getQueueOffset() + TransactionalMessageUtil.OFFSET_SEPARATOR; 

  try { 
        //提交到内存 
        boolean res = mqContext.getContextQueue().offer(data, 100, TimeUnit.MILLISECONDS); 
        if (res) { 
            int totalSize = mqContext.getTotalSize().addAndGet(data.length()); 
            if (totalSize > transactionalMessageBridge.getBrokerController().getBrokerConfig().getTransactionOpMsgMaxSize()) { 
                //数据达到4096个,唤醒线程,保存到一条OP_Message 
                this.transactionalOpBatchService.wakeup(); 
            } 
            return true; 
        } else { 
            this.transactionalOpBatchService.wakeup(); 
        } 
    } catch (InterruptedException ignore) { 
    } 
  //前述逻辑出现异常,函数未返回,强制同步保存一次。 
    Message msg = getOpMessage(queueId, data); 
    if (this.transactionalMessageBridge.writeOp(queueId, msg)) { 
        log.warn("Force add remove op data. queueId={}", queueId); 
        return true; 
    } else { 
        log.error("Transaction op message write failed. messageId is {}, queueId is {}", messageExt.getMsgId(), messageExt.getQueueId()); 
        return false; 
    } 
}
事务回查

后台线程每30秒间隔扫描Half_Topic,Op_Topic,找出尚未收到commit/rollback的消息,发起回查命令。

@Override 
public void run() { 
    log.info("Start transaction check service thread!"); 
    while (!this.isStopped()) { 
        //transactionCheckInterval = 30 * 1000 
        //默认每30秒检查一次 
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval(); 
        //RocketMQ常用的做法,sleep时间到以后,触发onWaitEnd函数 
        this.waitForRunning(checkInterval); 
    } 
    log.info("End transaction check service thread!"); 
} 
@Override 
protected void onWaitEnd() { 
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut(); 
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax(); 
    long begin = System.currentTimeMillis(); 
    log.info("Begin to check prepare message, begin time:{}", begin); 
    this.check(timeout, checkMax, this.transactionalMessageCheckListener); 
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin); 
}

找出需要检查的Half_Message

private void checkMessageQueue(MessageQueue messageQueue, long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) throws InterruptedException { 
    CheckContext context = new CheckContext(messageQueue, transactionTimeout, transactionCheckMax, listener); 
    context.setOpQueue(getOpQueue(messageQueue)); 
    context.setHalfOffset(transactionalMessageBridge.fetchConsumeOffset(messageQueue)); 
    context.setOpOffset(transactionalMessageBridge.fetchConsumeOffset(context.getOpQueue())); 
    log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, context.getHalfOffset(), context.getOpOffset()); 
    if (context.getHalfOffset() < 0 || context.getOpOffset() < 0) { 
        log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, context.getHalfOffset(), context.getOpOffset()); 
        return; 
    } 
    //从OP_TOPIC查出已commit/rollback的消息 
    PullResult removeResult = fillOpRemoveMap(context.getRemoveMap(), context.getOpQueue(), context.getOpOffset(), context.getHalfOffset(), context.getOpMsgMap(), context.getDoneOpOffset()); 
    if (null == removeResult) { 
        log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, context.getHalfOffset(), context.getOpOffset()); 
        return; 
    } 
    // single thread 
    context.setPullResult(removeResult); 
    context.initOffset(); 
    while (true) { 
        if (System.currentTimeMillis() - context.getStartTime() > MAX_PROCESS_TIME_LIMIT) { 
            log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); 
            break; 
        } 
        if (context.getRemoveMap().containsKey(context.getCounter())) { 
            removeOffset(context); 
        } else { 
            if (!checkOffset(context)) break; 
        } 
        context.setNewOffset(context.getCounter() + 1); 
        context.incCounter(); 
    } 
    updateOffset(context); 
} 
/** 
 * 
 * @param context 
 * @return false:will break 
 * @throws InterruptedException 
 */ 
private boolean checkOffset(CheckContext context) throws InterruptedException { 
    GetResult getResult = getHalfMsg(context.getMessageQueue(), context.getCounter()); 
    context.setMsgExt(getResult.getMsg()); 
    if (context.getMsgExt() == null) { 
        return handleNullHalfMsg(context, getResult); 
    } 
    if (isSlaveMode()) { 
        handleSlaveMode(context); 
        return true; 
    } 
    if (isOverMaxCheckTimes(context.getMsgExt(), context.getTransactionCheckMax()) || isExpiring(context.getMsgExt())) { 
        discard(context); 
        return true; 
    } 
    if (context.getMsgExt().getStoreTimestamp() >= context.getStartTime()) { 
        log.debug("Fresh stored. the miss offset={}, check it later, store={}", context.getCounter(), new Date(context.getMsgExt().getStoreTimestamp())); 
        return false; 
    } 
    long valueOfCurrentMinusBorn = System.currentTimeMillis() - context.getMsgExt().getBornTimestamp(); 
    String checkImmunityTimeStr = context.getMsgExt().getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); 
    if (null == checkImmunityTimeStr && 0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < context.getTransactionTimeout()) { 
        log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", context.getCounter(), context.getTransactionTimeout(), new Date(context.getMsgExt().getBornTimestamp())); 
        return false; 
    } 
    Long checkImmunityTime = checkImmunityTime(context, checkImmunityTimeStr, valueOfCurrentMinusBorn); 
    if (checkImmunityTime == null) { 
        return false; 
    } 
    if (!isNeedCheck(context, valueOfCurrentMinusBorn, checkImmunityTime)) { 
        fillMoreOpRemoveMap(context); 
        return true; 
    } 
    //需要回查的消息重新放入Half_Topic队尾 
    //因为检查进度向前移动,不会返回队头 
    //如果不放回队尾,回查后还没有收到commit/rollback,下次无法重复检查了 
    if (!putBackHalfMsgQueue(context.getMsgExt(), context.getCounter())) { 
        return true; 
    } 
    //异步发起远程事务回查 
    afterPutBackMsg(context); 
    return true; 
} 
/** 
 * Read op message, parse op message, and fill removeMap 
 * 
 * @param removeMap Half message to be remove, key:halfOffset, value: opOffset. 
 * @param opQueue Op message queue. 
 * @param pullOffsetOfOp The start offset of op message queue. 
 * @param miniOffset The current minimum offset of half message queue. 
 * @param opMsgMap Map<queueOffset, HashSet<offsetValue>> Half message offset in op message 
 * @param doneOpOffset Stored op messages that have been processed. 
 * @return Op message result. 
 */ 
private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap, MessageQueue opQueue, 
    long pullOffsetOfOp, long miniOffset, Map<Long, HashSet<Long>> opMsgMap, List<Long> doneOpOffset) { 
    PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, OP_MSG_PULL_NUMS); 
    if (!handleIllegalOpMsg(pullResult, opQueue, pullOffsetOfOp)) { 
        return pullResult; 
    } 
    List<MessageExt> opMsg = pullResult.getMsgFoundList(); 
    if (opMsg == null) { 
        log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue, pullResult); 
        return pullResult; 
    } 
    for (MessageExt opMessageExt : opMsg) { 
        if (opMessageExt.getBody() == null) { 
            log.error("op message body is null. queueId={}, offset={}", opMessageExt.getQueueId(), opMessageExt.getQueueOffset()); 
            doneOpOffset.add(opMessageExt.getQueueOffset()); 
            continue; 
        } 
        HashSet<Long> set = handleMsgWithRemoveTag(opMessageExt, miniOffset, removeMap); 
        if (set.size() > 0) { 
            opMsgMap.put(opMessageExt.getQueueOffset(), set); 
        } else { 
            doneOpOffset.add(opMessageExt.getQueueOffset()); 
        } 
    } 
    log.debug("Remove map: {}, Done op list: {}, opMsg map: {}", removeMap, doneOpOffset, opMsgMap); 
    return pullResult; 
} 
//消息超过最大检查次数 
private boolean isOverMaxCheckTimes(MessageExt msgExt, int transactionCheckMax) { 
    String checkTimes = msgExt.getProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES); 
    int checkTime = 1; 
    if (null != checkTimes) { 
        checkTime = StringUtils.getInt(checkTimes, -1); 
        if (checkTime >= transactionCheckMax) { 
            return true; 
        } else { 
            checkTime++; 
        } 
    } 
    msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(checkTime)); 
    return false; 
} 
//消息文件存储将过期 
private boolean isExpiring(MessageExt msgExt) { 
    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); 
    if (valueOfCurrentMinusBorn 
        > transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getFileReservedTime() 
        * 3600L * 1000) { 
        log.info("Half message exceed file reserved time ,so skip it.messageId {},bornTime {}", 
            msgExt.getMsgId(), msgExt.getBornTimestamp()); 
        return true; 
    } 
    return false; 
}

发起事务回查

private void afterPutBackMsg(CheckContext context) { 
    context.incPutInQueueCount(); 
    log.info("Check transaction. real_topic={},uniqKey={},offset={},commitLogOffset={}", 
        context.getMsgExt().getUserProperty(MessageConst.PROPERTY_REAL_TOPIC), 
        context.getMsgExt().getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX), 
        context.getMsgExt().getQueueOffset(), context.getMsgExt().getCommitLogOffset()); 
    context.getListener().resolveHalfMsg(context.getMsgExt()); 
} 
public void resolveHalfMsg(final MessageExt msgExt) { 
    if (executorService == null) { 
        LOGGER.error("TransactionalMessageCheckListener not init"); 
        return; 
    } 
    executorService.execute(new Runnable() { 
        @Override 
        public void run() { 
            try { 
                //异步发送事务回查命令 
                sendCheckMessage(msgExt); 
            } catch (Exception e) { 
                LOGGER.error("Send check message error!", e); 
            } 
        } 
    }); 
}

丢弃Half_Message

消息放入RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC,业务介入处理

@Override 
public void resolveDiscardMsg(MessageExt msgExt) { 
    log.error("MsgExt:{} has been checked too many times, so discard it by moving it to system topic TRANS_CHECK_MAXTIME_TOPIC", msgExt); 
    try { 
        MessageExtBrokerInner brokerInner = toMessageExtBrokerInner(msgExt); 
        PutMessageResult putMessageResult = this.getBrokerController().getMessageStore().putMessage(brokerInner); 
        if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { 
            log.info("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC OK. Restored in queueOffset={}, " + 
                "commitLogOffset={}, real topic={}", msgExt.getQueueOffset(), msgExt.getCommitLogOffset(), msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC)); 
        } else { 
            log.error("Put checked-too-many-time half message to TRANS_CHECK_MAXTIME_TOPIC failed, real topic={}, msgId={}", msgExt.getTopic(), msgExt.getMsgId()); 
        } 
    } catch (Exception e) { 
        log.warn("Put checked-too-many-time message to TRANS_CHECK_MAXTIME_TOPIC error. {}", e); 
    } 
}

总结

RocketMQ的事务机制实现有以下特点:

最终一致性保障高性能

RocketMQ通过PrepareCommit消息的形式实现分布式事务,将实时一致性要求较低的操作异步化处理,提高系统的响应速度和吞吐量,又通过异步回查方式来兜底确认数据状态。虽然数据可能在一段时间内处于不一致状态,但最终会达到一致。

尽管最终一致性可以提高系统性能,但它也需要权衡系统的一致性要求。在某些场景下,即时一致性是必要的,这时需要根据具体的业务需求和应用场景来选择是否采用最终一致性策略,以及如何权衡一致性和性能之间的取舍。

最大化复用MQ能力

RocketMQ复用Topic的存储能力,把Half_Message暂存到系统内部Topic,等待客户端commit/rollback确认消息以后,再把消息发送到业务真实的Topic,在此之前消息对消费者不可见,达到状态最终一致性。

除了被动等待客户端commit/rollback确认消息,RocketMQ还通过Broker后台线程,扫描出状态待确认的消息,主动向生产者客户端发出事务回查命令,起到兜底的作用。

对于反复事务回查后,仍然无法确认状态的消息,存入CHECK_MAX_TIME_TOPIC,确保消息不丢失,由业务人工介入特殊处理。

7