1. 现象

最近在尝试 Flink CDC 写 Hudi 的湖仓一体方案验证,调试过程中发现:当使用 Flink 写入 Hudi MOR 表时,虽然 log files 正常生成, 但是在 Hudi 表触发第一次 compaction 之前,rt 表一直查不到数据,理想的情况下,对于 MOR 表,rt 表应该能查到 log files 中的数据才对。

2. 复现步骤

Table API 实现方式,配置如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 /**
 * MOR
 *  ro:读优化表
 *  rt:近实时表
 */
String hudiSinkDDLMOR = "CREATE TABLE hudi_table_mor(\n" +
        "id String,\n" +
        "name String,\n" +
        "age Int,\n" +
        "PRIMARY KEY (id) NOT ENFORCED \n" +
        ") WITH (\n" +
        // 基本配置
        "'connector' = 'hudi',\n" +
        "'write.operation' = 'upsert',\n" + 
        "'write.precombine' = 'true',\n" + // 默认情况下为 false,会有重复项
        "'table.type' = 'MERGE_ON_READ',\n" +
        String.format("'path'= '%s',\n", basePath) +
        // 并发参数配置
        "'write.tasks' = '2',\n" +
        "'write.bucket_assign.tasks' = '2',\n" + // 增加该值会导致bucket数量增加,即增加小文件数
        // hudi schema 同步到 Hive metastore
        "'hive_sync.conf.dir'='/opt/apache-hive-3.1.3-bin/conf',\n" +
        "'hive_sync.enabled' = 'true',\n" + // 将数据集注册并同步到 hive metastore
        "'hive_sync.mode' = 'hms',\n" + // 采用 hive metastore 同步
        "'hive_sync.metastore.uris' = 'thrift://localhost:9083',\n" +
        String.format("'hive_sync.db' = '%s',\n", dbName) +
        String.format("'hive_sync.table' = '%s'\n",tableName) +
        ")" +
        "" +
        ""
        ;

2.2. Insert data into MySQL table.

1
insert into t_mor_test(id,name,age) values('0124-24','test19',19);

2.3. Query rt table by Hive

MySQL 数据只更新一次的情况下,rt 表查询结果为空,当 MySQL 数据写入5次,达到 compaction 条件之后(默认 num_commits 达到 5 次触发 compaction),rt 表中才有数据返回。

这里可以看到,HDFS 目录下有 log files 文件,但是 rt 表无数据返回:

img.png

然后我新建一张表插入5次数据后,发现伴随着第一次 compaction 触发,rt 表的数据才开始查询正常:

img_1.png

3. 环境

Flink:1.17.0

Hudi:0.14.0

Hive:3.1.3

Hadoop:3.3.6

4. 解决

今天逛 github 偶然看到了某个类似 issue 下贴了玉兆大佬的文档,参照文档我修复了这个问题,具体链接放这里,大家可以自行参考:https://www.yuque.com/yuzhao-my9fz/kb/kgv2rb

issue 贴:https://github.com/apache/hudi/issues/10465

按照玉兆文档中所写,Hive 3.1.0 的兼容性问题会导致这种查询异常问题,所以我也尝试重新编译了下 Hive 3.1.3,这里浅浅记录一下步骤:

  1. 下载源码
1
2
3
4
## 下载
wget https://dlcdn.apache.org/hive/hive-3.1.3/apache-hive-3.1.3-src.tar.gz
## 解压
sudo tar -zxvf apache-hive-3.1.3-src.tar.gz
  1. 修改对应源码

记得引入对应 java.util 的包

img.png

  1. 编译 hive-common 和 hive-exec

确保自己安装了 Maven,在 common 下编译 hive-common ,ql 下编译 hive-exec

当然也可以在解压的包下整个编译,但是平台不同可能编译会不那么顺利,几个小时都有可能,这个看个人选择哈

比如我这里是 M1,编译到 standalone-metastore 会因为 protobuf:2.5.0 不支持 M1 而报错

因为这里其他包不影响,所以只编译其中两个即可

1
mvn clean package -Pdist -DskipTests -Dmaven.javadoc.skip=true

成功后如下图所示:

img.png

  1. 编译成功后替换 hive-common-xxx.jar, hive-exec-xxx.jar

替换前注意备份原lib下的包,方便出问题后回滚

1
2
cp /opt/apache-hive-3.1.3-src/ql/target/hive-exec-3.1.3.jar /opt/apache-hive-3.1.3-bin/lib/
cp /opt/apache-hive-3.1.3-src/common/target/hive-common-3.1.3.jar /opt/apache-hive-3.1.3-bin/lib/
  1. 包替换好后重启 Hive,再次查询 rt 表结果正常

img.png