如何保证MQ消息的幂等性

现在微服务开发中为了满足限流消峰、减少系统之间的耦合等实际业务的需要,于是系统中往往会引入了MQ,加入了MQ之后如何保证消费者的消费幂等性便是需要解决的问题了。

1、幂等性问题

幂等性是数学上的一个概念,在我们开发中可以理解成就是多次执行某个方法并且得到结果是一样的,那么我们就认为这个过程就是幂等性。如下就是一些常见的幂等性的案例:

1、查询幂等性 
select * from user where id = 10; 
、更新的幂等性 
update user set name = 'zhangsan' where id = 20; 
、添加的幂等性(假设userId是唯一键) 
insert info user(user_id, name,age) values(1, 'zhangsan',20); 
、 删除幂等性 
delete from user where id = 21;

查询是具备天然的幂等性(不考虑数据更新/删除的情况下,多次查询始终是一个结果)。

如上所示的更新/删除都是具备幂等性的,无论这个更新执行多少次,最终的结果都是一样的。

如上所示的添加也是具备幂等性的,因为userId字段是唯一键,重新添加数据库会提示异常的。

非幂等性的就是指多次执行的结果会不一样,如下所示是常见的一些非幂等性的案例:

1、更新的非幂等性 
update user set age = age + 1 where id = 20; 
、添加的非幂等性 
insert into user(name, age, nick_name) values ('zhangsan', 20, 'sanzhang')

如上所示的案例是非幂等性的,因为执行多次的结果是不一样的。所以针对这些非幂等性操作需要做单独的处理,保证一次请求只会执行一次。

2、MQ出现幂等性问题的原因

(1)生产者重复生产

由于网络原因,第一次生产者的发送的消息MQ已经接收到了,但是给服务响应的时候超时了,导致服务器再次投递了相同的消息,在消息队列中存在了两条相同的消息。下游的消费者就需要消费两次个相同的消息,消费者需要自己做幂等性的处理。

(2)MQ重试机制

消费者第一次消费id=1的消息时候,此时消费此时消费成功但是响应MQ的时候超时导致MQ认为当前的消息消费者没有成功消费,过一段时间之后重新投递给消费者消费,那么就导致同样的消息消费者消费了两次,此时消费者需要自己做幂等性的处理。

3、MQ的幂等性解决方案

分析了MQ出现幂等性问题的原因之后我们需要对消费者端做一些幂等性措施来保证实际的业务安全性。常见的方案如下所示:

(1)查询法

方案的原理:根据消息中的业务id(如订单的id)查询数据库中当前的业务是否执行过,如果业务执行过就不再处理当前消息。

方案存在缺陷,在高并发下会出现无法保证消息的幂等性问题,如下图所示的场景:

线程A和线程 B 同时到达查询位置,此时查询的时候没有数据,此时线程A继续持有时间片,线程B时间片用完,那么线程A执行业务并操作数据库,同时线程B又获得时间片开始执行业务处理。最终的结果就是线程A和线程B执行了两次数据库操作。所以高并发下查询法没有办法完成消息幂等性问题。

(2)加悲观锁方案

查询法中在高并发下仍然存在消息幂等性的问题,那么针对查询法的缺陷可以使用在底层加悲观锁的方案来解决,即就是查询的时候添加for update,这样可以保证在一个线程事务中加锁成功后,其他的线程就必须等待释放锁才能操作,如下图所示:

悲观锁可以很好的解决消息幂等性的问题,但是悲观锁的并发度低是一个不容忽视的性能问题。由于业务在事务中加锁了,如果业务相对复杂的情况下,消息的消费也会变长,那么在高并发下消息的消费速度会比较慢。

(3)乐观锁机制方案

通过上述的分析我们知道悲观锁的执行效率是比较低的,我们可以采用了乐观锁的机制替代悲观锁,如下所示:

在消息生产的时候携带消息的id、消息的版本号,然后在消费端根据乐观锁原理来执行,即就是根据id和version做更新操作,通过判断影响行是否大于0来判断执行是否成功;如果影响行数大于0表示当前的消息执行完成,反之就执行失败,执行失败不做抛异常。

乐观锁机制虽然可以提高系统的并发度但是它对业务有侵入,生产者不仅要携带参数id,现在还需要携带version传到下游中,这样给开发带来了一些不便。

(4)去重表方案

消费者先将消息中的唯一键(消息的id或者业务中的唯一键如订单id)获取到,然后数据插入到表message中(message表中设置msg_id为唯一键),如果数据添加成功就放行继续执行相关的业务逻辑;如果添加失败我们需要catch到异常,如果是DuplicateKeyException就直接吞掉异常并提示消息消费成功。核心的代码如下:

try { 
、添加数据到message表 
  messageMapper.addMessage(message); 
、执行业务逻辑 
  this.dealMessage(message); 
、执行业务成功后 返回消息消费成功的标识 
  return Boolean.TRUE; 
}catch(DuplicateKeyException de){ 
  log.warn("消息重复 messageId:{}", message.getId()) 
  return Boolean.TRUE; 
}

基于去重表方案由于依赖的只是消息表而与具体业务本身无关,所以此方案可以扩展到不同的应用场景中。

去重表方案也存在一定的局限性,如消息的消费逻辑必须是依赖于关系型数据库事务,如果消费过程中还涉及不支持事务的数据源数据的修改(如ES、Redis),那么执行过程有异常无法回滚数据。还要求数据库的数据必须是在一个库中,不支持跨库操作。

(5)非事务的去重表方案

如果现在业务需要将数据使用RPC的方式同步通知其他的系统,那么现在调整消费者消费方案,如下所示:

(1)消费者拉去MQ队列中的消息开始执行消费的逻辑,首先将消息信息携带过期时间的方式添加到数据表中,如下添加数据的sql:

insert message (msg_id, desc, exprire_time, status) values  
(1,'MQ消息', 5, 0)

过期时间expire_time设置成5表示5分钟有效。

(2)根据数据库插入消息是否成功相应的逻辑处理

(a)MQ的消息添加数据库成功;首先完成消息的本地业务,然后使用RPC通知其他的系统,如果通知其他系统成功,此时将消息表数据状态修改成消费成功(status:0 -> 1),最后通知MQ本消息消费成功;如果通知其他系统失败就回滚业务、删除消息表的记录,通知MQ消息消费失败。等待下一次的推送继续业务处理。

(b)MQ消息添加数据库失败;查询已经添加到数据库中的MQ消息的的数据(如数据库中的数据:msgId=1, desc=’MQ消息’, expireTime=5 , status=0),根据数据status的值做判断:如果status=0(表示消息正在消费中)此时就要返回给MQ消息消费失败,等待下一次继续推送; 如果status=1(表示消息消费成功),此时给MQ返回消息消费成功。

(3)开启定时任务(如XXL-Job)每隔一段时间(如3分钟执行一次),拉取数据库中过期的消息数据,然后做删除操作。这里设置过期时间的主要目的是防止出现死信的问题,如下所示:

第一条消息在处理中,由于某种原因一致处理很慢导致最终是失败了,第二条消息(其实是重复的消息)会走一遍先添加的操作,此时添加是失败的,那么会不断的走重试逻辑,重试一定次数之后就会加入到死信的队列中。添加了定时任务就是清理这些过期的数据保证下一次消息就可以执行正常的消费逻辑。

至此我们就完成利用消息的重试机制完成幂等+分布式事务的处理。

总结:

(1)查询法去重解决方案是先判断再操作,但是会有并发重复消费的问题,针对并发去重问题可以借助select for update悲观锁或者乐观锁来解决。

(2)去重表方案可以很好的处理消息幂等问题,但是无法支持跨库操作以及不支持事务的数据源数据数据的修改,为此引入了非事务消息幂等性方案。

3