
Flink + Kafka Real-Time Write to Iceberg: Practice Notes


Preface

The previous post walked through a simple example of writing to an Iceberg table in a Hadoop catalog with Flink SQL. This time I verify streaming writes: Flink consumes a Kafka topic and continuously writes the records into an Iceberg table. My notes follow.

Environment: local test setup with JDK 1.8, Flink 1.11.2, Hadoop 3.0.0, Hive 2.1.1.

I. Preliminaries

This post records using a path on HDFS as the Iceberg result table: Flink consumes Kafka data in real time and writes it into the Iceberg table, while Hive acts as a client reading the table in near real time.

Thanks to Iceberg's clean separation of reads and writes, newly written data can be read almost immediately. See also: 数据湖技术Iceberg的探索与实践.pdf.
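The snippets below all assume a StreamTableEnvironment named tenv already exists. As a minimal sketch of that setup for Flink 1.11 (the class name and checkpoint interval are my assumptions, not from the original post): the Iceberg Flink sink only commits data when a checkpoint completes, so checkpointing must be enabled or nothing will ever become visible in the table.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Kafka2IcebergDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // the Iceberg sink commits a snapshot on every checkpoint, so this is mandatory
        env.enableCheckpointing(60_000L); // 60s here; pick an interval that suits your latency needs

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        // steps 1-3 below all run against this tenv
    }
}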

II. Steps

1. Create the Iceberg table in a Hadoop catalog

Example code:

System.out.println("---> 1. create iceberg hadoop catalog table .... ");

// create hadoop catalog

tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n"

" 'type'='iceberg',\n"

" 'catalog-type'='hadoop',\n"

" 'warehouse'='hdfs://nameservice1/tmp',\n"

" 'property-version'='1'\n"

")");

// change catalog

tenv.useCatalog("hadoop_catalog");

tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");

tenv.useDatabase("iceberg_hadoop_db");

// create iceberg result table

tenv.executeSql("drop table hadoop_catalog.iceberg_hadoop_db.iceberg_002");

tenv.executeSql("CREATE TABLE hadoop_catalog.iceberg_hadoop_db.iceberg_002 (\n"

" user_id STRING COMMENT 'user_id',\n"

" order_amount DOUBLE COMMENT 'order_amount',\n"

" log_ts STRING\n"

")");

2. Create the Kafka stream table in a Hive catalog

Example code:

System.out.println("---> 2. create kafka Stream table .... ");

String HIVE_CATALOG = "myhive";

String DEFAULT_DATABASE = "tmp";

String HIVE_CONF_DIR = "/xx/resources";

Catalog catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR);

tenv.registerCatalog(HIVE_CATALOG, catalog);

tenv.useCatalog("myhive");

// create kafka stream table

tenv.executeSql("DROP TABLE IF EXISTS ods_k_2_iceberg");

tenv.executeSql(

"CREATE TABLE ods_k_2_iceberg (\n"

" user_id STRING,\n"

" order_amount DOUBLE,\n"

" log_ts TIMESTAMP(3),\n"

" WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n"

") WITH (\n"

" 'connector'='kafka',\n"

" 'topic'='t_kafka_03',\n"

" 'scan.startup.mode'='latest-offset',\n"

" 'properties.bootstrap.servers'='xx:9092',\n"

" 'properties.group.id' = 'testGroup_01',\n"

" 'format'='json'\n"

")");

3. Connect the Kafka stream table to the Iceberg target table with SQL

Example code:

System.out.println("---> 3. insert into iceberg table from kafka stream table .... ");

tenv.executeSql(

"INSERT INTO hadoop_catalog.iceberg_hadoop_db.iceberg_002 "

" SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.tmp.ods_k_2_iceberg");

4. Verify the data

bin/kafka-console-producer.sh --broker-list xx:9092 --topic t_kafka_03
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}

hive> add jar /home/zmbigdata/iceberg-hive-runtime-0.10.0.jar;
hive> CREATE EXTERNAL TABLE tmp.iceberg_002 (user_id STRING, order_amount DOUBLE, log_ts STRING)
      STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
      LOCATION '/tmp/iceberg_hadoop_db/iceberg_002';
hive> select * from tmp.iceberg_002 limit 5;
a1111    11.0    2020-06-29
a1111    11.0    2020-06-29
a1111    11.0    2020-06-29
a1111    11.0    2020-06-29
a1111    13.0    2020-06-29
Time taken: 0.108 seconds, Fetched: 5 row(s)
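Because the Flink sink only commits a new Iceberg snapshot when a checkpoint completes, rows produced to Kafka become visible in Hive shortly after the next checkpoint rather than truly instantly. Re-running a simple count after producing more messages is an easy way to watch the data land (my addition, not in the original post):

hive> select count(*) from tmp.iceberg_002;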

Summary

This post only gave a brief demonstration of consuming Kafka with the Flink Table API and writing the stream into an Iceberg result table backed by a Hadoop catalog on HDFS, as an initial feasibility check. The example is simplistic and has not been validated in production, so treat it as a reference only.

Corrections to any shortcomings are welcome, and if this was of any help to you, please consider leaving a like. Thanks♪(・ω・)ノ
