- Apache Hudi 是什么?
Apache Hudi 是 Apache 基金会下的开源项目,它提供了一个数据湖解决方案,支持增量式的数据处理和可变的数据表现形式。Hudi 最初由 Ubiquiti 区块链团队在 2016 年开发,2019 年捐赠给 Apache 软件基金会。Hudi 的核心特性是 Delta Lake 和 Apache Kafka 支持。Hudi 核心实现了以下两个模块:
- 将数据落地成可变的列式存储格式。
- 为了实现对数据的实时处理和可对数据的快速更新,Hudi 提供了一个更高层面的 API。
- Hudi 结合 Flink 的优势
Hudi 结合 Flink 可以使大规模数据处理任务更加高效。Hudi 可以将读写操作转变为成函数来让 Flink 对数据进行操作,从而将处理任务分散到多个节点上执行。此外,Hudi 可以缓存数据集以减少磁盘操作,同时通过支持可变数据和增量式处理来优化数据操作和数据的表现形式。
- Hudi 结合 Flink 的亿级数据入湖实践解析
Hudi 结合 Flink 的数据入湖实践步骤分为以下三个部分:
第一步:创建 Hudi 表并载入数据
可以使用以下命令创建 Hudi 表:
val config = HoodieConfig.newBuilder().withPath("").build()
val data = env.readTextFile("").map(record => (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()), record))
val hudi = new HoodieFlinkWriteStreamFunction(
config,
data,
(record: (String, String)) => new AvroTranscoder().toGenericRecord(record._2.getBytes()),
//新纪录以何种方式写入,是INSERT还是UPSERT
(record: (String, String)) => record._1,
(record: (String, String)) => UUID.randomUUID().toString,
"test_hudiDb",
"test_hudi"),
上述代码创建了一个名为test_hudi的数据表,并将数据写入该表。其中,withPath("") 需要填写数据存储在哪个位置;data 表示要写入的数据集合,这里使用的是 Flink API 的 readTextFile 方法读取文本文件;(record: (String, String)) => new AvroTranscoder().toGenericRecord(record._2.getBytes()) 表示将读取的文本转换成 Avro 格式,以适配 Hudi;(record: (String, String)) => record._1 表示使用第一列的数据作为主键;(record: (String, String)) => UUID.randomUUID().toString 表示为新增的记录生成 UUID;"test_hudiDb" 和 "test_hudi" 表示 Hudi 数据库名称和数据表名称。
第二步:实现增删改查等 SQL 操作
除了基本的增删改查,Hudi 还支持 MERGE 和 UPSERT INTO 操作。以下是 UPSERT INTO 操作的一个示例:
val schema = new AvroTranscoder().readSchema("test.avsc")
env.fromCollection(Seq((1, "a", "1/1/2021"), (2, "b", "1/2/2021"), (3, "c", "1/3/2021")))
.map(record => new org.apache.avro.generic.GenericData.Record(schema).put("id",record._1.toString).put("name", record._2).put("dob", record._3))
.map(record => (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()), record))
.addSink(new HoodieFlinkWriteBatchFunction(
config,
"test_hudiDb",
"test_hudi")
.withSchema(schema.toString())
//如果指定的行已经存在则进行更新操作
.withOperation(Configuracion.OPERATION_UPSERT)
//如下两个函数是必需的,在使用Hive进行元数据管理时会使用到
.withPreCombineField("")
.withRecordPrefix(""))
上述代码实现了 UPSERT INTO 操作,将数据写入之前创建的 Hudi 表 test_hudi 中。
第三步:加入 Flink SQL
Hudi 还可以通过 Flink SQL 进行操作,以下是一个 Flink SQL 操作的示例:
CREATE TABLE hudi_table (
id STRING,
name STRING,
dob STRING,
_hoodie_commit_time STRING, -- 写入该记录的提交时间
_hoodie_commit_seqno STRING, -- 写入该记录的序列号
_hoodie_record_key STRING, -- 主键列
_hoodie_partition_path STRING) -- 分区字段
PARTITIONED BY (_hoodie_partition_path) -- 指定分区字段
TBLPROPERTIES (
'hoodie.datasource.write.precombine.field' = 'id', -- 更新时合并的主键字段
'hoodie.datasource.write.recordkey.field' = 'id', -- 主键字段名
'hoodie.datasource.hive_sync.enable' = 'false') -- 是否同步到 Hive
上面的代码定义了一个 Hudi 表(hudi_table),然后指定了如何从 Hudi 表中读取数据以及数据之间的关系。
- 总结
在本文中,我们介绍了 Apache Hudi 是什么,以及它如何和 Flink 结合使用来进行亿级数据入湖实践。我们还给出了两个示例,分别是如何创建 Hudi 表并载入数据,以及如何实现增删改查等操作和如何加入 Flink SQL。通过这些案例,可以深入了解 Hudi 和 Flink 的协作机制,并学习如何利用它们在大规模数据处理场景中快速处理数据。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Apache Hudi结合Flink的亿级数据入湖实践解析 - Python技术站