700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > flink 消费 kafka offset 自动提交

flink 消费 kafka offset 自动提交

时间:2022-05-26 01:04:53

相关推荐

flink 消费 kafka offset 自动提交

flink 消费kafka 程序重启后,从原先的自动提交的点继续消费,earliest 不用再从开始消费

如果开启了checkpoint 以 checkpoint为准 ,mit 失效,

如果没有开启,则以mit 为准

flink1.14.0

Kafka | Apache Flink

flink dataStream 方式

KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers("broker").setTopics("topic").setGroupId("group_id").setProperty("mit","true")// .setProperty("auto.offset.reset","earliest").setProperty("auto.offset.reset","latest").setProperty("mit.interval.ms","1000")// .setStartingOffsets(OffsetsInitializer.earliest()).setStartingOffsets(mittedOffsets(OffsetResetStrategy.EARLIEST)).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> kafka_source = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");kafka_source.print();

flink sql 方式

重点是这四个属性

'scan.startup.mode'='group-offsets', // 默认

'properties.mit' = 'true',

'properties.auto.offset.reset.strategy' = 'earliest',

'mit.interval.ms' = '1000'

CREATE TABLE `kafka_order_stream` (`id` VARCHAR ,`name` VARCHAR) WITH ('format' = 'json','json.ignore-parse-errors' = 'true','json.fail-on-missing-field' = 'false','properties.bootstrap.servers' = 'broker','connector' = 'kafka','topic' = 'topic','scan.startup.mode'='group-offsets', // 默认'properties.group.id' = 'group_id','properties.mit' = 'true','properties.auto.offset.reset.strategy' = 'earliest','mit.interval.ms' = '1000');

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。