背景

rocketmq推广过程中, 偶尔会遇到 [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 201ms, size of queue: 5389 类似的报错, 导致上游业务失败率报警以及错误日志飙升. 在相应的监控上, rocketmq 的发送qps也是非常高.

原因

其实这个行为是 rocektmq broker 的自我保护机制, 那么什么时候会触发呢? 这个主要是在 store 进行put 消息的时候会触发. 之前讲过, 在 rocketmq 的处理机制中, netty 将读取到的消息 会封装成 RequestTask 对象提交到 executorService 的队列中, 然后等待 executorService 调度执行. 那么, 这里存在两种情况:

  1. queue已经被写满了, 无法再提交新的任务, 那么会触发 RejectedExecutionException, 这个时候, rocketmq broker 会返回 RemotingSysResponseCode.SYSTEM_BUSY, 提示信息是: [OVERLOAD]. 参考: NettyRemotingAbstract#processRequestCommand

  2. 调度延迟的问题. 我在 11:05 提交了一个写入请求, 但是因为 写入流程耗时 增加, 导致我的请求到 11:06 才被处理, 对于实时在线业务而言, 这条消息其实早就超时了, 这种情况, rocketmq 有两套机制:

    • 2.1 定期检查queue, 会检查 RequestTask 的生成时间 和 当前时间的 差值, 如果超过了配置的超时时间, 就会返回 RemotingSysResponseCode.SYSTEM_BUSY, 提示 TIMEOUT_CLEAN_QUEUE, 参考 BrokerFastFailure#cleanExpiredRequestInQueue

    • 2.2 对于 send 请求, 会检查当前系统是否 isOSPageCacheBusy, 如果 true, 就会拿取 queue 的第一个 RequestTask 返回 RemotingSysResponseCode.SYSTEM_BUSY, 提示 PCBUSY_CLEAN_QUEUE, 直到 判断是 false. 那么, isOSPageCacheBusy 的判断逻辑是什么呢? 如下:

@Override
public boolean isOSPageCacheBusy() {
    long begin = this.getCommitLog().getBeginTimeInLock();
    long diff = this.systemClock.now() - begin;

    return diff < 10000000
        && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}

getOsPageCacheBusyTimeOutMills默认是1000ms, 可以发现, 这里diff 需要在 (1000, 10_000_000) 的区间, 也就是commitlog 最近的一条写入大于1s的情况 (beginTimeInLock 在写入的时候会被设置当前时间戳, 但是在写入成功后会被重置为0), diff 的 上界为了区别于 没有写入的情况(没有写入, beginTimeInLock就是0, diff=this.systemClock.now() ).

  1. 写入逻辑之前会进行一次 isOSPageCacheBusy的检查, true 则设置 PutMessageResult 状态 OS_PAGECACHE_BUSY, 返回 ResponseCode.SYSTEM_ERROR, 提示 [PC_SYNCHRONIZED]

  2. 为了避免 pageCache busy 场景下请求的无效投递到queue, 在提交queue之前, 会检查条件 rejectRequest, 判断条件是 this.brokerController.getMessageStore().isOSPageCacheBusy() ||this.brokerController.getMessageStore().isTransientStorePoolDeficient(), 其中, isTransientStorePoolDeficient 已经被弃用, isOSPageCacheBusy 参见上文.

综合上面的分析, 因为我们业务的qps很高, 导致了 TIMEOUT_CLEAN_QUEUE 的提示, 是因为 处理写入时间 过长

解决

扩容, 降低broker的热点负载