最近一年中, 经常有用户不同的服务用一个group分别订阅不同的topic, 导致部分partition不消费

场景

业务反馈的时候, 通常是 监控上部分partition lag 增长, 并且queue的消费qps是0.

通过使用 mqadmin consumerProgress 查看offset 提交的时候, 发现这个group提交了多个topic, 并且每次结果不一样

-> % mqadmin consumerProgress -g  groupA  -n $addr  -s true
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Topic        #Broker Name                      #QID  #Broker Offset        #Consumer Offset      #Client IP           #Diff                 #LastTime
%RETRY%groupA  broker1                           0     0                     0                     ip1                  0                     N/A
topicA         broker1                           0     2180901               2180901               ip1                  0                     2020-03-08 20:10:04
topicA         broker1                           1     2000000               0                     ip1                  200000                2020-03-08 00:10:04
-> % mqadmin consumerProgress -g  groupA  -n $addr  -s true
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Topic        #Broker Name                      #QID  #Broker Offset        #Consumer Offset      #Client IP           #Diff                 #LastTime
%RETRY%groupA  broker1                           0     0                     0                     ip1                  0                     N/A
topicB         broker1                           0     2172997               2172997               ip1                  0                     2020-03-08 20:10:05
topicB         broker1                           1     1000000               0                     ip1                  10000                 2020-03-08 00:10:04

上面可以发现, groupA 消费了 topicA、topicB, 但是都只消费了一个queue, 导致了其他queue的lag. (为了说明问题, 这里很多使用了填充, 比如 ip、groupName、topic)

难道rocketmq不支持consumer client 同时订阅多个 topic?

其实正常使用rocketmq的情况下, 是允许一个consumer同时订阅多个topic的, 但是需要在consumer client启动前一次性订阅完多个topic, 比如下图, 一次性订阅TopicTestA 和 TopicTestB

rocketmq_sub_multi

但是如果业务分开来分别订阅的话, 就会存在问题, 如下使用

rocketmq_sub_a.png

rocketmq_sub_b.png

原因

为什么出现这样的原因?

首先, 为什么每次查询的结果不一样. 这个和broker维护consumer信息的实现 以及 心跳实现有关, 关键结构如下:

public class ConsumerManager {
    private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
    ......
}

public class ConsumerGroupInfo {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final String groupName;
    private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
        new ConcurrentHashMap<String, SubscriptionData>()
    .......
}

重点是 SubscriptionData 的维护上, 可以发现, 本质上这是个映射的实现: group -> topic -> SubscriptionData. 之所以可以订阅多个topic, 是因为 ConsumerGroupInfo 内部维护了一个topic的map, 这样订阅多个topic的时候, 只需要将topic和订阅数据 存放在 ConcurrentMap 中, 这个行为是在心跳的机制中实现的

但是在问题场景中, topicA 和 topicB 是分两种心跳投递的, 第一种心跳是 groupA 订阅 topicA 的心跳, 假设是服务A启动的进程; 第二种心跳是 groupA 订阅 topicB 的心跳, 假设是 服务B启动的. 因为是两次心跳逻辑, 在rocketmq中心跳中订阅数据是基于覆盖方式实现的, 关键实现如 ConsumerGroupInfo#updateSubscription:

public class ConsumerGroupInfo {
    public boolean updateSubscription(final Set<SubscriptionData> subList) {
        boolean updated = false;

        for (SubscriptionData sub : subList) {
            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
            if (old == null) {
            	SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
            	.....
            } else if (sub.getSubVersion() > old.getSubVersion()) {
                if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                    log.info("subscription changed, group: {} OLD: {} NEW: {}",
                        this.groupName,
                        old.toString(),
                        sub.toString()
                    );
                }

                this.subscriptionTable.put(sub.getTopic(), sub);
            }
        }
    	......
	}
}

这里补充下, sub#subVersion 就是当前时间. 也就是说, 随着多次心跳, topic的 SubscriptionData 会经常变化. 在我们执行命令 mqamdin consumerProgress的逻辑, cosnumer 的topic消费信息会根据 订阅信息进行过滤, 因为心跳的原因, 导致每次过滤出的topic 不一样, 也就会导致我们看到的结果不一样

那么, 为什么 消费存在lag呢? 这个就和 消费的rebalance 有关了.

在cluster模式的消费的流程中, 需要走以下几个逻辑(不是串行的, 这里为了简单说明):

  1. 遍历本地订阅的topic
  2. 获取这个topic下所有的mq
  3. 获取这个consumer group的所有consumerID
  4. 根据算法, 将topic下的所有mq分配给所有consumerID
  5. 当前consumer会尝试消费分配给自己的mq (如果是顺序消费, 会存在一次加锁行为, 这里不讨论)

这里需要注意的是, 无论是 服务A 还是 服务B 的消费, 在第三步中, 获取的 consumerID 都是 两个服务总共的 consumer, 这样在执行分配的时候, topicA 和 topicB 都是基于所有的consumer进行分配的. 可是, 服务A的consumer client 并不会去消费 topicB, 这样 topicB分配给 服务A consumer client 的 mq 并不会消费, 导致了lag.

如何快速定位

我们可以使用 mqadmin consumerProgress 查看这个consumer group 订阅topic的offset提交情况, 如果发现多个 topic 的提交情况, 并且并没有一次性订阅多个topic, 基本上是这种情况了