Kafka Stream+WebSocket+Redis实现实时数据大屏
在我们的日常电商大促中,商家需要看到自己所卖的商品的成交量情况、当前的爆品信息、目前的成交额等实时的信息,如下图所示实时大屏
实时数据大屏功能的实现有很多的方案,如Hadoop、Flink、Apache storm都可以实现实时数据统计的功能,本文是基于Kafka Stream实现实时数据统计的功能,因为Kafka Stream相较于其他所有流处理框架来说,它是轻量级库,由于其原生即为轻量级,所以适用于一些微服务类型的架构中。kafka Stream的部署与使用非常简单,不需要额外建立集群来运行。
1、大屏实现中的使用技术点
1.1 WebSocket
传统HTTP每次都需要浏览器与服务端通过4次握手建立连接来进行通信,并且服务器端不可以向浏览器端主动的推送数据。
WebSocket 是一种长连接的通讯模式,浏览器与服务连接建立之后,后续数据都以帧序列的形式传输。在浏览器断开WebSocket连接或 Server 端断掉连接前,不需要浏览器和服务端重新发起连接请求,下图是一张简单的websocket通信图
浏览器与服务器交互负载流量很大的情况下,极大的节省了网络带宽资源的消耗且有明显的性能优势,且浏览器发送和接受消息是在同一个持久连接上发起,实时性优势明显。
1.2 Kafka Stream
Kafka Stream提供的是一个基于Kafka的流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,下图是kafka Stream的工作流程图:
生产者将数据发送到first topic中,随后kafka Stream从first topic中将数据最清洗整理工作,整理好之后kafka将数据发送second topic中,消费者可以从second topic中消费数据。
kafka stream中有KTable和KStream,我们做数据处理的时候需要用到它们,如下展示了它们的区别:
在t1时间段内,KStream记录是生产者生产的所有数据,可以认为所有记录都通过新增的方式插入进这个数据流里。KTable方式是对数据进行了整理,类似对数据做了无则新增有则增加的操作。
2、实现实时数据大屏
以上就实现实时数据大屏的主要流程图,从图中可以看到交易下单的数据发送到first topic中,然后由Kafka stream做数据处理,然后将处理好的数据发送到second topic中;服务器监听并消费second topic中的数据;此时服务器将数据一份推送到浏览器做展示,一份缓存到redis中并由定时任务(如xxl-job)将数据同步到Mysql中(因为在并发量很大的情况下直接将数据存放到Mysql中会存在性能问题)。
2.1 搭建WebSocket
#pom文件添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--springboot已经集成websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.56</version>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.6</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--kafka流式计算-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.5.1</version>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.1</version>
</dependency>
#配置websocket
@Configuration
public class WebSocketConfiguration {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
#websocket
@Component
@ServerEndpoint(value = "/channel/screen")
public class ScreenWebsocketService {
private static final Logger LOGGER = LoggerFactory.getLogger(ScreenWebsocketService.class);
public static final ConcurrentMap<String, Session> SESSIONS = new ConcurrentHashMap<>();
private Session session;
@OnOpen
public void onOpen(Session session, EndpointConfig endpointConfig)
{
this.session = session;
SESSIONS.put(this.session.getId(), this.session);
LOGGER.info("新的连接:{}", this.session.getId());
}
@OnMessage
public void onMessage(String message)
{
try {
} catch (Exception e) {
LOGGER.info("onMessage发生了异常");
}
}
@OnClose
public void onClose(CloseReason closeReason)
{
LOGGER.info("连接断开:id={},reason={}",this.session.getId(),closeReason);
SESSIONS.remove(this.session.getId());
}
@OnError
public void onError(Throwable throwable) throws IOException
{
LOGGER.info("连接异常:id={},throwable={}", this.session.getId(), throwable);
this.session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, throwable.getMessage()));
}
/**
* 推送消息
* @param message
*/
public static void push(String message) {
SESSIONS.values().stream().forEach(session -> {
if (session.isOpen()) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
}
}
});
}
2.2 搭建webscoket和浏览器的之间的连接的html
在src/main/resorce下新建public目录,写如下的index.html
#html页面
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>实时数据看板</title>
<style>
#logContainer {
height: 500px;
overflow-y: auto;
border: 1px solid #ccc;
padding: 10px;
}
.logEntry {
white-space: pre;
margin-bottom: 10px;
}
</style>
</head>
<body>
<div id="logContainer"></div>
<script>
const MAX_LOGS = 500; // 定义最大数量
// 创建WebSocket连接
const socket = new WebSocket('ws://localhost:8080/channel/screen');
// 数据数组
const logs = [];
// 监听WebSocket连接事件
socket.onopen = function() {
console.log('WebSocket连接已打开');
};
// 监听WebSocket消息事件
socket.onmessage = function(event) {
const logContainer = document.getElementById('logContainer');
// 将收到的消息添加到数组中
logs.push(event.data);
if (logs.length > MAX_LOGS) {
logs.splice(0, logs.length - MAX_LOGS);
}
// 更新容器的内容
logContainer.innerHTML = logs.map(log => '<div class="logEntry">' + log + '</div>').join('');
// 滚动到最底部
logContainer.scrollTop = logContainer.scrollHeight;
};
// 监听WebSocket关闭事件
socket.onclose = function() {
console.log('WebSocket连接已关闭');
};
</script>
</body>
</html>
2.3 添加配置文件
server:
port: 8080
spring:
application:
name: webscoket-service
kafka:
bootstrap-servers: 192.168.202.7:9092
#生产者的配置,大部分我们可以使用默认的,这里列出几个比较重要的属性
producer:
#0---表示不进行消息接收是否成功的确认
#1---表示当Leader接收成功时确认
#all -1---表示Leader和Follower都接收成功时确认
acks: all
#设置大于0的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。
retries: 10
#每批次发送消息的数量
batch-size: 16384
#producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。
buffer-memory: 33554432
#key序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
#延迟提交时间
linger.ms: 2000
#消费者的配置
consumer:
#消费者组
#是否开启自动提交
enable-auto-commit: false
#自动提交的时间间隔
auto-commit-interval: 100ms
#key的解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
session:
timeout:
ms: 15000
#一次拉取
max-poll-records: 500
group-id: testGroup
listener:
type: batch
#不存在的时候 启动报错
missing-topics-fatal: false
项目启动访问http://localhost:8080/index.html后,websocket就和服务器建立了一个数据连接通道,服务器的数据就可以推送到html页面上。
2.4 kafka stream搭建
#first topic接受数据
@Component
public class KafkaDataHandlerConsumer {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServer;
@Resource
private KafkaStreamScreenService kafkaStreamScreenService;
@KafkaListener(topics = {"first topic"})
public void handlerMessage(List<String> records) throws Exception {
System.out.println("接受消息-------------------");
//record: ["shangpin,3"]
System.out.println("record: " + JSON.toJSONString(records));
kafkaStreamScreenService.kafkaDataHandler(bootstrapServer);
}
}
@Component
public class KafkaStreamScreenService {
public void kafkaDataHandler(String bootstrapServer) throws InterruptedException {
//stream的构建器
StreamsBuilder streamsBuilder = new StreamsBuilder();
//配置信息
Properties properties = new Properties();
System.out.println("bootstrapServer: " + bootstrapServer);
//kafka的连接地址
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
//kafka的key序列化器
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
//kafka的value序列化器
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "screen-stream-longxia04");
//流式计算的方案
this.kafkaStream(streamsBuilder);
//创建kafkaStreams对象
new KafkaStreams(streamsBuilder.build(), properties).start();
System.out.println("stream start----------------------- ");
}
/**
* 对数据进行清洗整理
*
*/
private void kafkaStream(StreamsBuilder streamsBuilder) throws InterruptedException {
//设置消费的主题
KStream<String, String> stream = streamsBuilder.stream("first toipc");
//数据处理
stream.groupBy((key,value)-> {
return value;
})
//设置数据处理时间窗口大小
.windowedBy(TimeWindows.of(Duration.ofSeconds(3)))
//对数据总统计
.count()
.toStream()
.map((key,value)-> {
return new KeyValue<>(key.key().toString(),key.key() + "出现的次数------>" + value.toString());
})
.to("second topic"); //数据发送到second topic
}
#second topic消费数据
@Component
public class KafkaStreamDataConsumer {
@KafkaListener(topics = {"second topic"})
public void kafkaDataConsumerMessage(List<String> records) throws Exception {
System.out.println("kafkaDataConsumerMessage : " + JSON.toJSONString(records));
//发送给浏览器
ScreenWebsocketService.push(new String(JSON.toJSONBytes(records), StandardCharsets.UTF_8));
//数据缓存redis
//redisTemplate.opsForValue().set("key", value);
}
}
页面模拟生产数据
@RestController
@RequestMapping("/kafka")
public class KafkaProductController {
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("send/{message}")
public String sendMessage(@PathVariable("message")String message){
if(StringUtils.isEmpty(message)){
return "请输入消息";
}
kafkaTemplate.send("first tpoic", message);
return "发送消息成功";
}
}
页面发送消息的效果展示:
在kafka Stream中设置看了时间窗口(windowedBy(TimeWindows.of(Duration.ofSeconds( 3 )))),在这个时间窗口中会做数据整理,整理好的数据后通过websocket推送到页面上。实时数据统计的结果如下:
总结:
本文通过Kafka Stream、websocket、redis、定时器实现了实时数据看板的功能(前段页面实现比较粗糙,需要专业前段做处理),其中redis和定时器主要是将数据做持久化的才做(由于实现比较简单,本文中暂时没有做展示)。