2. Consumer
一、Consumer
消费kafka里消息的角色叫Consumer(消费者)
二、ConsumerGroup
消费者组是kafka提供的可横向扩展的消费机制,与此相反的kafka也有standalone消费者(单个消费者)。一个消费者组有自己的group.id。
消费者组是是由一个或多个消费者端组成的。一个消费者组里的消费者共享一个topic,一个topic通常有一个或多个partition组成。在消费者组消费topic时:
- 消费者数量=partition数量,消费者与partition一一对应的消费
- 消费者数量<partition数量,个别消费者会消费多个partition
- 消费者数量>partition数量,个别消费者会没有partition可消费
在开发编写消费者程序时,只需要指定group.id和topic 即可
group.id: test
topic: topic-1
程序指定当前消费者组的名称为test,该消费者组去消费topic-1。当程序起了三个实例时,那么此时该消费者组里,有三个消费者,这三个消费者都会消费topic-1
三、消费者参数调优配置
- fetch.max.bytes: 单次获取数据大小的最大值
- fetch.min.bytes: 单次获取数据大小的最小值
- max.partition.fetch.bytes: 单次获取单个分区的数据大小的最大值,默认1M
- enable.auto.commit: 自动提交
- max.poll.records :单次拉取消息数,默认为500
- max.poll.interval.ms :两次poll方法调用的最大间隔时间
四、rebalance机制
rebalance机制是平衡消费者和分区之间的对应关系的,触发时机如下:
- 分区个数发生变化
- 消费的主题个数发生变化
- 消费者个数发生变化
4.1 rebalance流程
rebalance是由协调者(coordinator)协助consumer group完成rebalance的。
每一个broker都会有一个Coordinator,除了rebalance外,还会管理consumer group的offset
4.1.1 Consumer Group如何知道哪个协调者是为自己服务的
Consumer Group的位移保存在__consumer_offset这个内部主题的某个partition中
这个partition的id是hash(group_id)%50(__consumer_offset主题有50个分区)。根据这个partition,找到他的leader所在的broker,这个broker上的coordinator就是该Consumer Group的协调者。
4.1.2 协调者如何管理组成员
当Consumer启动的时候,会向协调者所在broker发送个Join Group的请求,收到请求后协调者执行消费者组的注册,消费者元数据信息保存
消费者组中的各个成员都会定时的向协调者发送心跳
当协调者发现某个成员长时间没有心跳,那么协调者就会认为这个消费者挂了,把这个消费者从消费者组中剔除出去,然后在其他Consumer的心跳请求中回复 reblance_need,让每个消费者停止消息消费,并开启Reblance。
当各个消费者收到reblance_need响应后,都会停止消息消费,并向协调者发送 SyncGroup请求,来询问分配给自己的分区信息
4.1.3 分区分配的策略
分配策略是指:当reblance时,重新将分区分配给消费者的方式。 常用的分区方式有三种:Range,RoundRobin 和 StickyAssignor
- Range: 按照分区号的范围进行分区分配
- RoundRobin: 轮训分配,将分区逐个分配给消费者
- StickyAssignor: 粘性分配,每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动
例如实例Consumer1之前负责消费分区 0、1、2,那么在StickyAssignor Rebalance时,Consumer1 继续消费分区0、1、2,而不是被重新分配其他的分区。这样的话,实例 Consumer1 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。
4.2 Reblance产生的影响
- 影响消费速度,reblance时消费者都会停止消费,且越是消费者多的时候,reblance导致的消费停顿时长越长
- 可能会重复消费,加入有的消息已经被处理了,但是还没有commit,这时reblance就会导致新的消费者重复消费此数据