SpringBoot整合RabbitMQ实现邮件异步发送(修正版)
在之前的文章中,我们详细介绍了 SpringBoot 整合 mail 实现各类邮件的自动推送服务。
但是这类服务通常不稳定,当出现网络抖动的时候,会导致邮件自动推送失败。
本篇文章将介绍另一种高可用的服务架构,以便实现邮件 100% 被投递成功。类似的短信推送等服务,实现逻辑也大体类似。
01、先来一张流程图
本文内容主要围绕这个流程图展开,利用 RabbitMQ 消息队列来实现邮件 100% 被投递,内容涵盖了 RabbitMQ 很多知识点,如:
- 生产者和消费者模型
- 消息发送机制
- 消费确认机制
- 消息的重新投递
- 消息消费失败的处理方案
02、实现思路
- 1.准备一台电脑,并安装 RabbitMQ 服务
- 2.开放 QQ 邮箱或者其它邮箱授权码,用于发送邮件
- 3.创建邮件发送项目并编写代码
- 4.发送邮件测试
- 5.消息消费失败的处理介绍
03、环境准备
3.1、安装 RabbitMQ 服务
安装 RabbitMQ 服务,这一步比较简单,可以访问下面的官方地址,下载软件包并依次按照步骤进行安装即可。
https://rabbitmq.org.cn/docs/download
安装成功之后,登陆 RabbitMQ 控制台,可以看到类似于如下界面。
#### 3.1.1、创建交换器
点击“Exchanges”菜单,进入“交换器”管理界面。
进入之后,点击最下方“Add a new exchange”按钮,创建一个类型为topic
,名称叫mail.exchange
的交换器,并提交。
#### 3.1.2、创建消息队列
接着,点击“Queues”菜单,进入消息队列管理界面。
同样的,点击最下方“Add a new queue”按钮,创建一个名称叫mq.mail.ack
的消息队列,并提交。
保存之后,在列表中可以看到刚刚创建的消息队列,然后点击进入详情。
在详情中,将当前消息队列与上文创建的交换器进行绑定,便于后续通过交换器来发送消息到队列,操作如下。
对于topic
类型的交换器,通常不直接与消息队列进行交互,而是通过一个路由键,将消息路由到目标消息队列,这样设计的目的是让消息投递更加灵活。路由键,可以简单理解为类似于路由器,对数据进行路由分发处理。
3.2、配置邮箱发送服务器
为了实现邮件自动发送功能,我们还需要准备一个邮箱发送服务器,这一步在之前的文章中已经详细的介绍过,在此,我们简单的再介绍一下。
以 QQ 邮箱为例,登陆进去之后,在设置里面开启 POP3/SMTP 服务,并获取授权码记录下来。
该授权码,就是下文配置文件中spring.mail.password
需要的密码!
04、方案实践
4.1、构建项目
在 IDEA 下创建一个名称为smail
的 Spring Boot 项目,pom
文件中加入amqp
和mail
相关依赖包,示例如下:
<!--mail 支持-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!--amqp 支持-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2、添加相关配置
在application.properties
中添加 rabbitmq、邮箱相关配置,示例如下:
# 配置邮件发送主机地址
spring.mail.host=smtp.exmail.qq.com
# 配置邮件发送服务端口号
spring.mail.port=465
# 配置邮件发送服务协议
spring.mail.protocol=smtp
# 配置邮件发送者用户名或者账户
spring.mail.username=xxxx
# 配置邮件发送者密码或者授权码
spring.mail.password=xxxx
# 配置邮件默认编码
spring.mail.default-encoding=UTF-8
# 配置smtp相关属性
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.ssl.enable=true
spring.mail.properties.mail.smtp.ssl.required=true
#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=test
spring.rabbitmq.password=test
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
其中,spring.mail.username
和spring.mail.password
指的就是上文中创建的邮箱账号和授权码,将其配置进去即可。
4.3、编写 RabbitMQ 配置类
编写一个 RabbitMQ 配置类,用于监听消息的发送情况,示例如下。
@Configuration
public class RabbitConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置消息转换器为json格式
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息是否成功发送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.info("消息发送到Exchange成功,{}", correlationData);
} else {
LOGGER.error("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
}
});
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);
// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
LOGGER.error("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
return rabbitTemplate;
}
}
4.4、编写生产者服务
在 Spring Boot 中,我们可以利用RabbitTemplate
工具,将数据通过交换器发送到目标消息队列,示例如下。
@Service
public class ProduceService {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
* @param mail
* @return
*/
public boolean sendByAck(Mail mail) {
// 创建uuid
String msgId = UUID.randomUUID().toString().replaceAll("-", "");
mail.setMsgId(msgId);
// 发送消息到mq服务器中(附带消息ID)
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend("mail.exchange", "route.mail.ack", MessageHelper.objToMsg(mail), correlationData);
return true;
}
}
4.5、编写消费者服务
在 Spring Boot 中,我们可以利用@RabbitListener
注解,监听指定的消息队列,如果队列中有消息会第一时间收到回调,示例如下。
@Component
public class ConsumerService {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerService.class);
@Autowired
private SendMailService sendMailService;
/**
* 监听消息队列,手动确认模式,必须手动调用ack或者nack方法
* 配置参数:spring.rabbitmq.listener.simple.acknowledge-mode=manual
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = {"mq.mail.ack"})
public void consumeFromAck(Message message, Channel channel) throws IOException {
LOGGER.info("收到消息:{}", message.toString());
//将消息转化为对象
Mail mail = MessageHelper.msgToObj(message, Mail.class);
// 手动确认模式
long tag = message.getMessageProperties().getDeliveryTag();
boolean success = sendMailService.send(mail);
if (success) {
// 消费成功,消息会被删除
channel.basicAck(tag, false);
} else {
// 消费失败,重新返回队列
channel.basicNack(tag, false, true);
}
}
}
4.6、编写邮件发送服务
正如之前的文章中所介绍的,在 Spring Boot 中,我们可以利用JavaMailSender
工具来实现邮件的自动推送,示例如下。
@Service
public class SendMailService {
private static final Logger LOGGER = LoggerFactory.getLogger(SendMailService.class);
@Value("${spring.mail.username}")
private String from;
@Autowired
private JavaMailSender mailSender;
/**
* 发送简单邮件
*
* @param mail
*/
public boolean send(Mail mail) {
String to = mail.getTo();// 目标邮箱
String title = mail.getTitle();// 邮件标题
String content = mail.getContent();// 邮件正文
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom(from);
message.setTo(to);
message.setSubject(title);
message.setText(content);
try {
mailSender.send(message);
LOGGER.info("邮件发送成功");
return true;
} catch (MailException e) {
LOGGER.error("邮件发送失败, to: {}, title: {}", to, title, e);
return false;
}
}
}
4.7、编写 controller 接口
接着,编写一个 controller 接口,将邮件发送服务暴露出去,示例如下:
@RestController
public class MailController {
@Autowired
private ProduceService produceService;
@PostMapping("send")
public String sendMail(Mail mail) {
boolean result = produceService.sendByAck(mail);
return result ? "success": "fail";
}
}
4.8、服务测试
最后,启动 SpringBoot 服务,用 postman 来测试一下。
查看控制台信息。
查询接受者邮件信息。
可以清楚的看到,邮件发送成功!
当大批量的发送邮件,也不用担心,因为整个邮件的发送都是异步的,不会阻塞主流程的运行。
05、消费失败的处理方案
虽然以上的方案非常可靠,可以保证发出的消息 100% 被消费,但是其实也有弊端。
试想一下,按照上面的处理逻辑,假设其中有一条消息,因为某种原因一直发送失败,会出现什么样的情况?
此时,这条消息会重新返回队列,然后一直重试,会导致其它的消息可能会无法被消费。
针对这种情况,最简单粗暴的办法就是,当重试失败之后将消息丢弃,不会阻碍其它的消息被正常处理,不过会丢失数据。
那么如何正确的处理消息消费失败的问题呢?
可以借助数据库来记录消费失败的数据,针对系统无法成功处理的消息,人工进行干预。
实践过程如下!
5.1、创建一张消息日志表
首先,在数据库中创建一张消息日志表,用于跟踪消息数据的状态,示例如下:
CREATE TABLE `msg_log` (
`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',
`exchange` varchar(100) NOT NULL DEFAULT '' COMMENT '交换机',
`route_key` varchar(100) NOT NULL DEFAULT '' COMMENT '路由键',
`queue_name` varchar(100) NOT NULL DEFAULT '' COMMENT '队列名称',
`msg` text COMMENT '消息体, json格式化',
`result` varchar(255) DEFAULT NULL COMMENT '处理结果',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态,0:等待消费,1:消费成功,2:消费失败,9:重试失败',
`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`msg_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='mq消息日志';
5.2、改写生产者逻辑
在生产者服务类中,先将消息数据写入数据库,再向 rabbitMQ 服务中发消息,示例如下:
@Service
public class ProduceService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MsgLogService msgLogService;
/**
* 发送消息
* @param mail
* @return
*/
public boolean sendByAuto(Mail mail) {
String msgId = UUID.randomUUID().toString().replaceAll("-", "");
mail.setMsgId(msgId);
// 1.存储要消费的数据
msgLogService.save("mail.exchange", "route.mail.auto", "mq.mail.auto", msgId, mail);
// 2.发送消息到mq服务器中(附带消息ID)
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend("mail.exchange", "route.mail.auto", MessageHelper.objToMsg(mail), correlationData);
return true;
}
}
5.3、改写消费者逻辑
在消费者服务类中,收到消息之后,不管处理成功还是失败,都只会修改数据库中的消息状态,并且消息处理失败时,不再重新返回队列。
@Component
public class ConsumerService {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerService.class);
@Autowired
private SendMailService sendMailService;
@Autowired
private MsgLogService msgLogService;
/**
* 监听消息队列,自动确认模式,无需调用ack或者nack方法,当程序执行时才删除消息
* 配置参数:spring.rabbitmq.listener.simple.acknowledge-mode=auto
* @param message
*/
@RabbitListener(queues = {"mq.mail.auto"})
public void consumeFromAuto(Message message) {
LOGGER.info("收到消息:{}", message.toString());
// 获取消息ID
Mail mail = MessageHelper.msgToObj(message, Mail.class);
// 消息幂等性处理,如果已经处理成功,无需重复消费
MsgLog queryObj = msgLogService.selectByMsgId(mail.getMsgId());
if(Objects.nonNull(queryObj) && Constant.SUCCESS.equals(queryObj.getStatus())){
return;
}
// 发送邮件
boolean success = sendMailService.send(mail);
if(success){
msgLogService.updateStatus(mail.getMsgId(), Constant.SUCCESS, "邮件发送成功");
} else {
msgLogService.updateStatus(mail.getMsgId(), Constant.FAIL, "邮件发送失败");
}
}
}
因为此处采用自动确认模式,因此还需要修改application.properties
中的配置参数,内容如下:
# 设置自动确认(默认此模式)
spring.rabbitmq.listener.simple.acknowledge-mode=auto
5.4、编写定时任务对失败消息进行补偿投递
当消息消费失败时,会自动记录到数据库。
实际上,不可能每条数据都需要我们进行干预,有的可能重试一次就好了,因此可以编写一个定时任务,将消费失败的数据筛选出来,重新放入到消息队列中,只有当消费次数达到设置的最大值,此时进入人工干预阶段,可以节省不少的工作。
示例如下:
@Component
public class ScheduledTask {
private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTask.class);
/**
* 最大投递次数
*/
private static final int MAX_TRY_COUNT = 3;
@Autowired
private MsgLogService msgLogService;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 每30s拉取消费失败的消息, 重新投递
*/
@Scheduled(cron = "0/30 * * * * ?")
public void retry() {
LOGGER.info("开始执行重新投递消费失败的消息!");
// 查询需要重新投递的消息
List<MsgLog> msgLogs = msgLogService.selectFailMsg();
for (MsgLog msgLog : msgLogs) {
if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
msgLogService.updateStatus(msgLog.getMsgId(), Constant.RETRY_FAIL, msgLog.getResult());
LOGGER.info("超过最大重试次数, msgId: {}", msgLog.getMsgId());
break;
}
// 重新投递消息
CorrelationData correlationData = new CorrelationData(msgLog.getMsgId());
rabbitTemplate.convertAndSend("", msgLog.getQueueName(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);
// 更新下次重试时间
msgLogService.updateNextTryTime(msgLog.getMsgId(), msgLog.getTryCount());
}
}
}
最后别忘了,在Application
类上添加@EnableScheduling
,以便让定时调度生效,示例如下:
@EnableScheduling
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
利用定时任务,对投递失败的消息进行补偿投递,基本可以保证消息 100% 消费成功!
06、小结
本文主要以实现邮件自动推送这个业务场景为例,通过 Springboot 整合 rabbitMQ 技术来实现高可用的效果。
当然,解决这个业务需求的技术方案还有很多,例如 Springboot 整合 rocketMQ 也可以实现这个效果,不管怎么变,底层的实现思路基本都一样。
希望本篇的知识总结,对大家有所帮助。
最后,代码都经过自测,想要获取项目源码的同学,可以点击如下地址获取。
示例代码地址:
https://gitee.com/pzblogs/spring-boot-example-demo