首页

解决Kafka报出CommitFailedException异常“Commit cannot be..reducing the maximum size of batches returned in poll() with max.poll.records.”问题

标签:kafka,CommitFailedException,性能优化,partitions,session.timeout,max.poll.records,ERROR,KafkaConsumer,KafkaUtils     发布时间:2017-05-19   

一、异常日志

最近发现推送消息至kafka队列后,无法有效接受到消息查看日志报出“org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records...[ERROR] [] ConsumerCoordinator:  User provided listener com.xwood.woopa.kafka.util.KafkaUtils$1 for group topic-test-k1 failed on partition revocation”异常,如下所示

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.@b@        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:426) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1059) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1027) ~[org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at com.xwood.woopa.kafka.consumer.AbstractMessageConsumer.commitSync(AbstractMessageConsumer.java:103) ~[com.xwood.woopa5%23pafa5-support-kafka%231.0.7.jar:1.0.7]@b@        at com.xwood.woopa.kafka.util.KafkaUtils$1.onPartitionsRevoked(KafkaUtils.java:105) ~[com.xwood.woopa5%23pafa5-support-kafka%231.0.7.jar:1.0.7]@b@        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:295) [org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212) [org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366) [org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978) [org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938) [org.apache.kafka%23kafka-clients%230.10.0.1.jar:?]@b@        at com.xwood.woopa.kafka.consumer.AbstractMessageConsumer.poll(AbstractMessageConsumer.java:67) [com.xwood.woopa5%23pafa5-support-kafka%231.0.7.jar:1.0.7]@b@        at com.xwood.woopa.kafka.consumer.CommitImmediatelyConsumer.poll(CommitImmediatelyConsumer.java:35) [com.xwood.woopa5%23pafa5-support-kafka%231.0.7.jar:1.0.7]@b@        at com.xwood.woopa.kafka.consumer.CommitImmediatelyConsumer.poll(CommitImmediatelyConsumer.java:31) [com.xwood.woopa5%23pafa5-support-kafka%231.0.7.jar:1.0.7]@b@        at com.xwood.woopa.kafka.core.SingleThreadMessageListenerContainer$Worker.run(SingleThreadMessageListenerContainer.java:119) [com.xwood.woopa5%23pafa5-support-kafka%231.0.7.jar:1.0.7]@b@[14:09:04.049] [ERROR] [] ConsumerCoordinator:  User provided listener com.xwood.woopa.kafka.util.KafkaUtils$1 for group topic-test-k1 failed on partition revocation

二、解决方法

在对配置项(实例化关联加载配置文件)中增加session.timeout.ms设置为1分钟(如下所示),默认为5秒

...@b@session.timeout.ms=60000@b@...

问题分析 - kafka通过客户端(kafka-clients-0.10.0.1.jar)连接kafka服务器的回话session的超时时间比kafka服务器rebalanced负载分区对于服务延时时间要短(kafka服务端增加多个备用服务器,数据太多的时候备机同步的延时越久,需要优化服务端网络及max.poll.records相关参数),源码分析截图如下:

1)找到报错kafka-clients-0.10.0.1.jar中org.apache.kafka.clients.consumer.internals.ConsumerCoordinator类在

解决Kafka报出CommitFailedException异常“Commit cannot be..reducing the maximum size of batches returned in poll() with max.poll.records.”问题

2)找到报错kafka-clients-0.10.0.1.jar中org.apache.kafka.clients.NetworkClient想服务器的partition分区ClientResponse请求的时候,服务端返回“error_code”的错误内容(解析1)中具体错误明细信息)

解决Kafka报出CommitFailedException异常“Commit cannot be..reducing the maximum size of batches returned in poll() with max.poll.records.”问题

解决Kafka报出CommitFailedException异常“Commit cannot be..reducing the maximum size of batches returned in poll() with max.poll.records.”问题