2 minutes
Rocketmq_allocate
最近一年中, 经常有用户不同的服务用一个group分别订阅不同的topic, 导致部分partition不消费
场景
业务反馈的时候, 通常是 监控上部分partition lag 增长, 并且queue的消费qps是0.
通过使用 mqadmin consumerProgress
查看offset 提交的时候, 发现这个group提交了多个topic, 并且每次结果不一样
-> % mqadmin consumerProgress -g groupA -n $addr -s true
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Topic #Broker Name #QID #Broker Offset #Consumer Offset #Client IP #Diff #LastTime
%RETRY%groupA broker1 0 0 0 ip1 0 N/A
topicA broker1 0 2180901 2180901 ip1 0 2020-03-08 20:10:04
topicA broker1 1 2000000 0 ip1 200000 2020-03-08 00:10:04
-> % mqadmin consumerProgress -g groupA -n $addr -s true
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
#Topic #Broker Name #QID #Broker Offset #Consumer Offset #Client IP #Diff #LastTime
%RETRY%groupA broker1 0 0 0 ip1 0 N/A
topicB broker1 0 2172997 2172997 ip1 0 2020-03-08 20:10:05
topicB broker1 1 1000000 0 ip1 10000 2020-03-08 00:10:04
上面可以发现, groupA 消费了 topicA、topicB, 但是都只消费了一个queue, 导致了其他queue的lag. (为了说明问题, 这里很多使用了填充, 比如 ip、groupName、topic)
难道rocketmq不支持consumer client 同时订阅多个 topic?
其实正常使用rocketmq的情况下, 是允许一个consumer同时订阅多个topic的, 但是需要在consumer client启动前一次性订阅完多个topic, 比如下图, 一次性订阅TopicTestA 和 TopicTestB
但是如果业务分开来分别订阅的话, 就会存在问题, 如下使用
原因
为什么出现这样的原因?
首先, 为什么每次查询的结果不一样. 这个和broker维护consumer信息的实现 以及 心跳实现有关, 关键结构如下:
public class ConsumerManager {
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
......
}
public class ConsumerGroupInfo {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final String groupName;
private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable =
new ConcurrentHashMap<String, SubscriptionData>()
.......
}
重点是 SubscriptionData 的维护上, 可以发现, 本质上这是个映射的实现: group -> topic -> SubscriptionData. 之所以可以订阅多个topic, 是因为 ConsumerGroupInfo 内部维护了一个topic的map, 这样订阅多个topic的时候, 只需要将topic和订阅数据 存放在 ConcurrentMap 中, 这个行为是在心跳的机制中实现的
但是在问题场景中, topicA 和 topicB 是分两种心跳投递的, 第一种心跳是 groupA 订阅 topicA 的心跳, 假设是服务A启动的进程; 第二种心跳是 groupA 订阅 topicB 的心跳, 假设是 服务B启动的. 因为是两次心跳逻辑, 在rocketmq中心跳中订阅数据是基于覆盖方式实现的, 关键实现如 ConsumerGroupInfo#updateSubscription:
public class ConsumerGroupInfo {
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
for (SubscriptionData sub : subList) {
SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
if (old == null) {
SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
.....
} else if (sub.getSubVersion() > old.getSubVersion()) {
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}",
this.groupName,
old.toString(),
sub.toString()
);
}
this.subscriptionTable.put(sub.getTopic(), sub);
}
}
......
}
}
这里补充下, sub#subVersion 就是当前时间. 也就是说, 随着多次心跳, topic的 SubscriptionData 会经常变化. 在我们执行命令 mqamdin consumerProgress
的逻辑, cosnumer 的topic消费信息会根据 订阅信息进行过滤, 因为心跳的原因, 导致每次过滤出的topic 不一样, 也就会导致我们看到的结果不一样
那么, 为什么 消费存在lag呢? 这个就和 消费的rebalance 有关了.
在cluster模式的消费的流程中, 需要走以下几个逻辑(不是串行的, 这里为了简单说明):
- 遍历本地订阅的topic
- 获取这个topic下所有的mq
- 获取这个consumer group的所有consumerID
- 根据算法, 将topic下的所有mq分配给所有consumerID
- 当前consumer会尝试消费分配给自己的mq (如果是顺序消费, 会存在一次加锁行为, 这里不讨论)
这里需要注意的是, 无论是 服务A 还是 服务B 的消费, 在第三步中, 获取的 consumerID 都是 两个服务总共的 consumer, 这样在执行分配的时候, topicA 和 topicB 都是基于所有的consumer进行分配的. 可是, 服务A的consumer client 并不会去消费 topicB, 这样 topicB分配给 服务A consumer client 的 mq 并不会消费, 导致了lag.
如何快速定位
我们可以使用 mqadmin consumerProgress
查看这个consumer group 订阅topic的offset提交情况, 如果发现多个 topic 的提交情况, 并且并没有一次性订阅多个topic, 基本上是这种情况了