Kafka Stream+WebSocket+Redis实现实时数据大屏

在我们的日常电商大促中,商家需要看到自己所卖的商品的成交量情况、当前的爆品信息、目前的成交额等实时的信息,如下图所示实时大屏

实时数据大屏功能的实现有很多的方案,如Hadoop、Flink、Apache storm都可以实现实时数据统计的功能,本文是基于Kafka Stream实现实时数据统计的功能,因为Kafka Stream相较于其他所有流处理框架来说,它是轻量级库,由于其原生即为轻量级,所以适用于一些微服务类型的架构中。kafka Stream的部署与使用非常简单,不需要额外建立集群来运行。

1、大屏实现中的使用技术点

1.1 WebSocket

传统HTTP每次都需要浏览器与服务端通过4次握手建立连接来进行通信,并且服务器端不可以向浏览器端主动的推送数据。

WebSocket 是一种长连接的通讯模式,浏览器与服务连接建立之后,后续数据都以帧序列的形式传输。在浏览器断开WebSocket连接或 Server 端断掉连接前,不需要浏览器和服务端重新发起连接请求,下图是一张简单的websocket通信图

浏览器与服务器交互负载流量很大的情况下,极大的节省了网络带宽资源的消耗且有明显的性能优势,且浏览器发送和接受消息是在同一个持久连接上发起,实时性优势明显。

1.2 Kafka Stream

Kafka Stream提供的是一个基于Kafka的流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,下图是kafka Stream的工作流程图:

生产者将数据发送到first topic中,随后kafka Stream从first topic中将数据最清洗整理工作,整理好之后kafka将数据发送second topic中,消费者可以从second topic中消费数据。

kafka stream中有KTable和KStream,我们做数据处理的时候需要用到它们,如下展示了它们的区别:

在t1时间段内,KStream记录是生产者生产的所有数据,可以认为所有记录都通过新增的方式插入进这个数据流里。KTable方式是对数据进行了整理,类似对数据做了无则新增有则增加的操作。

2、实现实时数据大屏

以上就实现实时数据大屏的主要流程图,从图中可以看到交易下单的数据发送到first topic中,然后由Kafka stream做数据处理,然后将处理好的数据发送到second topic中;服务器监听并消费second topic中的数据;此时服务器将数据一份推送到浏览器做展示,一份缓存到redis中并由定时任务(如xxl-job)将数据同步到Mysql中(因为在并发量很大的情况下直接将数据存放到Mysql中会存在性能问题)。

2.1 搭建WebSocket

#pom文件添加依赖 
 <dependency> 
            <groupId>org.springframework.boot</groupId> 
            <artifactId>spring-boot-starter-web</artifactId> 
        </dependency> 
        <!--springboot已经集成websocket--> 
        <dependency> 
            <groupId>org.springframework.boot</groupId> 
            <artifactId>spring-boot-starter-websocket</artifactId> 
        </dependency> 
        <dependency> 
            <groupId>com.alibaba</groupId> 
            <artifactId>fastjson</artifactId> 
            <version>1.2.56</version> 
        </dependency> 
        <!--kafka--> 
        <dependency> 
            <groupId>org.springframework.kafka</groupId> 
            <artifactId>spring-kafka</artifactId> 
            <version>2.6.6</version> 
            <exclusions> 
                <exclusion> 
                    <groupId>org.apache.kafka</groupId> 
                    <artifactId>kafka-clients</artifactId> 
                </exclusion> 
            </exclusions> 
        </dependency> 
        <!--kafka流式计算--> 
        <dependency> 
            <groupId>org.apache.kafka</groupId> 
            <artifactId>kafka-streams</artifactId> 
            <version>2.5.1</version> 
            <exclusions> 
                <exclusion> 
                    <artifactId>connect-json</artifactId> 
                    <groupId>org.apache.kafka</groupId> 
                </exclusion> 
                <exclusion> 
                    <groupId>org.apache.kafka</groupId> 
                    <artifactId>kafka-clients</artifactId> 
                </exclusion> 
            </exclusions> 
        </dependency> 
        <dependency> 
            <groupId>org.apache.kafka</groupId> 
            <artifactId>kafka-clients</artifactId> 
            <version>2.5.1</version> 
        </dependency>
#配置websocket 
@Configuration 
public class WebSocketConfiguration { 
    @Bean 
    public ServerEndpointExporter serverEndpointExporter() { 
        return new ServerEndpointExporter(); 
    } 
#websocket 
@Component 
@ServerEndpoint(value = "/channel/screen") 
public class ScreenWebsocketService { 
    private static final Logger LOGGER = LoggerFactory.getLogger(ScreenWebsocketService.class); 
    public static final ConcurrentMap<String, Session> SESSIONS = new ConcurrentHashMap<>(); 
    private Session session; 
    @OnOpen 
    public void onOpen(Session session,  EndpointConfig endpointConfig) 
{ 
        this.session = session; 
        SESSIONS.put(this.session.getId(), this.session); 
        LOGGER.info("新的连接:{}", this.session.getId()); 
    } 
    @OnMessage 
    public void onMessage(String message) 
{ 
        try { 

        } catch (Exception e) { 
            LOGGER.info("onMessage发生了异常"); 
        } 
    } 
    @OnClose 
    public void onClose(CloseReason closeReason) 
{ 
        LOGGER.info("连接断开:id={},reason={}",this.session.getId(),closeReason); 
        SESSIONS.remove(this.session.getId()); 
    } 
    @OnError 
    public void onError(Throwable throwable) throws IOException 
 { 
        LOGGER.info("连接异常:id={},throwable={}", this.session.getId(), throwable); 
        this.session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage())); 
    } 
    /** 
     * 推送消息 
     * @param message 
     */ 
    public static void push(String message) { 
        SESSIONS.values().stream().forEach(session -> { 
            if (session.isOpen()) { 
                try { 
                    session.getBasicRemote().sendText(message); 
                } catch (IOException e) { 
                } 
            } 
        }); 
    }

2.2 搭建webscoket和浏览器的之间的连接的html

在src/main/resorce下新建public目录,写如下的index.html

#html页面 
<!DOCTYPE html> 
<html> 
<head> 
    <meta charset="UTF-8"> 
    <title>实时数据看板</title> 
    <style> 
#logContainer { 
            height: 500px; 
            overflow-y: auto; 
            border: 1px solid #ccc; 
            padding: 10px; 
        } 
        .logEntry { 
            white-space: pre; 
            margin-bottom: 10px; 
        } 
    </style> 
</head> 
<body> 
<div id="logContainer"></div> 
<script> 
    const MAX_LOGS = 500; // 定义最大数量 
    // 创建WebSocket连接 
    const socket = new WebSocket('ws://localhost:8080/channel/screen'); 
    // 数据数组 
    const logs = []; 
    // 监听WebSocket连接事件 
    socket.onopen = function() { 
        console.log('WebSocket连接已打开'); 
    }; 
    // 监听WebSocket消息事件 
    socket.onmessage = function(event) { 
        const logContainer = document.getElementById('logContainer'); 
        // 将收到的消息添加到数组中 
        logs.push(event.data); 

        if (logs.length > MAX_LOGS) { 
            logs.splice(0, logs.length - MAX_LOGS);  
        } 
        // 更新容器的内容 
        logContainer.innerHTML = logs.map(log => '<div class="logEntry">' + log + '</div>').join(''); 
        // 滚动到最底部 
        logContainer.scrollTop = logContainer.scrollHeight; 
    }; 
    // 监听WebSocket关闭事件 
    socket.onclose = function() { 
        console.log('WebSocket连接已关闭'); 
    }; 
</script> 
</body> 
</html>

2.3 添加配置文件

server: 
  port: 8080 
spring: 
  application: 
    name: webscoket-service 
  kafka: 
    bootstrap-servers: 192.168.202.7:9092 
#生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性 
    producer: 
#0---表示不进行消息接收是否成功的确认 
      #1---表示当Leader接收成功时确认 
#all -1---表示Leader和Follower都接收成功时确认 
      acks: all 
#设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 
      retries: 10 
#每批次发送消息的数量 
      batch-size: 16384 
#producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。 
      buffer-memory: 33554432 
#key序列化方式 
      key-serializer: org.apache.kafka.common.serialization.StringSerializer 
      value-serializer: org.apache.kafka.common.serialization.StringSerializer 
      properties: 
#延迟提交时间 
        linger.ms: 2000 
#消费者的配置 
    consumer: 
#消费者组 
      #是否开启自动提交 
      enable-auto-commit: false 
#自动提交的时间间隔 
      auto-commit-interval: 100ms 
#key的解码方式 
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 
      properties: 
        session: 
          timeout: 
            ms: 15000 
#一次拉取 
      max-poll-records: 500 
      group-id: testGroup 
    listener: 
      type: batch 
#不存在的时候 启动报错 
      missing-topics-fatal: false

项目启动访问http://localhost:8080/index.html后,websocket就和服务器建立了一个数据连接通道,服务器的数据就可以推送到html页面上。

2.4 kafka stream搭建

#first topic接受数据 
@Component 
public class KafkaDataHandlerConsumer { 
    @Value("${spring.kafka.bootstrap-servers}") 
    private String bootstrapServer; 
    @Resource 
    private KafkaStreamScreenService kafkaStreamScreenService; 
    @KafkaListener(topics = {"first topic"}) 
    public void handlerMessage(List<String> records) throws Exception { 
        System.out.println("接受消息-------------------"); 
        //record: ["shangpin,3"] 
        System.out.println("record: " + JSON.toJSONString(records)); 
        kafkaStreamScreenService.kafkaDataHandler(bootstrapServer); 
    } 
}
@Component 
public class KafkaStreamScreenService { 
    public void kafkaDataHandler(String bootstrapServer) throws InterruptedException { 
        //stream的构建器 
        StreamsBuilder streamsBuilder = new StreamsBuilder(); 
        //配置信息 
        Properties properties = new Properties(); 
        System.out.println("bootstrapServer: " + bootstrapServer); 
        //kafka的连接地址 
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); 
        //kafka的key序列化器 
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
        //kafka的value序列化器 
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "screen-stream-longxia04"); 
        //流式计算的方案 
        this.kafkaStream(streamsBuilder); 
        //创建kafkaStreams对象 
        new KafkaStreams(streamsBuilder.build(), properties).start(); 
        System.out.println("stream start----------------------- "); 
    } 
    /** 
     * 对数据进行清洗整理 
     * 
     */ 
    private void kafkaStream(StreamsBuilder streamsBuilder) throws InterruptedException { 
        //设置消费的主题 
        KStream<String, String> stream = streamsBuilder.stream("first toipc"); 
        //数据处理 
        stream.groupBy((key,value)-> { 
            return value; 
        }) 
        //设置数据处理时间窗口大小 
        .windowedBy(TimeWindows.of(Duration.ofSeconds(3))) 
        //对数据总统计 
        .count() 
        .toStream() 
        .map((key,value)-> { 
            return new KeyValue<>(key.key().toString(),key.key() + "出现的次数------>" + value.toString()); 
         }) 
         .to("second topic");   //数据发送到second topic 
    }
#second topic消费数据 
@Component 
public class KafkaStreamDataConsumer { 
    @KafkaListener(topics = {"second topic"}) 
    public void kafkaDataConsumerMessage(List<String> records) throws Exception { 
        System.out.println("kafkaDataConsumerMessage : " + JSON.toJSONString(records)); 
        //发送给浏览器 
        ScreenWebsocketService.push(new String(JSON.toJSONBytes(records), StandardCharsets.UTF_8)); 
        //数据缓存redis 
        //redisTemplate.opsForValue().set("key", value); 
    } 
}

页面模拟生产数据

@RestController 
@RequestMapping("/kafka") 
public class KafkaProductController { 
    @Resource 
    private KafkaTemplate<String, Object> kafkaTemplate; 
    @GetMapping("send/{message}") 
    public String sendMessage(@PathVariable("message")String message){ 
        if(StringUtils.isEmpty(message)){ 
            return "请输入消息"; 
        } 
        kafkaTemplate.send("first tpoic", message); 
        return "发送消息成功"; 
    } 
}

页面发送消息的效果展示:

在kafka Stream中设置看了时间窗口(windowedBy(TimeWindows.of(Duration.ofSeconds( 3 )))),在这个时间窗口中会做数据整理,整理好的数据后通过websocket推送到页面上。实时数据统计的结果如下:

总结:

本文通过Kafka Stream、websocket、redis、定时器实现了实时数据看板的功能(前段页面实现比较粗糙,需要专业前段做处理),其中redis和定时器主要是将数据做持久化的才做(由于实现比较简单,本文中暂时没有做展示)。

7