文章最后更新时间2024年11月26日,若文章内容或图片失效,请留言反馈!
导读大家好,我是来自哔哩哔哩的张陈毅,今天给大家分享的 topic 是B 站基于 Iceberg 的流批一体的探索和实践。
1. 海量用户行为数据传输
2. 商业和 AI 的在线训练
3.DB 数据同步
4. Iceberg 维表 Join
5. Q&A
分享嘉宾|张陈毅 哔哩哔哩 资深开发工程师
编辑整理|杨维旭
内容校对|李瑶
出品社区|DataFun
海量用户行为数据传输
1. 实时数据传输概览
如上图所示,B 站的数据众多,大致可以划分为三类:日志数据、用户行为数据和业务库数据。每个业务方上报的数据以 log id 进行标识,log id 是数据在传输和集成过程中的源信息标识,其中日志数据和用户行为数据会通过 log-agent 和 bfe-agent 上报给到数据网关,然后数据网关会根据 log id 分别路由到内部的 Kafka 数据缓冲层,业务库数据则会通过 Flink CDC connector 将全量数据和增量的 binlog 数据分发到数据缓冲层 Kafka。下游的数据分发会根据用户的实际场景,可以写入到 Kafka、Hive 和 Iceberg。2. 海量用户行为数据传输(背景)
上图是迭代前的用户行为数据传输架构。APP 移动端行为数据和 Web 服务器行为数据会统一传输给 Kafka 缓冲层。接着分为实时和离线两条链路,实时链路中,数据通过 Lancer 数据集成分发到 Kafka ODS 层,然后使用 Flink Bsql 流式计算写入到 Kafka DWD 层,供下游应用层做实时计算。离线链路中,通过 Lancer 数据集成之后,分发到 Hive ODS 层,一般是小时或天级别分区表,然后内部的调度系统会拉起 Spark 离线任务,产出 Hive 的 DWD 层数据,供下游应用层做离线计算。对比实时链路和离线链路,它们的特征差异也比较明显。主要有以下几点:- 实时链路的数据新鲜度高,在毫秒级;而离线链路的数据新鲜度低,一般由 Hive 的分区按天或小时而决定。
- 存储成本上,实时链路的 Kafka 一般会选择高成本存储介质 SSD 或 NVMe;而离线链路的 Hive 存储在 HDFS 上,一般会选择低成本存储介质 HDD。
- 实时链路的 Kafka 一般仅会保留 1~3 天内的数据,而离线链路的 Hive 可以按需保留一年或更久的时间。
- 在 Batch 的性能上,实时链路本身的特性就是,读数据需要将整个 row data 反序列化后,获取指定列的结果,在性能上比使用列式存储的 Hive 会弱一些。
- Kafka 需要指定 timestamp 或者是 offset 的方式去做 Batch 的读,在易用性方面也比较弱。
3. 海量用户行为数据传输(迭代)
我们结合数据湖 Iceberg,对整个用户行为数据链路做了一定的迭代,在数据进入 Kafka 缓冲层之前,会保持不变。在下游的数据分发中,将离线链路的 ODS 层至 DWD 层的存储从 Hive 替换成了 Iceberg。其中 ODS 到 DWD 的计算,由原先的 Spark 离线变成了 Flink Streaming,这主要得益于 Iceberg 可以根据 snapshot 做增量的流读消费,可以让 DWD 数据的新鲜度基本保持在分钟级,相比原先的 Hive 的小时级缩短了不少。如果在数据传输过程中发现数据异常,可以借助平台工具,直接将 Flink 流计算的 SQL 根据业务时间诉求自动格式化为相应的 SQL,转化成 Flink Batch SQL,直接跑在 Batch 任务中,无需做多引擎的 SQL 开发,并且可以直接复用流计算的 UDF。除此之外,相比于 Kafka 链路,如果用户需要对 Kafka DWD 数据做 Batch 相关的分析,用户原先一般会将 Kafka DWD 数据 dump 到 Hive 中,然后再做分析,而使用 Iceberg 后,可以直接使用 Flink Batch 去做 query。在此次迭代中,我们也同步优化了用户行为数据场景中数据重复的问题,在之前的离线链路中,同一个用户行为数据可能会在多个任务中同时存在。改为 Iceberg 后,我们在下游数据应用层做了 Iceberg 的视图,上游的数据会按照 BU 来划分,整体减少了数据重复存储的浪费。最后一点是对数据异步的优化,Iceberg 会借助内部服务做异步优化数据,可以整体加速 Batch query 的速率。4. 海量用户行为数据传输(数据优化)
Magnus 是 B 站自研的一个 Iceberg 表智能数据优化服务,它的一大功能是做数据优化。上图中展示了优化后的数据流程图。可以看到,Iceberg 的数据由 Spark、Flink 和 SDK 任务写入,当任务提交一个 snapshot 时,会发送一个 Commit Event 给到 Magnus 服务,然后 Magnus 服务会根据不同的事件属性以及不同的表优化策略生成对应的 Optimize Job,提交到内部的队列当中等待,Job 提交器会根据集群资源的空闲情况,依次提交队列中的 Optimize Job。其中 Flink 流计算写入的数据生成的 Optimize Job 的优先级相对较高,会优先执行。流计算任务目前设置 checkpoint 默认为 5 分钟,频繁的写入,会导致生成大量的小文件,这在一定程度上会影响下游查询性能。小文件合并主要是为了减少小文件数量,通过将多个小文件合并成较大的文件,从而提升查询的效率。大文件在读取数据时可以减少文件系统的开销,改善磁盘的 IO 性能,同时减少元数据的管理负担。
Iceberg 支持对数据进行排序,用于提高过滤和聚合操作的效率。通过对数据进行合理的排序,可以优化数据读取的顺序,减少数据随机访问的次数,从而提升整体的查询效率。尤其是在处理范围查询的时候,排序可以大幅减少所需扫描的数据量。
在 Iceberg 中可以选择合适的分区和数据分布,以确保数据在各个节点中的数据均匀分布,从而实现更快的并行处理。优化后可以减少数据倾斜现象,提高查询的并行效率。
通过创建索引,使下游查询效率得到提高,避免了全表扫描。
通过创建预计算文件,提前计算和存储预计算的结果,使后续查询速度得到了提升。
通过以上五个优化策略,在 Batch 场景中可以有效提升 Iceberg 的查询性能,减少文件 IO,优化数据存储结构,提升整体的数据访问效率。5. 海量用户行为数据传输(收益)
在成本方面,一年的计算和存储成本节省约 355 万,其中 CPU 使用率降低了 20%,内存消耗降低了 22%。性能方面,通过新技术迭代,经过多个实际迁移任务的前后分析,发现平均性能提升了 48.9%。商业和 AI 的在线训练
1. 商业和 AI 的在线训练(背景)
在商业和 AI 的在线训练场景中,展现流和点击流会通过 Flink 双流 join 进行行为归因,生成展点流的 Base 数据,写入 Kafka,然后展点流数据会通过另一个 Flink 作业进行特征计算,会有本地的 Jni 的调用,其依赖于离线计算好的用户画像、视频基础信息、UP 主挖掘信息等数据。特征计算的结果会写到 Kafka,得到一个实时流训练样本。实时流训练样本会通过两种训练方式进入到训练平台:第一种是通过实时训练读取 Kafka 实时流数据输入;第二种是将 Kafka 的数据通过 Flink SQL Dump 到 Hive 小时表或天表中供离线训练。除了发布到线上的这条链路外,还有多个实验链路会从 Kafka 读取 base 展点流数据并写入到训练平台的存储中供下一步训练使用。- 首先,训练数据存储不统一,实时训练特征计算结果会存在 Kafka,而离线训练结果存储在 Hive 中。
- 其次,使用实时训练和离线训练两套 API 来开发,存在计算层不统一的现象。
- 第三,为了做离线训练,需要将实时流训练样本 Kafka 数据,通过 Flink SQL Dump 到 Hive 表中,而 Dump 任务的 CPU 消耗一般是原特征计算任务的 1/3,原因是整个链路中数据流量本身比较大,为了节省数据传输的网络带宽消耗,写入 Kafka 的数据在多层中都使用了 PB 格式,在流量大的情况下,下游 Dump 任务反序列化 PB 数据以及 Dump 写到 Hive 表中均需要比较多的计算资源。
- 除了发布到线上的训练流程外,算法人员也会启动多个实验链路来优化整体的训练模型,这样的结果会发现展点流 base Kafka 的网络带宽以及磁盘 IO 均是写 IO 的 20 多倍,即使扩展了 Kafka 的分区数,对此 topic 所在 Kafka 集群网络读 IO 也是比较大的。
2. 商业和 AI 的在线训练(迭代)
基于上述背景,我们首先将后半段特征计算的输出结果由原先的 Kafka 改为了 Iceberg,实时训练和离线训练会使用统一的 API,这样在存储和 API 层面就达到了统一的结果。与之前的双存储相比,节省了 Kafka 的存储资源, 对写入 Iceberg 的数据我们也做了压缩,并在 HDFS 文件上进行了优化,进一步降低了整体的存储成本。与此同时,相比之前的模式,我们节省了 dump 任务流计算的 CPU 消耗,整体计算成本降低 20%。对于算法 online 发布链路,我们还对 Iceberg 数据做了冷热分离,将 3 天内的新鲜数据放在高存储成本介质 SSD 上,用于加速下游实时训练。3. 商业和 AI 的在线训练(规划)
对于上述场景,我们也在规划后续升级 Flink 1.20 版本,并做一些适配,计划将 Kafka base 展点流数据替换为 Iceberg。目前我们内部的 Flink 主要是 1.11 和 1.15 两个版本,即使开启了 Unaligned Checkpoint,也没办法满足用户一分钟的时效性要求。因为展现流和点击流做双流 join 使用的是社区的 interval join 的定制版,即便把我们数据延迟发送时的双流 join 时间窗口设置为 1 小时,在流量较大的背景下,依然会导致其 keyed state 处在很大的量级,大致在 10 TB 级别以上。当降低 checkpoint 时间的时候,我们发现 RocksDB 的 CPU 共振会导致 checkpoint 失败率上升,最后影响下游数据的正常消费。Iceberg 的数据可读是严格依赖于 snapshot commit 的成功生成的,我们期望借助于高版本的 GIC 的能力,降低 checkpoint 的时间来满足双流 join 生成 Iceberg 数据时效性。DB 数据同步
首先 Flink CDC 会将全量数据和增量的 binlog 数据同步到 Kafka 缓冲层,然后使用 Flink 流,将 binlog 完整写出到 Iceberg Append 表中,其中会附带完整的 binlog 记录,依据流计算的 watermark 推进进度,通知下游 Spark merge 任务,数据 merge 合并会依赖于 binlog 的 timestamp 和主键去做排序,然后实现数据更新、新增以及删除操作的同步。由于迭代的原因,我们目前暂未将 Iceberg 的数据给到用户,而是将 Iceberg Dump 的 Hive 表给到用户。整个 DB 同步是通过实时加离线的组合同步产出 Hive 表给到用户,每个分区为全量的 DB 快照数据。除此之外,在高流量场景下,我们需要对 DB 做维表 join 的情况下,我们也会建议用户直接使用 Hive 作为 MySQL 维表 join 的一个替代方案,用以缓解 DB query 的压力。但是缺点也比较明显,就是 Hive 维表 join 的时效性会比较低,至少为小时级。2. DB 数据同步(迭代)
基于上述背景,首先我们保留了原有实时加离线组合链路上产出 Hive 表的这条链路,原因是用户使用基数比较庞大。在此基础上面我们增加了一条实时链路,去做用户实时场景的补强。我们内部的 Iceberg 升到 1.4 版本之后做了一些改造,将 Flink CDC 的数据直接写到 Iceberg MOR 表中,这样 Iceberg 表可以作为 MySQL 的一个镜像来做实时查询,时效性可以提升到分钟级。并且通过改造,提供了 Iceberg 增量流读 change log 的能力。3. DB 数据同步(流读)
Change log 增量流读的具体改造如上图所示。在这里,我们对 Equality Delete Files 的内容做了一定的改造,将其做了扩展,记录了整个需要删除的 row data 值用于适配 CDC 同步场景,当 DB 数据同步更新的时候,binlog 会记录 update before 的结果,在删数据的时候,binlog 也会记录整个删除的数据值,这样在相同的 snapshot 下若有一条行级删除数据需要写入 Equality Delete Files,直接将整个删除的完整记录写入此文件即可,这样可以方便后面后续增量流读 changelog,无需查询历史 snapshot 的 Data File,从而优化增量流读 change log 的速率。第二个优化点是在增量流读 MOR 表方面,为了保证数据能够同步有序的回放 binlog 的记录,在增量流读时,我们首先会根据 snapshot 的 timestamp 时间去做排序,然后在相同的 snapshot 下,会先读 Equality Delete Files,然后依次去读 Data File 和 Position Delete Files。在写入侧,我们也针对自身场景,额外在 Write Operator 引入了 keyed state 记录主键值。前文提到,我们内部的 DB 数据同步会首先写入到 Kafka 中,然后再消费,而写入 Kafka 的流计算任务是会存在重启的,因此我们增加 keyed state 来防止数据重放。最后一点是借助 Magnus 服务,对 MOR 表做异步的 compaction 合并操作,防止 Equality Delete Files 过多影响 Batch 查询。4. DB 数据同步(展望)
对 DB 同步场景,我们期望将两条链路合并成一条单链路,并提供下面三个能力:- 即时查询,Iceberg 可以作为 MySQL 的镜像表来做实时查询,达到分钟级延迟。
- 在实时消费的场景下,我们希望 Iceberg 具备增量流读 change log 的能力。
- 离线场景下,希望 Iceberg 每个 tag 是一个全量的历史快照数据。
以上前两点在上述新增链路中已经实现,而第三点合入到此链路中需要做以下几个优化事项:- 首先,tag 想要将数据按照时间整点去做切割的话,需要借助于 Flink checkpoint 的触发,去关联 watermark,也就是 watermark 到了整点的时间点去触发对应的 checkpoint,然后在 snapshot 上面去打一个对应的 tag。
- 其次,为了避免数据漂移问题,比如 T2 的数据可能会落入到 T1 中,为了解决这个问题,我们在 Writer 算子前面增加一个 cache 算子,用来缓存整点时间尚未到达时提前进入 Writer 算子的数据。
- 最后,需要将 tag 打出来的数据独立于 Data File,以避免非分区表 data 目录下面文件数量达到上限的问题。
Iceberg 维表 Join
1. Iceberg 维表 Join(背景)
接下来我们看一下 Iceberg 维表 join 的场景。AI 和商业的流计算任务都有一个显著特性,就是数据流量特别大。当使用双流 join 来关联远端 DB 进行维表 join 时,就会存在以下痛点:- 首先,我们内部提供的维表连接器数量是比较多的,每个连接器的私有参数也很多,导致用户的整体理解和使用成本较高,其中有部分维表属于 MySQL 维表中 DB 无法承受那种流计算高 QPS 压力而出现的替代品,如 Redis,其存储成本也会有所提升。
- 另外,双流 join 任务中会挂一个维表 join,其任务并行度比较高,致使 TaskManager 数量比较多,导致其 connector 链接数增加。
- 最后一点是,即使在本地每个 TaskManager 做了一些 cache 的优化,比如针对 MySQL,基于内存做了优化,对于 Hive 和 HDFS 维表我们在本地做了 RockDB 的缓存。但是这些缓存都是无状态的,当任务重启时,所有维表数据都需要从远端重新拉起,整体来看维表使用 DB 压力会比较大。
2. Iceberg 维表 Join(迭代)
针对上述问题,我们发现可以基于 Iceberg 的 change log 增量流读能力,提供 lookup join 的替代品,构建一个新的 Flink 维表 join,此维表 join 需要满足如下几个特性:- 第一,维表的全量数据需要存储在 State 当中,这样任务即使重启,也无需对全量数据做重新加载。
- 第二,State 需要具备无 TTL 过期的属性,也就是数据不会过期。
- 第三,维表需要保有全量数据外,还需要具有增量流读能力,来更新新 join 算子中的 State 数据。
- 第四,增量流读的数据同时也需要具备 change log 的特性,以 MySQL 表做 join 为例,用户作维表 join 的 on 表达式使用的字段,一般不会和主键字段一致。如果没有 change log 特性,一条主键 id=1 的数据在多次更新后,可能会在 join 算子的多个 subtask 中同时存在,最终会影响 join 的结果。
- 第五,维表需要有主键。这是在 keyed State 当中,根据主键去做 State 更新的关键。
通过上面能力的迭代,Iceberg 满足了新 join 对于 source 所需特性的诉求,基于此我们也做了相应的适配,构建新的维表 join 算子。3. Iceberg 维表 Join(改造)
为了构建新的 Lookup join,我们做了如下改造:第一,为了支持流转批场景,使流计算 SQL 能够在基本不改变 SQL 写法的情况下,自动切换至在 Flink Batch 中运行,我们借助了 Calcite 的使用 SQLHint 的方式来支持新 join 语法,降低 SQL 的撰写难度和区分存量 join,只需要在 a join b 后面加一个 SQLHint 和对应的 options 即可。
第二,在用户做维表 join 时,我们不希望强制用户使用主键字段去做 join,否则易用性会比较差。而这样带来的一个问题是 Flink SQL 的 Rule 集合优化器,会因为整个 DAG 当中没有使用主键字段,而进行裁剪移除,最后会导致 Flink join 算子中无法获取主键字段,这样也就没办法保证在 State 中根据主键作维表数据更新的诉求。所以我们对 Calcite 和 Flink Project Rule 做了一些调整,支持保留主键字段,至少在 LookupJoin2 Operator 这个算子是有所保留的,后面的就可以不需要了。
第三,我们在使用 Flink1.15 的时候,会发现 change log 会有一些优化,Flink 会提供一个 DropUpdateBefore 算子,用以剔除-U 的数据。在当前 join 的场景当中,我们将维表 source 算子后的此算子做了移除,使得 State 数据更新得以准确。
最后,新的 Join Operator 在右流初始化全量数据完成前,会将左流数据阻塞。因为对于用户来说,任务启动后,Sink 端如果接收到了数据,就表明已经做完了前面全部的流计算逻辑了。如果说主流在 Right 维表没有做完初始化前不做阻塞的操作,主流 Join 的部分数据可能会存在 Join 结果为 Null 的情况。我们通过实践发现,维表数据即使在亿级别做全量加载,也可以在一分钟左右完成,因为它本身是并行分布式的。
4. Iceberg 维表 Join(收益)
改造前实时链路的主流会做 LookupJoin 关联 DB 写到 Kafka 中,结果数据一般会 dump 到 Hive 用于离线分析。如果要对离线分析的结果从上游做数据的回刷,需要将上游 topic 的数据先做 dump 到 Hive,然后使用 Spark SQL 去关联 Hive 的维表,写入到 Hive 表。改造后,可以直接用 Iceberg 主流维表 Join 关联 Iceberg 维表,写出到下游的 Sink Iceberg,如果要用 batch 的话,可以直接使用平台工具将流任务转化成批任务,指定好相对应的时间即可。通过改造,Flink 流超大流量的维表 join,对远端 DB 的压力可以直接降为 0,离线回刷可以整体复用 Flink 流计算的逻辑,UDF 也可以得到相应的复用。Q&A
Q:Magnus 和 Flink 都会对文件做写入,冲突是如何解决的?A:Magnus 会对 MOR 的合并也会做 compaction 的操作,我们之前也遇到了冲突的问题,后面是取消了 Magnus 对 CDC 写入的 MOR 表做托管数据合并操作,而是让 Flink 的 JobManager 去主动控制触发,也就相当于是 JobManager 会根据当前表的流量,配置不同的属性,配置不同的调度步长,比如一个小时或者两个小时做一次请求 Magnus 的 compaction 操作。这一过程中,整个主流会 block 住,然后等到 Magnus 数据 compaction 完成之后,再将主流数据放下去。优化后 Magnus 能够在分钟级完成整个 compaction,因此对于整个 DB 同步以及数据 MOR 表产出的时效性不会有太大的影响。专注于 Flink SQL/State 和流批一体的工作,为内部提供 Flink 引擎相关的技术支撑。
推荐站内搜索:最好用的开发软件、免费开源系统、渗透测试工具云盘下载、最新渗透测试资料、最新黑客工具下载……
宙飒天下
还没有评论,来说两句吧...