1.开源共建 | 中国移动冯江涛:ChunJun(原FlinkX)在数据入湖中的源码应用
2.详解flink中Look up维表的使用
3.Flink深入浅出:JDBC Connector源码分析
4.Flink+hudi 构架沧湖一体化解决方案
开源共建 | 中国移动冯江涛:ChunJun(原FlinkX)在数据入湖中的应用
ChunJun,原名FlinkX,源码是源码一个由中国移动云能力中心开发的开源数据集成工具,基于Apache Flink提供高效的源码数据处理。年4月,源码数栈技术团队在GitHub上开源FlinkX,源码九方智投如何查看源码得到了开发者们的源码广泛合作与支持,推动了工具的源码快速发展。两年后的源码年4月,技术团队对FlinkX进行了升级,源码更名为ChunJun,源码致力于与全球开发者共同推进数据集成技术的源码发展。
文章重点讲述的源码是FlinkX在数据入湖场景中的应用,特别是源码针对企业面临的多源异构数据同步挑战。传统方法如脚本同步存在效率低、源码运维困难等问题。FlinkX基于其多源异构数据同步框架,支持自建和云服务的数据双向读写,如移动云数据库、对象存储等,简化了数据快速上云和云间数据交换的配置,降低了开发和运维成本,已在移动云的多个产品中实际应用。
主要内容包括:FlinkX的背景和选型理由,其基于Flink的特性(如断点续传、监控与速率限制)、云上入湖改造(如K8s适配、Hudi插件开发和日志收集),以及未来的array源码图解发展方向,如与FlinkStreamSQL的融合和更广泛的插件支持。此外,还解答了关于FlinkX资源分配和应用场景的问题,以及与Flink CDC的比较。
想要了解更多关于袋鼠云大数据产品和解决方案的信息,可访问官网,或加入开源技术交流群进行深入探讨。
详解flink中Look up维表的使用
背景
在流式计算领域,维表是一种常用概念,主要用于SQL的JOIN操作,以实现对流数据的补充。比如,我们的数据源stream是订单日志,日志中仅记录了订单商品的ID,缺乏其他信息。但在数据分析时,我们需要商品名称、价格等详细信息,这时可以通过查询维表对数据进行补充。
维表通常存储在外部存储中,如MySQL、HBase、Redis等。本文以MySQL为例,介绍Flink中维表的使用。
LookupableTableSource
Flink提供LookupableTableSource接口,用于实现维表功能。通过特定的adc采样源码key列查询外部存储,获取相关信息,以补充stream数据。
LookupableTableSource有三个方法
在Flink中,实现LookupableTableSource接口的主要有四个类:JdbcTableSource、HBaseTableSource、CsvTableSource和HiveTableSource。本文以JDBC为例,讲解如何进行维表查询。
实例讲解
以下是一个示例,首先定义stream source,使用Flink 1.提供的datagen生成数据。
我们模拟生成用户数据,范围在1-之间。
datagen具体的使用方法请参考:
聊聊Flink 1.中的随机数据生成器-DataGen connector
然后创建一个MySQL维表信息:
该MySQL表中样例数据如下:
最后执行SQL查询,流表关联维表:
结果示例如下:
对于维表中存在的数据,已关联出来,对于维表中不存在的数据,显示为null。
完整代码请参考:github.com/zhangjun0x...
源码解析JdbcTableSource
以JDBC为例,看看Flink底层是如何实现的。
JdbcTableSource#isAsyncEnabled方法返回false,即不支持异步查询,因此进入JdbcTableSource#getLookupFunction方法。
最终构造一个JdbcLookupFunction对象。
JdbcLookupFunction
接下来看看JdbcLookupFunction类,它是TableFunction的子类,具体使用可参考以下文章:
Flink实战教程-自定义函数之TableFunction
TableFunction的核心是eval方法,在该方法中,rip源码实现主要工作是使用多个keys拼接成SQL查询数据,首先查询缓存,缓存有数据则直接返回,缓存无数据则查询数据库,并将查询结果返回并放入缓存。下次查询时,直接查询缓存。
为什么要加缓存?默认情况下不开启缓存,每次查询都会向维表发送请求,如果数据量较大,会给存储维表的系统造成压力。因此,Flink提供了LRU缓存,查询维表时,先查询缓存,缓存无数据则查询外部系统。如果某个数据查询频率较高,一直被命中,则无法获取新数据。因此,缓存需要设置超时时间,超过这个时间则强制删除该数据,查询外部系统获取新数据。
如何开启缓存?请参考JdbcLookupFunction#open方法:
即cacheMaxSize和cacheExpireMs需要同时设置,构造缓存对象cache来缓存数据。这两个参数对应的DDL属性为lookup.cache.max-rows和lookup.cache.ttl。
对于具体的缓存大小和超时时间的设置,用户需要根据自身情况自行定义,cf快刀源码在数据准确性和系统吞吐量之间进行权衡。
Flink深入浅出:JDBC Connector源码分析
大数据开发中,数据分析与报表制作是日常工作中最常遇到的任务。通常,我们通过读取Hive数据来进行计算,并将结果保存到数据库中,然后通过前端读取数据库来进行报表展示。然而,使用FlinkSQL可以简化这一过程,通过一个SQL语句即可完成整个ETL流程。
在Flink中,读取Hive数据并将数据写入数据库是常见的需求。本文将重点讲解数据如何写入数据库的过程,包括刷写数据库的机制和原理。
以下是本文将讲解的几个部分,以解答在使用过程中可能产生的疑问:
1. 表的定义
2. 定义的表如何找到具体的实现类(如何自定义第三方sink)
3. 写入数据的机制原理
(本篇基于1..0源码整理而成)
1. 表的定义
Flink官网提供了SQL中定义表的示例,以下以oracle为例:
定义好这样的表后,就可以使用insert into student执行插入操作了。接下来,我们将探讨其中的技术细节。
2. 如何找到实现类
实际上,这一过程涉及到之前分享过的SPI(服务提供者接口),即DriverManager去寻找Driver的过程。在Flink SQL执行时,会通过translate方法将SQL语句转换为对应的Operation,例如insert into xxx中的xxx会转换为CatalogSinkModifyOperation。这个操作会获取表的信息,从而得到Table对象。如果这个Table对象是CatalogTable,则会进入TableFactoryService.find()方法找到对应的实现类。
寻找实现类的过程就是SPI的过程。即通过查找路径下所有TableFactory.class的实现类,加载到内存中。这个SPI的定义位于resources下面的META-INFO下,定义接口以及实现类。
加载到内存后,首先判断是否是TableFactory的实现类,然后检查必要的参数是否满足(如果不满足会抛出异常,很多人在第一次使用Flink SQL注册表时,都会遇到NoMatchingTableFactoryException异常,其实都是因为配置的属性不全或者Jar报不满足找不到对应的TableFactory实现类造成的)。
找到对应的实现类后,调用对应的createTableSink方法就能创建具体的实现类了。
3. 工厂模式+创建者模式,创建TableSink
JDBCTableSourceSinkFactory是JDBC表的具体实现工厂,它实现了stream的sinkfactory。在1..0版本中,它不能在batch模式下使用,但在1.版本中据说会支持。这个类使用了经典的工厂模式,其中createStreamTableSink负责创建真正的Table,基于创建者模式构建JDBCUpsertTableSink。
创建出TableSink之后,就可以使用Flink API,基于DataStream创建一个Sink,并配置对应的并行度。
4. 消费数据写入数据库
在消费数据的过程中,底层基于PreparedStatement进行批量提交。需要注意的是提交的时机和机制。
控制刷写触发的最大数量 'connector.write.flush.max-rows' = ''
控制定时刷写的时间 'connector.write.flush.interval' = '2s'
这两个条件先到先触发,这两个参数都是可以通过with()属性配置的。
JDBCUpsertFunction很简单,主要的工作是包装对应的Format,执行它的open和invoke方法。其中open负责开启连接,invoke方法负责消费每条数据提交。
接下来,我们来看看关键的format.open()方法:
接下来就是消费数据,执行提交了
AppendWriter很简单,只是对PreparedStatement的封装而已
5. 总结
通过研究代码,我们应该了解了以下关键问题:
1. JDBC Sink执行的机制,比如依赖哪些包?(flink-jdbc.jar,这个包提供了JDBCTableSinkFactory的实现)
2. 如何找到对应的实现?基于SPI服务发现,扫描接口实现类,通过属性过滤,最终确定对应的实现类。
3. 底层如何提交记录?目前只支持append模式,底层基于PreparedStatement的addbatch+executeBatch批量提交
4. 数据写入数据库的时机和机制?一方面定时任务定时刷新,另一方面数量超过限制也会触发刷新。
更多Flink内容参考:
Flink+hudi 构架沧湖一体化解决方案
Apache Hudi提供了一种流处理数据集上的时间轴管理方式,支持数据集的即时视图,数据集通过与Hive表类似的目录结构进行组织,分区数据由路径标识,文件则包含分区的数据,每份文件可能对应多个提交,以支持更新操作。Hudi的存储层由元数据、存储类型、视图三部分组成,其中元数据维护数据集上操作的时间轴,支持即时视图的存储。Hudi解决了大规模和近实时应用的限制,提供新架构与湖仓一体的解决方案,实现数据同源、同计算引擎、同存储、同计算口径,数据时效性可达分钟级,满足业务准实时数仓需求。
Hudi对于大规模和近实时应用的重要性在于其解决了数据处理中的多个限制。它通过提供时间轴管理、即时视图和多视图类型支持,使得数据处理更加高效、灵活。同时,Hudi通过湖仓一体的架构,实现了数据同源、同计算引擎、同存储、同计算口径,大大提升了数据处理的效率和灵活性。数据的时效性可以达到分钟级,能够很好地满足业务对准实时数仓的需求。
通过MySQL数据经过Flink CDC进入Kafka,实现多个实时任务复用,避免对MySQL性能的影响。数据在ODS层后,会同时沿着实时数据仓库的链路,从ODS层依次进入DWD、DWS和OLAP数据库,最后供报表等数据服务使用。实时数仓的每一层结果数据会准实时地落至离线数仓,实现程序一次开发、指标口径统一、数据统一。在架构中,存在数据修正的步骤,以处理口径调整或错误的实时任务计算结果,确保历史数据的准确性。整体架构采用Lambda和Kappa混合模式,流批一体数据仓库包含数据质量校验流程,确保数据质量。
在版本选择方面,推荐使用Hudi master与Flink 1.结合,以更好地适应CDC连接器。Hudi的下载可通过mvnrepository.com/artif...获取,最新版本为0.9.0,若需0..0版本,可以加入社区群获取,或自行编译源码。执行Hudi与Flink的集成,首先确保将hudi-flink-bundle_2.-0..0.jar放置于flink/lib下,并执行bin/sql-client.sh embedded命令。
Flink在Hudi上的应用包括新建maven工程并修改pom文件,使用代码构建实验或直接使用官网下载的Flink包构建环境。添加依赖至$FLINK_HOME/lib下,注意在寻找jar包时,CDC 2.0更新了group ID,从com.alibaba.ververica更改为com.ververica。使用Flink SQL CDC在Hudi上构建实验环境,创建MySQL CDC表与Hudi表,修改配置,设置查询模式输出为表和检查点间隔,进行输入导入和数据查询。
在Flink CDC 2.0与Hudi集成过程中,可能会遇到卡在hoodie_stream_write的问题。解决该问题的关键在于检查点配置,设置合适的检查点间隔,以确保数据正常处理和下发。至此,Flink + Hudi 仓湖一体化方案原型构建完成,实现数据同源、高效处理与存储,满足实时与离线数据处理需求。