2. Consumer

  • java
  • java
  • kafka

一、Consumer

消费kafka里消息的角色叫Consumer(消费者)

二、ConsumerGroup

消费者组是kafka提供的可横向扩展的消费机制,与此相反的kafka也有standalone消费者(单个消费者)。一个消费者组有自己的group.idopen in new window

消费者组是是由一个或多个消费者端组成的。一个消费者组里的消费者共享一个topic,一个topic通常有一个或多个partition组成。在消费者组消费topic时:

  • 消费者数量=partition数量,消费者与partition一一对应的消费
  • 消费者数量<partition数量,个别消费者会消费多个partition
  • 消费者数量>partition数量,个别消费者会没有partition可消费

在开发编写消费者程序时,只需要指定group.id和topic 即可

group.id: test
topic: topic-1

程序指定当前消费者组的名称为test,该消费者组去消费topic-1。当程序起了三个实例时,那么此时该消费者组里,有三个消费者,这三个消费者都会消费topic-1

三、消费者参数调优配置

  • fetch.max.bytes: 单次获取数据大小的最大值
  • fetch.min.bytes: 单次获取数据大小的最小值
  • max.partition.fetch.bytes: 单次获取单个分区的数据大小的最大值,默认1M
  • enable.auto.commit: 自动提交
  • max.poll.records :单次拉取消息数,默认为500
  • max.poll.interval.msopen in new window :两次poll方法调用的最大间隔时间

四、rebalance机制

rebalance机制是平衡消费者和分区之间的对应关系的,触发时机如下:

  • 分区个数发生变化
  • 消费的主题个数发生变化
  • 消费者个数发生变化

4.1 rebalance流程

rebalance是由协调者(coordinator)协助consumer group完成rebalance的。

每一个broker都会有一个Coordinator,除了rebalance外,还会管理consumer group的offset

4.1.1 Consumer Group如何知道哪个协调者是为自己服务的

Consumer Group的位移保存在__consumer_offset这个内部主题的某个partition中

这个partition的id是hash(group_id)%50(__consumer_offset主题有50个分区)。根据这个partition,找到他的leader所在的broker,这个broker上的coordinator就是该Consumer Group的协调者。

4.1.2 协调者如何管理组成员

当Consumer启动的时候,会向协调者所在broker发送个Join Group的请求,收到请求后协调者执行消费者组的注册,消费者元数据信息保存

消费者组中的各个成员都会定时的向协调者发送心跳

当协调者发现某个成员长时间没有心跳,那么协调者就会认为这个消费者挂了,把这个消费者从消费者组中剔除出去,然后在其他Consumer的心跳请求中回复 reblance_need,让每个消费者停止消息消费,并开启Reblance。

当各个消费者收到reblance_need响应后,都会停止消息消费,并向协调者发送 SyncGroup请求,来询问分配给自己的分区信息

4.1.3 分区分配的策略

分配策略是指:当reblance时,重新将分区分配给消费者的方式。 常用的分区方式有三种:Range,RoundRobin 和 StickyAssignor

  • Range: 按照分区号的范围进行分区分配
  • RoundRobin: 轮训分配,将分区逐个分配给消费者
  • StickyAssignor: 粘性分配,每次 Rebalance 时,该策略会尽可能地保留之前的分配方案,尽量实现分区分配的最小变动

例如实例Consumer1之前负责消费分区 0、1、2,那么在StickyAssignor Rebalance时,Consumer1 继续消费分区0、1、2,而不是被重新分配其他的分区。这样的话,实例 Consumer1 连接这些分区所在 Broker 的 TCP 连接就可以继续用,不用重新创建连接其他 Broker 的 Socket 资源。

4.2 Reblance产生的影响

  • 影响消费速度,reblance时消费者都会停止消费,且越是消费者多的时候,reblance导致的消费停顿时长越长
  • 可能会重复消费,加入有的消息已经被处理了,但是还没有commit,这时reblance就会导致新的消费者重复消费此数据
Loading...