How to fix the kafkajs "Response Heartbeat: The group is rebalancing, so a rejoin is needed" error that causes eachMessage to be called repeatedly

[Nest] 19 - 06/16/2021, 1:09:12 PM [ClientKafka] ERROR [Connection] Response Heartbeat(key: 12, version: 3) {"timestamp":"2021-06-16T13:09:12.779Z","logger":"kafkajs","broker":"kafka-0.kafka-headless.dev.svc.cluster.local:9092","clientId":"reviews-ts-service-client","error":"The group is rebalancing, so a rejoin is needed","correlationId":1241,"size":10} +2857ms
[Nest] 19 - 06/16/2021, 1:09:12 PM [ClientKafka] ERROR [Runner] The group is rebalancing, re-joining {"timestamp":"2021-06-16T13:09:12.779Z","logger":"kafkajs","groupId":"reviews-consumer-ts-customer-client","memberId":"reviews-ts-service-client-453b2860-fdab-4c01-aa98-e015667b8d3b","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":330} +1m

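This error is really annoying. Programs that only write to a database usually never see it, but a process that fetches web pages runs into it easily: the handler takes too long, the heartbeat times out, and the consumer is restarted and runs eachMessage again. The earlier eachMessage call is still running, though, so the number of concurrent eachMessage invocations keeps growing until the process hangs.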
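To make the failure mode concrete, here is a minimal sketch of the broker-side rule, assuming a simplified model in which the consumer is dropped as soon as the gap between two consecutive heartbeats exceeds the session timeout. The function name `sessionExpired` and the timestamps are made up for illustration; this is not kafkajs code.

```typescript
// Simplified model (an assumption, not kafkajs internals): the broker treats the
// consumer as dead once two consecutive heartbeats are further apart than
// sessionTimeoutMs.
function sessionExpired(heartbeatTimesMs: number[], sessionTimeoutMs: number): boolean {
  for (let i = 1; i < heartbeatTimesMs.length; i++) {
    const gap = heartbeatTimesMs[i] - heartbeatTimesMs[i - 1];
    if (gap > sessionTimeoutMs) {
      return true; // the broker would start a rebalance here
    }
  }
  return false;
}

// Hypothetical timeline: heartbeats at 0 s and 3 s, then a slow web request
// keeps the handler busy and the next heartbeat only arrives at 48 s.
const blocked = sessionExpired([0, 3000, 48000], 30000); // default sessionTimeout
const tolerated = sessionExpired([0, 3000, 48000], 60000); // raised sessionTimeout
```

With the default 30 s session timeout the 45 s pause counts as a dead consumer, while a 60 s timeout tolerates it.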

https://github.com/nestjs/nest/issues/7270
https://github.com/tulios/kafkajs/issues/1097

These are two issue threads from people who hit the same problem. The fix given in the first one is:
HOW TO RESOLVE THIS ISSUE?

sessionTimeout: 60000,
heartbeatInterval: 40000,
maxWaitTimeInMs: 43000,

sessionTimeout: it should be greater than the processing time of the method.
heartbeatInterval: someone said it should be 2/3 of sessionTimeout.
maxWaitTimeInMs: it must be greater than heartbeatInterval.

This issue was resolved by above configuration.
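The three rules of thumb quoted above can be written down as a small check. This is only a sketch: `ConsumerTimings`, `checkTimings`, and the 45 s processing time are hypothetical names and figures, and the rules themselves come from the issue thread rather than from the kafkajs documentation (which only requires heartbeatInterval to be lower than sessionTimeout).

```typescript
// Hypothetical shape holding the three timing options discussed above.
interface ConsumerTimings {
  sessionTimeout: number;
  heartbeatInterval: number;
  maxWaitTimeInMs: number;
}

// Returns a list of rule violations; an empty list means all three rules hold.
function checkTimings(t: ConsumerTimings, maxProcessingMs: number): string[] {
  const problems: string[] = [];
  if (t.sessionTimeout <= maxProcessingMs) {
    problems.push("sessionTimeout should exceed the handler's processing time");
  }
  if (t.heartbeatInterval >= t.sessionTimeout) {
    problems.push("heartbeatInterval must be lower than sessionTimeout");
  }
  if (t.maxWaitTimeInMs <= t.heartbeatInterval) {
    problems.push("maxWaitTimeInMs should be greater than heartbeatInterval");
  }
  return problems;
}

// Values from the issue thread, checked against an assumed 45 s handler.
const fromIssue: ConsumerTimings = {
  sessionTimeout: 60000,
  heartbeatInterval: 40000,
  maxWaitTimeInMs: 43000,
};
const problems = checkTimings(fromIssue, 45000);
```

For the values from the thread the list comes back empty, whereas the kafkajs defaults (30000 / 3000 / 5000) fail the first rule for a handler that takes 45 s.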

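In other words, increase the session timeout.
There is no real need to go through the second thread. The answer is simply to increase the session timeout and the heartbeat interval; raise both by a reasonable amount and you avoid this problem.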

The logs are now pleasant to read, without the pile of errors there used to be.

Reference:

Options

kafka.consumer({
groupId: <String>,
partitionAssigners: <Array>,
sessionTimeout: <Number>,
rebalanceTimeout: <Number>,
heartbeatInterval: <Number>,
metadataMaxAge: <Number>,
allowAutoTopicCreation: <Boolean>,
maxBytesPerPartition: <Number>,
minBytes: <Number>,
maxBytes: <Number>,
maxWaitTimeInMs: <Number>,
retry: <Object>,
})
| option | description | default |
| --- | --- | --- |
| partitionAssigners | List of partition assigners | [PartitionAssigners.roundRobin] |
| sessionTimeout | Timeout in milliseconds used to detect failures. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance | 30000 |
| rebalanceTimeout | The maximum time that the coordinator will wait for each member to rejoin when rebalancing the group | 60000 |
| heartbeatInterval | The expected time in milliseconds between heartbeats to the consumer coordinator. Heartbeats are used to ensure that the consumer's session stays active. The value must be set lower than session timeout | 3000 |
| metadataMaxAge | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions | 300000 (5 minutes) |
| allowAutoTopicCreation | Allow topic creation when querying metadata for non-existent topics | true |
| maxBytesPerPartition | The maximum amount of data per-partition the server will return. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition | 1048576 (1MB) |
| minBytes | Minimum amount of data the server should return for a fetch request, otherwise wait up to maxWaitTimeInMs for more data to accumulate. | 1 |
| maxBytes | Maximum amount of bytes to accumulate in the response. Supported by Kafka >= 0.10.1.0 | 10485760 (10MB) |
| maxWaitTimeInMs | The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by minBytes | 5000 |
| retry | See retry for more information | { retries: 5 } |
| readUncommitted | Configures the consumer isolation level. If false (default), the consumer will not return any transactional messages which were not committed. | false |
| maxInFlightRequests | Max number of requests that may be in progress at any time. If falsey then no limit. | null (no limit) |
| rackId | Configure the "rack" in which the consumer resides to enable follower fetching | null (fetch from the leader always) |

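After running with these settings for a day, they still did not seem to help much, so I found another approach: lower the retry count, which defaults to 5.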
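As a rough sketch, the consumer options could then look like the object below. The post does not say which retry count was actually used, so `retries: 1` is an assumed value, and `ConsumerOptions` and the group id are hypothetical placeholders; only the option names come from the kafkajs options listed above.

```typescript
// Hypothetical subset of the options accepted by kafka.consumer().
interface ConsumerOptions {
  groupId: string;
  sessionTimeout: number;
  heartbeatInterval: number;
  maxWaitTimeInMs: number;
  retry: { retries: number };
}

const consumerOptions: ConsumerOptions = {
  groupId: "my-consumer-group", // placeholder, not from the post
  sessionTimeout: 60000, // timing values quoted from the issue thread
  heartbeatInterval: 40000,
  maxWaitTimeInMs: 43000,
  retry: { retries: 1 }, // assumed value; the kafkajs default is 5
};

// With kafkajs installed this object would be passed as
// kafka.consumer(consumerOptions); that call is left out so the sketch
// stays self-contained.
```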

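Our program is waiting for a long time rather than stopped, so when no heartbeat is sent in time the consumer is restarted, but the previous handler is not shut down. The workaround is simply not to let it restart. I think the better fix would be to shut down the previous eachMessage when a rebalance happens; then this problem would not occur at all.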
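The idea of stopping the previous eachMessage on a rebalance could be approximated in application code with a generation counter. This is a sketch under stated assumptions, not something kafkajs does for you: `GenerationGuard` and `runSteps` are hypothetical helpers, and the handler has to be split into steps so that it can check the guard between them. One plausible way to wire it up would be to call `bump()` from a listener on the consumer's GROUP_JOIN instrumentation event, but that wiring is not shown here.

```typescript
// Hypothetical helper: counts how many times the consumer has rejoined the group.
class GenerationGuard {
  private generation = 0;

  // Call this whenever a rebalance/rejoin is observed.
  bump(): number {
    this.generation += 1;
    return this.generation;
  }

  // Capture this value at the start of a handler.
  current(): number {
    return this.generation;
  }

  // True if a rejoin happened after `startedAt` was captured.
  isStale(startedAt: number): boolean {
    return startedAt !== this.generation;
  }
}

// Runs the steps of a long job in order and stops early once the guard reports
// that a rejoin happened, so an outdated handler does not keep working.
// Resolves to the number of steps that actually ran.
async function runSteps(
  guard: GenerationGuard,
  steps: Array<() => Promise<void>>,
): Promise<number> {
  const startedAt = guard.current();
  let completed = 0;
  for (const step of steps) {
    if (guard.isStale(startedAt)) {
      break; // a newer handler invocation owns this message now
    }
    await step();
    completed += 1;
  }
  return completed;
}
```

A step that is already in flight still runs to completion; the guard only prevents further steps from starting, so long requests should be broken into reasonably small pieces.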


http://www.datehoer.com/posts/69c60b41-6e80-11ee-a697-01b0896cf41d/

Author

datehoer

Published

2022-04-28

Updated

2023-10-19
