首页 新闻资讯 地被苗圃
Flink SQL 知其所以然之维表 Join 的性能优化之路(上)附源码
发布日期:2022-08-07 09:31    点击次数:67

本文转载自微信公众号「大数据羊说」,作者antigeneral了呀 。转载本文请联系大数据羊说公众号。

1.序篇

源码公众号后台回复1.13.2 sql lookup join获取。

废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:

背景及应用场景介绍:博主期望你能了解到,flink sql 提供了轻松访问外部存储的 lookup join(与上节不同,上节说的是流与流的 join)。lookup join 可以简单理解为使用 flatmap 访问外部存储数据然后将维度字段拼接到当前这条数据上面 来一个实战案例:博主以曝光用户日志流关联用户画像(年龄、性别)维表为例介绍 lookup join 应该达到的关联的预期效果。 flink sql lookup join 的解决方案以及原理的介绍:主要介绍 lookup join 的在上述实战案例的 sql 写法,博主期望你能了解到,lookup join 是基于处理时间的,并且 lookup join 经常会由于访问外部存储的 qps 过高而导致背压,产出延迟等性能问题。我们可以借鉴在 DataStream api 中的维表 join 优化思路在 flink sql 使用 local cache,异步访问维表,批量访问维表三种方式去解决性能问题。 总结及展望:官方并没有提供 批量访问维表 的能力,因此博主自己实现了一套,具体使用方式和原理实现敬请期待下篇文章。 2.背景及应用场景介绍

维表作为 sql 任务中一种常见表的类型,其本质就是关联表数据的额外数据属性,通常在 join 语句中进行使用。比如源数据有人的 id,你现在想要得到人的性别、年龄,那么可以通过用户 id 去关联人的性别、年龄,就可以得到更全的数据。

维表 join 在离线数仓中是最常见的一种数据处理方式了,在实时数仓的场景中,flink sql 目前也支持了维表的 join,即 lookup join,生产环境可以用 mysql,redis,hbase 来作为高速维表存储引擎。

Notes:

在实时数仓中,常用实时维表有两种更新频率

实时的更新:维度信息是实时新建的,实时写入到高速存储引擎中。然后其他实时任务在做处理时实时的关联这些维度信息。

周期性的更新:对于一些缓慢变化维度,比如年龄、性别的用户画像等,几万年都不变化一次的东西??,实时维表的更新可以是小时级别,天级别的。

3.来一个实战案例

来看看在具体场景下,对应输入值的输出值应该长啥样。

需求指标:使用曝光用户日志流(show_log)关联用户画像维表(user_profile)关联到用户的维度之后,提供给下游计算分性别,年龄段的曝光用户数使用。此处我们只关心关联维表这一部分的输入输出数据。

来一波输入数据:

曝光用户日志流(show_log)数据(数据存储在 kafka 中):

log_id timestamp user_id 1 2021-11-01 00:01:03 a 2 2021-11-01 00:03:00 b 3 2021-11-01 00:05:00 c 4 2021-11-01 00:06:00 b 5 2021-11-01 00:07:00 c

用户画像维表(user_profile)数据(数据存储在 redis 中):

user_id(主键) age sex a 12-18 男 b 18-24 女 c 18-24 男

注意:redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。如下图所示:

user_profile redis

预期输出数据如下:

log_id timestamp user_id age sex 1 2021-11-01 00:01:03 a 12-18 男 2 2021-11-01 00:03:00 b 18-24 女 3 2021-11-01 00:05:00 c 18-24 男 4 2021-11-01 00:06:00 b 18-24 女 5 2021-11-01 00:07:00 c 18-24  

flink sql lookup join 登场。下面是官网的链接。

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#lookup-join

4.flink sql lookup join

4.1.lookup join 定义

以上述案例来说,lookup join 其实简单理解来,就是每来一条数据去 redis 里面搂一次数据。然后把关联到的维度数据给拼接到当前数据中。

熟悉 DataStream api 的小伙伴萌,简单来理解,就是 lookup join 的算子就是 DataStream api 中的 flatmap 算子中处理每一条来的数据,针对每一条数据去访问用户画像的 redis。(实际上,flink sql api 中也确实是这样实现的!sql 生成的 lookup join 代码就是继承了 flatmap)

4.2.上述案例解决方案

来看看上述案例的 flink sql lookup join sql 怎么写:

CREATE TABLE show_log (     log_id BIGINT,     `timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),     user_id STRING,     proctime AS PROCTIME() ) WITH (   'connector' = 'datagen',   'rows-per-second' = '10',ali   'fields.user_id.length' = '1',   'fields.log_id.min' = '1',   'fields.log_id.max' = '10' );  CREATE TABLE user_profile (     user_id STRING,     age STRING,     sex STRING     ) WITH (   'connector' = 'redis',   'hostname' = '127.0.0.1',   'port' = '6379',   'format' = 'json',   'lookup.cache.max-rows' = '500',   'lookup.cache.ttl' = '3600',   'lookup.max-retries' = '1' );  CREATE TABLE sink_table (     log_id BIGINT,     `timestamp` TIMESTAMP(3),     user_id STRING,     proctime TIMESTAMP(3),     age STRING,     sex STRING ) WITH (   'connector' = 'print' );  -- lookup join 的 query 逻辑 INSERT INTO sink_table SELECT      s.log_id as log_id     , s.`timestamp` as `timestamp`     , s.user_id as user_id     , s.proctime as proctime     , u.sex as sex     , u.age as age FROM show_log AS s LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u ON s.user_id = u.user_id 

这里使用了 for SYSTEM_TIME as of 时态表的语法来作为维表关联的标识语法。

Notes:

实时的 lookup 维表关联能使用处理时间去做关联。

运行结果如下:

log_id timestamp user_id age sex 1 2021-11-01 00:01:03 a 12-18 男 2 2021-11-01 00:03:00 b 18-24 女 3 2021-11-01 00:05:00 c 18-24 男 4 2021-11-01 00:06:00 b 18-24 女 5 2021-11-01 00:07:00 c 18-24 男

flink web ui 算子图如下:

flink web ui

但是!!!但是!!!但是!!!

flink 官方并没有提供 redis 的维表 connector 实现。

没错,博主自己实现了一套。关于 redis 维表的 connector 实现,直接参考下面的文章。都是可以从 github 上找到源码拿来用的!

flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

4.3.关于维表使用的一些注意事项

同一条数据关联到的维度数据可能不同:实时数仓中常用的实时维表都是在不断的变化中的,当前流表数据关联完维表数据后,如果同一个 key 的维表的数据发生了变化,已关联到的维表的结果数据不会再同步更新。举个例子,维表中 user_id 为 1 的数据在 08:00 时 age 由 12-18 变为了 18-24,那么当我们的任务在 08:01 failover 之后从 07:59 开始回溯数据时,原本应该关联到 12-18 的数据会关联到 18-24 的 age 数据。这是有可能会影响数据质量的。所以小伙伴萌在评估你们的实时任务时要考虑到这一点。

会发生实时的新建及更新的维表博主建议小伙伴萌应该建立起数据延迟的监控机制,防止出现流表数据先于维表数据到达,导致关联不到维表数据

4.4.再说说维表常见的性能问题及优化思路

所有的维表性能问题都可以总结为:高 qps 下访问维表存储引擎产生的任务背压,数据产出延迟问题。

举个例子:

在没有使用维表的情况下:一条数据从输入 flink 任务到输出 flink 任务的时延假如为 0.1 ms,那么并行度为 1 的任务的吞吐可以达到 1 query / 0.1 ms = 1w qps。 在使用维表之后:每条数据访问维表的外部存储的时长为 2 ms,那么一条数据从输入 flink 任务到输出 flink 任务的时延就会变成 2.1 ms,那么同样并行度为 1 的任务的吞吐只能达到 1 query / 2.1 ms = 476 qps。两者的吞吐量相差 21 倍。

这就是为什么维表 join 的算子会产生背压,任务产出会延迟。

那么当然,解决方案也是有很多的。抛开 flink sql 想一下,如果我们使用 DataStream api,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:

按照 redis 维表的 key 分桶 + local cache:通过按照 key 分桶的方式,让大多数据的维表关联的数据访问走之前访问过得 local cache 即可。这样就可以把访问外部存储 2.1 ms 处理一个 query 变为访问内存的 0.1 ms 处理一个 query 的时长。

异步访问外存:DataStream api 有异步算子,可以利用线程池去同时多次请求维表外部存储。这样就可以把 2.1 ms 处理 1 个 query 变为 2.1 ms 处理 10 个 query。吞吐可变优化到 10 / 2.1 ms = 4761 qps。

批量访问外存:除了异步访问之外,我们还可以批量访问外部存储。举一个例子:在访问 redis 维表的 1 query 占用 2.1 ms 时长中,其中可能有 2 ms 都是在网络请求上面的耗时 ,其中只有 0.1 ms 是 redis server 处理请求的时长。那么我们就可以使用 redis 提供的 pipeline 能力,在客户端(也就是 flink 任务 lookup join 算子中),攒一批数据,使用 pipeline 去同时访问 redis sever。这样就可以把 2.1 ms 处理 1 个 query 变为 7ms(2ms + 50 * 0.1ms) 处理 50 个 query。吞吐可变为 50 query / 7 ms = 7143 qps。博主这里测试了下使用 redis pipeline 和未使用的时长消耗对比。如下图所示。

redis pipeline

博主认为上述优化效果中,最好用的是 1 + 3,2 相比 3 还是一条一条发请求,性能会差一些。

既然 DataStream 可以这样做,flink sql 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作

4.5.lookup join 的具体性能优化方案

按照 redis 维表的 key 分桶 + local cache:sql 中如果要做分桶,得先做 group by,但是如果做了 group by 的聚合,就只能在 udaf 中做访问 redis 处理,并且 udaf 产出的结果只能是一条,所以这种实现起来非常复杂。我们选择不做 keyby 分桶。但是我们可以直接使用 local cache 去做本地缓存,虽然【直接缓存】的效果比【先按照 key 分桶再做缓存】的效果差,但是也能一定程度上减少访问 redis 压力。在博主实现的 redis connector 中,内置了 local cache 的实现,小伙伴萌可以参考下面这部篇文章进行配置。

异步访问外存:目前博主实现的 redis connector 不支持异步访问,但是官方实现的 hbase connector 支持这个功能,参考下面链接文章的,点开之后搜索 lookup.async。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/

批量访问外存:这玩意官方必然没有实现啊,但是,但是,但是,经过博主周末两天的疯狂 debug,改了改源码,搞定了基于 redis 的批量访问外存优化的功能。

4.6.基于 redis connector 的批量访问机制优化

先描述一下大概是个什么东西,具体怎么用。

你只需要在 StreamTableEnvironment 中的 table config 配置上 is.dim.batch.mode 为 true,sql 不用做任何改动的情况下,flink lookup join 算子会自动优化,优化效果如下:

lookup join 算子的每个 task 上,每攒够 30 条数据 or 每隔五秒(处理时间) 去触发一次批量访问 redis 的请求,使用的是 jedis client 的 pipeline 功能访问 redis server。实测性能有很大提升。

关于这个批量访问机制的优化介绍和使用方式介绍,小伙伴们先别急,下篇文章会详细介绍到。

5.总结与展望

源码公众号后台回复1.13.2 sql lookup join获取。

本文主要介绍了 flink sql lookup join 的使用方式,并介绍了一些经常出现的性能问题以及优化思路,总结如下:

背景及应用场景介绍:博主期望你能了解到,flink sql 提供了轻松访问外部存储的 lookup join(与上节不同,上节说的是流与流的 join)。lookup join 可以简单理解为使用 flatmap 访问外部存储数据然后将维度字段拼接到当前这条数据上面

来一个实战案例:博主以曝光用户日志流关联用户画像(年龄、性别)维表为例介绍 lookup join 应该达到的关联的预期效果。

flink sql lookup join 的解决方案以及原理的介绍:主要介绍 lookup join 的在上述实战案例的 sql 写法,博主期望你能了解到,lookup join 是基于处理时间的,并且 lookup join 经常会由于访问外部存储的 qps 过高而导致背压,产出延迟等性能问题。我们可以借鉴在 DataStream api 中的维表 join 优化思路在 flink sql 使用 local cache,异步访问维表,批量访问维表三种方式去解决性能问题。

 

总结及展望:官方并没有提供 批量访问维表 的能力,因此博主自己实现了一套,具体使用方式和原理实现敬请期待下篇文章。

 



Powered by 北京地球天使环保科技有限公司 @2013-2022 RSS地图 HTML地图