最近咨询订阅配置的人比较多, 这里进行分析下.

配置信息

订阅配置信息是consumer向broker消费消息的凭证, 如果broker开启了 autoCreateSubscriptionGroup=false , 那么consumer client在消费之前, 必须通过命令行或者控制台上创建订阅配置, 然后consumer client使用配置订阅的名字. 通过命令行创建的订阅如下:

-> % mqadmin  updateSubGroup
usage: mqadmin updateSubGroup [-a <arg>] [-b <arg>] [-c <arg>] [-d <arg>] -g <arg> [-h] [-i <arg>] [-m <arg>]
       [-n <arg>] [-q <arg>] [-r <arg>] [-s <arg>] [-w <arg>]
 -a,--notifyConsumerIdsChanged <arg>       notify consumerId changed
 -b,--brokerAddr <arg>                     create subscription group to which broker
 -c,--clusterName <arg>                    create subscription group to which cluster
 -d,--consumeBroadcastEnable <arg>         broadcast
 -g,--groupName <arg>                      consumer group name
 -h,--help                                 Print help
 -i,--brokerId <arg>                       consumer from which broker id
 -m,--consumeFromMinEnable <arg>           from min offset
 -n,--namesrvAddr <arg>                    Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -q,--retryQueueNums <arg>                 retry queue nums
 -r,--retryMaxTimes <arg>                  retry max times
 -s,--consumeEnable <arg>                  consume enable
 -w,--whichBrokerWhenConsumeSlowly <arg>   which broker id when consume slowly

其中, -b-c 两个参数是对立的, -b 只会请求对应的broker, 而-c 则会先获取指定集群下的所有broker地址, 然后遍历执行创建SubscriptionGroupConfig. -m 目前是多余的配置, 暂时不起任何作用. 其他的配置中, 比较重要的如下:

  • notifyConsumerIdsChanged: 当有新的consumer连接到broker的时候, 是否允许broker遍历已经注册的consumer进行通知请求, cosumer接收到通知请求后, 会触发rebalance. 这个参数主要的作用是什么呢? 如果是有序消费, 并没有太大的影响, 只是添加的consumer需要在下一轮rebalance之后才能消费, 并且是 获取到broker的队列锁之后才能消费; 如果是并发消费, 关闭这个选项的话, 就会导致严重的重复消费. 因为和有序消费不同, 并发消费没有队列锁, 那么, 如果关闭选项的话, 每个consumer不能及时感知到其他consumer的存在, 每个consumer rebalance的实际不一样, 导致一段时间内, 有的consumer消费的是加入前分配的结果, 有的consumer消费的则是分配后的结果, consumer主动触发rebalance是 20s.

  • consumeBroadcastEnable: 这个只有在需要广播消息的时候才需要打开, 一般用不到

  • consumeEnable: 正常使用直接设置成 true 就可以了. 如果希望所有的consumer都不消费, 比如 敏感秘密级别的原因, 设置成 false

  • whichBrokerWhenConsumeSlowly: 这个只有在 consumer group lag 非常大的时候才会触发. 只有在 brokerConfig#isSlaveReadEnable 打开的情况下才会奏效. 当master lag非常大的时候, rocketmq 是有策略的: 重定向consumer 到 slave 消费, 众所周知, master-salve 同步配置中, 每个broker是有 brokerId 的, brokerId=0 是master, brokerId大于0 的是slave, 一般建议设置为1, 一些master-slave的配置中, 为了保证数据不丢, 配置了两个slave, 一般是 slaveId=1 和 slaveId=2, 所以, 因为 whichBrokerWhenConsumeSlowly 只能设置一个值, 因为 存在一个slave空闲的场景

存储

SubscriptionGroupConfig 是存储在 broker 上的, 并且以 json 格式存储在 storePath 路径下. 上面只讲述了创建的功能, 其实 mqadmin 还提供了删除了功能.

使用

  1. 心跳
  • consumer group定期心跳的时候, 心跳数据包含了group name 和 订阅的topic. 这里存在一个判断, 如果brokerConfig中 autoCreateSubscriptionGroup=true 的话, 即使 group 没有注册过订阅信息, 这里就会创建一个默认的 SubscriptionGroupConfig. 为后面拉取消息 提供凭证. 如果说 broker 关闭自动创建 SubscriptionGroupConfig: autoCreateSubscriptionGroup=false, 那么 没有注册过的group 无法消费消息

  • 在处理心跳的时候, 还会根据 SubscriptionGroupConfig#notifyConsumerIdsChanged 决定是否向已经注册的consumer client 发送 consumerIdChange 事件, 来触发 consumer client 的 rebalance

  1. 拉消息
  • 拉取消息之前, 必须有相应的 SubscriptionGroupConfig 信息, 如果没有的话, 就不能消费. 所以, 如果group没有注册过订阅信息, 那么 group 必须心跳成功后, 才有可能正常消费.

  • 拉取消息的时候, 如果lag很大, 在 brokerConfig#isSlaveReadEnable=true 的配置下, 则会根据 SubscriptionGroupConfig#whichBrokerWhenConsumeSlowly 的slaveId 来重定向consumer client 向指定的slave 拉取消息

  1. 监控
  • 比较特殊的情况, 线上经常使用group测试消费数据一段时间后, 就不在使用了. 但是 group 的lag监控却在一直增长, 并可能引起报警影响用户的生活. 如果用的是开源的rocketmq 监控, 即使使用 mqadmin#deleteSubGroup 也不能消除lag