整理常见的实时消息推送方案

消息推送在我们的日常生活中很常见,如大促销售额实时大屏数据展示、网站未读消息提示、游戏中技能释放、B站上浏览视频评论时如果出现最新评论会立马弹出来,这些都是实时消息推送的案例,只是每个公司的实现方式不同。下面整理一下市面上常见的一些实时消息推送的实现方案。

1、轮询——短轮询

实现思路:前端使用定时任务(setInterval)来每隔一段时间发送一个请求到后端获取数据,如果后端有数据就返回数据,无数据返回空。

核心代码:

短轮询的问题:n次请求中可能只有几次甚至是0次是有效的请求(可以获取到数据的请求),这样就造成对服务器资源的浪费;如果请求非常大的时候,会对服务器造成很大压力。

2、轮询——长轮询

实现思路:当前端发起一个请求给后端的时候,后端不会立马的响应而是将当前的请求线程进行挂起,只有当出现最新的消息才会响应给前端。

长轮询的问题:这种方式可以有效的减少请求的数量,但是无数据的时候前端请求的线程会被一直挂载起来。那么就会存在如下的问题:

(1)如果后端一直没有最新的数据,那么此时这个请求就会因为等待时间过长而导致请求超时的问题。

(2)高并发下,由于请求量很大就容易造成大量的线程堆积问题,这样给服务器造成了很大的压力。

综上分析:虽然长/短轮询实现上简单,但是只能做一些请求量低、简单的业务场景。

3、SSE推送数据方案

实现思路:前端先发起一个请求到后端,后端会在响应中进行SSE长链接的协议设置 r esponse .setContentType( "text/event-stream");当客户端收到长链接协议之后长链接就建立成功,当服务端发生了数据的改变就可以实时的把数据推送到客户端。

SSE订阅服务器事件时候,常见的Header设置

Accept: text/event-stream 表示可接收事件流类型 
Cache-Control: no-cache 禁用任何的事件缓存 
Connection: keep-alive 表示正在使用持久连接

SSE是基于Http协议的单向通信,适用于数据更新频繁、低延迟的服务器向客户端实时推送数据的场景,常见的使用案例:在线聊天、实时监控、新闻实时推送等。

核心代码:

/** 
  * 创建连接sse 
  * 
  */ 
 public SseEmitter connect() { 
     final String clientId = UUID.randomUUID().toString().replace("-", ""); 
     SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L); 
       try { 
           sseEmitter.send(SseEmitter.event().comment("创建连接成功 !!!")); 
       } catch (Exception e) { 
           logger.error("创建连接失败 , {} ", e.getMessage()); 
       } 

      sseEmitter.onCompletion(() -> { 
          logger.info("connect onCompletion , {} 结束连接 ...", clientId); 
          removeClient(clientId); 
      }); 

      sseEmitter.onTimeout(() -> { 
          logger.info("connect onTimeout , {} 连接超时 ...", clientId); 
          removeClient(clientId); 
      }); 

      sseEmitter.onError((throwable) -> { 
          logger.error("connect onError , {} 连接异常 ...", clientId); 
          removeClient(clientId); 
      }); 

      sseCache.put(clientId, sseEmitter); 
    return sseEmitter; 
 }

4、Websokcet方案

实现原理:客户端发起一个长链接的请求

#请求建立长链接 
new WebSocket("ws://192.168.27.201:8080/ws/connect“)

服务端同意建立长链接之后,客户端和服务端之间会进行一个双向通道的长连接。

核心代码实现:

#建立连接、接受数据、关闭连接 
@ServerEndpoint("/ws/connect")      //类似restful中的@RequestMapping 
@Component 
public class WebsocketServer { 
    public static ConcurrentHashMap<String,Session> clintsMap = new ConcurrentHashMap<>(); 
    @OnOpen 
    public void onOpen(Session session, @PathParam("token")String token){ 
        System.out.println("客户端开始建立连接:"+token); 
        clintsMap.put(token, session); 
    } 
    @OnMessage 
    public void onMessage(String msg,  @PathParam("token")String token){ 
        System.out.println("客户端:"+token + "发送信息给服务器:" + msg); 
    } 
    @OnClose 
    public void onClose(@PathParam("token")String token){ 
        System.out.println("客户端关闭连接:"+token); 
        clintsMap.remove(token); 
    } 
    @OnError 
    public void onError(Throwable e){ 
        System.out.println("客户端连接发生异常"); 
    } 
} 
#配置 
@Configuration 
public class WebSocketConfiguration { 
    @Bean 
    public ServerEndpointExporter serverEndpointExporter() { 
        return new ServerEndpointExporter(); 
    } 
}

5、netty实时推送方式

基于netty框架实施通信业务以及并发量高的复杂场景,因为Netty已经集成了NIO模型以及对Websocket进行了封装,使得使用Websocket更加的方便,性能也是更加的高。

6、MQTT实现实时消息推送

MQTT(Message Queuing Telemetry Transport)是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。

MQTT可以采用MQ来进行实现双向的通道通信,它通过订阅发布模式使得所有的设备通过MQTT代理进行订阅和发布信息,当任意一个设备往MQTT代理当中去发布一条消息,其它的所有设备都可以实时感知并获得数据。

MQTT为了实现不同网络环境下保证消息的可靠性,提供了三个保证消息传递可靠性的级别:

级别 含义
QoS 0 消息最多传送一次。如果当前客户端不可用,它将丢失这条消息
QoS 1 消息至少传送一次
Qo S 2 消息只传送一次

总结:以上就是常见的几种实现消息实时推送的方案,针对业务简单、请求少的场景我们可以使用轮询的方式实现;业务稍微复杂一些,请求量大的场景我们可以使用Websockt/netty实现,针对物联网中我们采用MQTT方案。我们需要根据实际业务特点采用合适的方案开发。

8