2 minutes
Rocketmq_subsconfig
最近咨询订阅配置的人比较多, 这里进行分析下.
配置信息
订阅配置信息是consumer向broker消费消息的凭证, 如果broker开启了 autoCreateSubscriptionGroup=false
, 那么consumer client在消费之前, 必须通过命令行或者控制台上创建订阅配置, 然后consumer client使用配置订阅的名字. 通过命令行创建的订阅如下:
-> % mqadmin updateSubGroup
usage: mqadmin updateSubGroup [-a <arg>] [-b <arg>] [-c <arg>] [-d <arg>] -g <arg> [-h] [-i <arg>] [-m <arg>]
[-n <arg>] [-q <arg>] [-r <arg>] [-s <arg>] [-w <arg>]
-a,--notifyConsumerIdsChanged <arg> notify consumerId changed
-b,--brokerAddr <arg> create subscription group to which broker
-c,--clusterName <arg> create subscription group to which cluster
-d,--consumeBroadcastEnable <arg> broadcast
-g,--groupName <arg> consumer group name
-h,--help Print help
-i,--brokerId <arg> consumer from which broker id
-m,--consumeFromMinEnable <arg> from min offset
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-q,--retryQueueNums <arg> retry queue nums
-r,--retryMaxTimes <arg> retry max times
-s,--consumeEnable <arg> consume enable
-w,--whichBrokerWhenConsumeSlowly <arg> which broker id when consume slowly
其中, -b
和 -c
两个参数是对立的, -b
只会请求对应的broker, 而-c
则会先获取指定集群下的所有broker地址, 然后遍历执行创建SubscriptionGroupConfig
. -m
目前是多余的配置, 暂时不起任何作用. 其他的配置中, 比较重要的如下:
-
notifyConsumerIdsChanged: 当有新的consumer连接到broker的时候, 是否允许broker遍历已经注册的consumer进行通知请求, cosumer接收到通知请求后, 会触发rebalance. 这个参数主要的作用是什么呢? 如果是有序消费, 并没有太大的影响, 只是添加的consumer需要在下一轮rebalance之后才能消费, 并且是 获取到broker的队列锁之后才能消费; 如果是并发消费, 关闭这个选项的话, 就会导致严重的重复消费. 因为和有序消费不同, 并发消费没有队列锁, 那么, 如果关闭选项的话, 每个consumer不能及时感知到其他consumer的存在, 每个consumer rebalance的实际不一样, 导致一段时间内, 有的consumer消费的是加入前分配的结果, 有的consumer消费的则是分配后的结果, consumer主动触发rebalance是 20s.
-
consumeBroadcastEnable: 这个只有在需要广播消息的时候才需要打开, 一般用不到
-
consumeEnable: 正常使用直接设置成 true 就可以了. 如果希望所有的consumer都不消费, 比如 敏感秘密级别的原因, 设置成 false
-
whichBrokerWhenConsumeSlowly: 这个只有在 consumer group lag 非常大的时候才会触发. 只有在 brokerConfig#isSlaveReadEnable 打开的情况下才会奏效. 当master lag非常大的时候, rocketmq 是有策略的: 重定向consumer 到 slave 消费, 众所周知, master-salve 同步配置中, 每个broker是有 brokerId 的, brokerId=0 是master, brokerId大于0 的是slave, 一般建议设置为1, 一些master-slave的配置中, 为了保证数据不丢, 配置了两个slave, 一般是 slaveId=1 和 slaveId=2, 所以, 因为 whichBrokerWhenConsumeSlowly 只能设置一个值, 因为 存在一个slave空闲的场景
存储
SubscriptionGroupConfig 是存储在 broker 上的, 并且以 json 格式存储在 storePath 路径下. 上面只讲述了创建的功能, 其实 mqadmin 还提供了删除了功能.
使用
- 心跳
-
consumer group定期心跳的时候, 心跳数据包含了group name 和 订阅的topic. 这里存在一个判断, 如果brokerConfig中
autoCreateSubscriptionGroup=true
的话, 即使 group 没有注册过订阅信息, 这里就会创建一个默认的 SubscriptionGroupConfig. 为后面拉取消息 提供凭证. 如果说 broker 关闭自动创建 SubscriptionGroupConfig:autoCreateSubscriptionGroup=false
, 那么 没有注册过的group 无法消费消息 -
在处理心跳的时候, 还会根据
SubscriptionGroupConfig#notifyConsumerIdsChanged
决定是否向已经注册的consumer client 发送consumerIdChange
事件, 来触发 consumer client 的 rebalance
- 拉消息
-
拉取消息之前, 必须有相应的 SubscriptionGroupConfig 信息, 如果没有的话, 就不能消费. 所以, 如果group没有注册过订阅信息, 那么 group 必须心跳成功后, 才有可能正常消费.
-
拉取消息的时候, 如果lag很大, 在
brokerConfig#isSlaveReadEnable=true
的配置下, 则会根据SubscriptionGroupConfig#whichBrokerWhenConsumeSlowly
的slaveId 来重定向consumer client 向指定的slave 拉取消息
- 监控
- 比较特殊的情况, 线上经常使用group测试消费数据一段时间后, 就不在使用了. 但是 group 的lag监控却在一直增长, 并可能引起报警影响用户的生活. 如果用的是开源的rocketmq 监控, 即使使用
mqadmin#deleteSubGroup
也不能消除lag