首页

【Kafka学习笔记】KafKa消费者之offset的偏移量

标签:kafka,offset     发布时间:2022-12-05   

阅读有关Kafka的《Kafka学习笔记》之“6. Kafka消费者 之 offset的维护”、”2. Consumer消费数据流程”章节,通过offset偏移量便故障(现断电宕机等故障)恢复后继续消费

1)索引文件+数据文件(offset偏移量关联)  

【Kafka学习笔记】KafKa消费者之offset的偏移量

【Kafka学习笔记】KafKa消费者之offset的偏移量

注:kafka会保存每个topic数据消费的记录offset,以便记录consumer消费到哪个数据了

2)Consumer消费成功后,再提交offset偏移量并持久化  -  同步提交和异步提交区分

【Kafka学习笔记】KafKa消费者之offset的偏移量

2.1)同步提交代码示例

// 同步提交: consumer提交完毕offset之后,才会继续消费数据。 @b@//3. 消费数据 while (true){ //JDK1.8 的API 毫秒数,@b@ ConsumerRecords<String, String> crs = kafkaConsumer.poll(Duration.ofMillis(100));@b@ for (ConsumerRecord<String, String> cr : crs) { @b@      System.out.println("cr = " + cr); @b@ } @b@ kafkaConsumer.commitAsync();@b@}

2.2)异步提交代码示例

// 异步提交: consumer只需要发出提交offset的指令之后,就可以继续消费数据,不需要等待本地offset 是否提交成功。 @b@while (true){ @b@  //JDK1.8 的API 毫秒数,@b@ ConsumerRecords<String, String> crs = kafkaConsumer.poll(Duration.ofMillis(100)); @b@ for (ConsumerRecord<String, String> cr : crs) { @b@   System.out.println("cr = " + cr);@b@ } @b@ kafkaConsumer.commitSync(); @b@}