让数据流动起来,RocketMQ Connect 技术架构解析 - 阿里技术

通过本文可以了解RocketMQ Connect的概念、原理、使用场景,以及如何动手编写一个Connector。

01 Why RocketMQ Connect

在业务系统,或者大数据系统中不同数据源之间的数据同步是十分常见的,传统的点对点的数据同步工具,在面临越来越多的数据源点对点的数据同步会产生N*N的问题,开发成本,维护成本都是非常高的,因为上下游是耦合的,一个数据源的逻辑调整,也可能会影响多个数据管道之间的数据同步。

引入消息中间件将上下游解耦这些问题是不是就迎刃而解了?通过消息中间件上下游的数据源的处理逻辑相对独立,但是如果我们直接用RocketMQ的生产者消费者来构建数据管道还需要考虑以下挑战。

1)异构数据源越来越多,如何实现任意数据源的数据同步?

2)高性能,如何高效的在源数据源到目的数据源的数据同步?

3)高可用,故障处理能力,当一个节点挂掉是否这个节点的任务就停止了,任务重新启动是否还可以断点续传。

4)弹性扩缩容,是否可以根据系统数据的变动动态的增加减少节点?

5)集群监控,运维管理,随着数据管道的增多,如何管理,运维监控这些数据管道也变的越来越复杂。

我们通过RocketMQ的生产者消费者开发一个完整的数据管道是比较简单的,可能几天就可以完成,但是如果想一起解决上述这些问题可能需要几个月才能完成,而且一旦我们完成了这样一个系统,就会发现,它与RocketMQ Connect是一个非常类似的系统,而上述这些问题是RocketMQ Connect都已经完美解决的问题。

下面将系统化的介绍一下RocketMQ Connect是如何解决这些问题的。

02 RocketMQ Connect介绍

2.1 什么是RocketMQ Connect

RocketMQ Connect是RocketMQ数据集成重要组件,可将各种系统中的数据通过高效,可靠,流的方式,流入流出到RocketMQ,它是独立于RocketMQ的一个单独的分布式,可扩展,可容错系统,它具备低延时,高靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。

RocketMQ Connect具备以下优势:

1)上下游解耦,因为上下游都是通过RocketMQ做数据中转的,所以上下游系统是异步解耦的,上下游系统变化不会互相影响,只需要关注与RocketMQ的数据同步。

2)低延迟,流式数据传输,秒级甚至毫秒级延迟。

3)可靠性高,集群部署,支持failover,数据可回放。

4)扩展性强,通过Connect API可以实现与任意系统之间建立连接。

5)实现简单,专注数据拷贝,无需关注分布式,故障转移、弹性伸缩等能力。

6)可伸缩,支持动态扩缩容。

7)配置化,低代码,已经支持的数据源只需简单配置就可完成不同数据源之间的,不支持的数据源,只需要实现Connect API专注数据拷贝即可快速完成新的数据源的支持。

2.2 Connector工作原理

RocketMQ Connect是一个独立的的分布式,可伸缩,容错的系统,它主要为RocketMQ提供与各种外部系统的数据的流入流出能力。用户不需要编程,只需要简单的配置即可使用RocketMQ Connect,例如从MySQL同步数据到RocketMQ,只需要配置同步所需的MySQL的账号密码,链接地址,和需要同步的数据库,表名就可以了。

2.3 Connector的使用场景

✪ 2.3.1 构建流式数据管道

在业务系统中,利用MySQL完善的事务支持,处理数据的增删改,使用ElasticSearch,Solr等实现强大的搜索能力,或者将产生的业务数据同步到数据分析系统,数据湖中(例如hudi),对数据进一步处理从而让数据产生更高的价值。

使用RocketMQ Connect很容易实现这样的数据管道的能力,只需要配置3个任务,第一个从MySQL获取数据的任务,第二,三个是从RocketMQ消费数据到ElasticSearch,Hudi的任务,配置3个任务就实现了从MySQL到ElasticSearch,MySQL到hudi的两条数据管道,既可以满足业务中事务的需求,搜索的需求,又可以构建数据湖。

✪ 2.3.2 系统迁移

随着业务的发展,往往会对旧的系统进行系统优化,甚至重构,在新的系统中可能选用新的组件,新的技术框架,旧的系统又无法直接停机将数据迁移到新系统。

如何动态的将数据从旧存储系统迁移到新的存储系统,例如旧的系统使用的是ActiveMQ,新的系统选用了RocketMQ,只需要配置一个从ActiveMQ到RocketMQ的Connector任务即可实时的将旧系统的数据迁移到新系统,实现旧系统的ActiveMQ上的数据迁移到新系统RocketMQ中,同时也不影响业务,用户无感知。

✪ 2.3.3 CDC

图片来源:https://luminousmen.com/post/change-data-capture**

CDC作为ETL模式之一,可以通过近乎实时的增量捕获数据库的 INSERT、UPDATE,DELETE变化,RocketMQ Connect流式数据传输,具备高可用,低延时等特性,通过Connector很容易实现CDC。

03 数据的序列化与转换

前面对RoketMQ Connect已经有了基本的介绍,知道RocketMQ Connect可以实现很多数据源之间的数据同步,那么这些数据在RocketMQ Connect中是如何流转和处理的呢?Converter和Transform在数据的序列化和转换有十分重要的作用,下面介绍下Converter和Transform。

Converter和Transform和Connector一样都属于RocketMQ Connect的插件,可以使用RocketMQ Connect内置的实现,也可以自定义实现个性化的处理逻辑,通过RocketMQ Connect Worker插件加载能力可以将自定义实现加载到Worker运行时中,通过简单的配置既可以使用自定义的插件。

Converter:转换器,主要负载数据的序列化和反序列化。Converter可以将数据和数据的Schema进行转换并且完成序列化,还可以与Schema Registry配合,将Schema注册到Schema Register中。

Transform:单消息转换,对Connect数据对象进行转换,例如过滤掉不需要的数据, 字段过滤,替换数据Key,大小写转换等,都可以通过Transform进行转换。

Connector的数据处理的基本流程,Source Connect从源数据系统获取数据封装成Connect标准的数据对象,然后经过Transform对单条数据进行转换,转换后的数据会经过Converter,Converter会将数据和Schema处理成支持的格式并进行序列化,如果是支持Schema Registry,Converter会将数据的Schema注册到Schema Registry中,然后序列化数据,最后将序列化后的数据发送到RocketMQ中。

Sink Connect处理流程是从RocketMQ Topic拉取数据,经过数据的反序列化,然后对每条消息进行转换,如果配置了Transform,最后Connector将数据写入到目标存储。

04 RESTful接口

Connector Worker节点提供了RESTful接口方便Connector的创建,停止,配置信息获取接口,使用这些接口可以很方便的管理Connector,只需要调用对应的接口带上对应的配置即可,下面介绍一些常用的接口。

•POST /connectors/{connector name}

•GET /connectors/{connector name}/config

•GET /connectors/{connector name}/status

•POST /connectors/{connector name}/stop

通过这些API即可向Connect Worker创建Connector,停止Connector,查询Connector的配置和状态。

05 Metrics

可观测性在许多系统中都扮演着非常重要的作用,它可以帮助观察系统运行情况,系统是否正常运行,系统的高峰期,空闲期是在什么时候,帮助对系统进行监控问题定位等等。Worker提供了丰富的Metrics信息,可以通过Metrics观察到所有Worker总的tps,消息数量,还可以观察到每个task的tps,成功处理的数量和处理失败的数量等等,通过这些Metrics信息可以很方便的运维Connector。

将Metrics信息展示在Prometheus中是很多系统的选择,Prometheus也提供了接入方式,通过实现一个Prometheus Exporter即可,新增或者修改Metrics修改这个Exporter即可,这是一种将Metrics接入Prometheus的方式。

通过RocketMQ Connect可以提供一种新的接入方式,实现一个通用的Sink Prometheus Connector,Source Connector是获取Metrics所需的Connector,例如RocketMQ和RocketMQ Connect的Metrics信息都写在了文件里,那么获取Metrics的Connector就是SFTP Source Connector,这样就可以将Metrics信息通过Connector的方式接入Prometheus。

06 Connector数据质量

RocketmQ Connect和RocketMQ一样,实现了at least once的语义,即保障至少一次。Source Connector发送消息到RocketMQ失败,producer会重试,如果重试失败,可以选择跳过,还是停止任务,可以通过配置进行选择。Sink Connector拉取消息同样会重试,并且在重试达到一定次数时会将消息发送到死信队列,进而保障消息不丢失。

07 Connector 部署

在创建Connector时,一般是通过配置完成的,Connector一般包含逻辑的Connector连接器和执行数据复制的Task即物理线程,如下图所示,两个Connector连接器和它们对应的运行Task任务。

一个Connector也可以同时运行多个任务,提高Connector的并行度,例如下图所示的Hudi Sink Connector有2个任务,每个任务处理不同的分片数据,从而增加Connector的并行度,进而提高处理性能。

RocketMQ Connect Worker支持两种运行模式,集群和单机集群模式,顾名思义,有多个Worker节点组成,推荐最少有2个Worker节点,组成高可用集群。集群间的配置信息,offset信息,status信息通过指定RocketMQ Topic存储,新增Worker节点也会获取到集群中的这些配置,offset,status信息,并且触发负载均衡,重新分配集群中的任务,使集群达到均衡的状态,减少Woker节点或者Worker宕机也会触发负载均衡,从而保障集群中所有的任务都可以均衡的在集群中存活的节点中正常运行。

单机模式,Connector任务运行在单机上,Worker本身没有高可用,任务offset信息持久化在本地。适合一些对高可没有什么要求或者不需要Worker保障高可用的场景,例如部署在k8s集群中,由k8s集群保障高可用。

08 Connector原理

8.1 概念

Connector

连接器,定义数据从哪复制到哪,是从源数据系统读取数据写入RocketMQ,这种是SourceConnector,或从RocketMQ读数据写入到目标系统,这种是SinkConnector。Connector决定需要创建任务的数量,从Worker接收配置传递给任务。

Task

是Connector任务分片的最小分配单位,是实际将源数据源数据复制数据到RocketMQ(SourceTask),或者将数据从RocketMQ读取数据写入到目标系统(SinkTask)真正的执行者,Task是无状态的可以动态的启停任务,多个Task是可以并行执行的,Connector复制数据的并行度主要体现在Task数量上。

通过Connect的API也可以看到Connector和Task各自的职责,Connector实现时就已经确定数据复制的流向,Connector接收数据源相关的配置,taskClass获取需要创建的任务类型,通过taskConfigs指定最大任务数量,并且为task分配好配置。task拿到配置以后从数据源取数据写入到目标存储。通过下面的两张图可以清楚的看到,Connector和Task处理基本流程。

Worker

worker 进程是Connector和Task运行环境,它提供RESTful能力,接受HTTP请求,将获取到的配置传递给Connector和Task。除此之外它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,负载均衡能力,Connect集群高可用,扩缩容,故障处理主要依赖Worker的负责均衡能力实现的。

从上面这张图,看到Worker通过提供的REST API接收http请求,将接收到的配置信息传递给配置管理服务,配置管理服务将配置保存到本地并同步给其它worker节点,同时触发负载均衡。

从下面这张图可以看到connect集群中多个worker节点负载均衡后在各自节点启动connector和task的情况。

8.2 Worker集群的服务发现

学习和使用RocketMQ的时候,我们知道NameSrv是RocketMQ服务注册与发现的重要组件,具备简单轻量易运维等特性。通过前面Connector部署方式的介绍,RocketMQ Connect Worker是可以集群方式部署的,那么Worker节点之间的服务发现是如何实现的呢?Worker的服务发现是否依赖外部组件?

在使用一项技术或者中间件的时候,尽量少的依赖额外的组件,这对系统的稳定性,可维护性都非常重要,学习成本也会低一些,这也很容易理解,如果多依赖一个组件,就需要多维护一个,学习成本和后期维护成本就会越高。RocketMQ Connect也是这个原则,尽量少或者不依赖额外的外部组件。

通过前面对RocketMQ Connect的介绍我们知道RocketMQ Connect是RocketMQ数据集成,数据流入流出的重要组件,所以RocketMQ Connect的运行是依赖一个RocketMQ集群的,能否利用RocketMQ的特性实现Worker的服务发现呢?答案是肯定的。

通过RocketMQ客户端有消费客户端变化通知机制,只需要在每个worker启动一个消费者,设置相同的ConsumerGroup,注册监听 NOTIFY_CONSUMER_IDS_CHANGED 事件即可,增加或减少Worker节点就会触发NOTIFY_CONSUMER_IDS_CHANGED事件,监听到这个事件就可以触发Worker集群的负责均衡,通过RocketMQ客户端接口既可以获取所有Worker对应的客户端标识,最后根据在线的客户端和任务进行负载均衡。

8.3 配置同步

Topic的订阅消费分为两种,集群消费,广播消费,不过不同的客户端使用不同的ConsumerGroup也可以达到广播消费的目的,RocketMQ Connect Worker之间元数据的同步就是根据这种原理实现的。

所有的worker节点订阅相同的topic(connector-config-topic),每个worker节点使用不同的consumerGroup,这样就可以实现广播的方式消费同一个Topic,每个worker节点只需要配置发生变化时向connector-config-topic发送配置信息,即可实现各个worker节点间的配置同步。

8.4 位点同步

位点同步与配置同步类似只是使用的是不同的topic和consumerGroup

服务发现与配置/位点管理

Connector配置管理和Task任务的位点管理是类似的,除了需要将配置和位点信息持久化到本地,还会将配置和位点信息同步给集群的其它worker节点,这个同步方式也是通过所有worker节点订阅相同的topic,使用不同的consumerGroup实现的。

8.5 负载均衡

Connect的负载均衡和RocketMQ消费端负载均衡是类似的。都是在每个节点上运行相同的负载均衡算法,只不过负载均衡的对象不一样。

RocketMQ消费端负载均衡是相同ConsumerGroup消费端与MessageQueue之间负载均衡,Connect是worker节点与connector,worker节点与task之间的负载均衡,不过逻辑是类似的,负载均衡算法获取到所有worker和所有的connector,task,并对这些信息排序,根据当前worker节点在排序中所有worker的位置,排序后的connector,task位置与当前的worker位置取模,就可以为当前Worker节点分配好需要运行那些Connector和那些task,然后Worker在本节点启动这些Connector和Task。

Worker扩缩容,worker宕机,新增Connector配置都会触发重新分配,因此Worker集群是弹性的可以故障处理,动态扩缩容的。

8.6 Connector插件加载

Worker加载Connector插件jar包类似于Tomcat加载War包。Tomcat加载War包并不会像jvm加载类一样使用双亲委派模型对类进行加载,而是使用自定义的ClassLoader加载类,使用不同的ClassLoader加载不同的war包的不同的类。

这样可以避免不同的war包中引用相同的jar包因为版本的不同各种包引用问题,Worker也是一样,在加载不同的类插件时为其创建单独的ClassLoader,从而避免相同的类,因为不同war包引用相同jar,并且版本不同而导致的各种包引用问题,在类加载方面Worker像一个分布式的Tomcat。

09 动手实现Connector

了解了RocketMQ Connect的实现原理,下面看一下如何自己实现一个Connector。看下面这个场景:

业务数据写入到MySQL,然后同步到hudi,通过hudi构建数据湖,这个跟我们开头讲到的场景比较相似。通过Connector如何实现这个流程?

通过前面的介绍,应该清楚,需要实现两个Connector,一个是从MySQL到RocketMQ的SourceConnector,第二个是从RocketMQ读数据写入到hudi的Sink Connector。

以MySqlSourceConnector为例介绍如何自己写一个Connector。

首先要实现一个SourceConnector,实现类MySqlConnectorImpl继承SourceConnector接口,实现配置初始化,taskClass,taskConfigs等接口。

taskClass方法指定创建MySqlTask,taskConfigs将接收到的Connector配置并分好每个task配置,这些配置包含连接MySql实例相关的账号,密码,address,port等。Worker 启动MySqlTask并将配置传给MySqlTask,MySqlTask创建到MySql实例的链接并获取MySql Binlog,解析Binlog,并将解析的数据封装成ConnectRecord放到BlockingQueue,poll接口从BlockingQueue中取数据返回。这样就实现了一个Connector。

将写好的Connector打包放到Worker指定目录加载到Worker进程,通过请求Worker http接口创建MySqlSourceConnector。

Worker启动MysqlConnector,通过MysqlConnector taskClass获取MySqlTask并启动Task,WorkerSourceTask从poll接口获取数据,然后把获取到的数据通过Producer发送到RocketMQ,这样就完成了MySql到RocketMQ的数据同步的Connector。

10 Connector现状与未来

CDC方面已与debezium完成适配,jdbc标准协议也已支持并且在此基础上与openmldb社区合作开发了与面向机器学习数据库openmldb的Connector,数据湖方面与hudi也建立了链接,不久的将来还会与Doris,clickhouse,es等流行的存储建立连接,如果有熟悉的的存储,每个人都可以通过OpenMessaging Connect API开发一个与RocketMQ连接器。

提到OpenMessaging Connect API,简单介绍一下OpenMessaging,OpenMessaging是Linux 基金会下一个开源组织,致力于制定消息领域的标准,除了OpenMessaging Connect Api还有存储方面的标准OMOI,压测方面的标准OMBI,流计算方面的标准OMS等等。

11 Connector Tutorial

QuickStart

https://github.com/apache/rocketmq-connect

RocketMQ Connect & OpenMLDB

OpenMLDB 是一个开源机器学习数据库,提供线上线下一致的生产级特征平台。通过与OpenMLDB社区合作,RocketMQ与机器学习数据库OpenMLDB建立连接。

RocketMQ Connect & Debezium

https://mp.weixin.qq.com/s/YNjylhmo1IlvAEKwpjjMkg‍

12 总结

本文介绍了RocketMQ Connect的概念,然后讲解了RocketMQ Connect的实现原理,对服务发现,配置同步,位点同步,负载均衡都有了初步的介绍,接着以MySqlSourceConnector为例讲解了如何自己实现一个Connector,最后对Connect API和生态做了一些介绍,提供了一些RocketMQ Connect相关的上手教程,希望本文对学习RocketMQ Connect有帮助,更希望感兴趣的同学能参与进来一起贡献社区,实现一个自己熟悉的存储系统到RocketMQ的Connector。

rocketmq-connect

https://github.com/apache/rocketmq-connect

api

https://github.com/openmessaging/openconnect

欢迎留言一起参与讨论~