One minute
Rocketmq_flow_control
背景
rocketmq推广过程中, 偶尔会遇到 [TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: 201ms, size of queue: 5389
类似的报错, 导致上游业务失败率报警以及错误日志飙升. 在相应的监控上, rocketmq 的发送qps也是非常高.
原因
其实这个行为是 rocektmq broker 的自我保护机制, 那么什么时候会触发呢? 这个主要是在 store 进行put 消息的时候会触发. 之前讲过, 在 rocketmq 的处理机制中, netty 将读取到的消息 会封装成 RequestTask 对象提交到 executorService 的队列中, 然后等待 executorService 调度执行. 那么, 这里存在两种情况:
-
queue已经被写满了, 无法再提交新的任务, 那么会触发
RejectedExecutionException
, 这个时候, rocketmq broker 会返回RemotingSysResponseCode.SYSTEM_BUSY
, 提示信息是:[OVERLOAD]
. 参考:NettyRemotingAbstract#processRequestCommand
-
调度延迟的问题. 我在 11:05 提交了一个写入请求, 但是因为 写入流程耗时 增加, 导致我的请求到 11:06 才被处理, 对于实时在线业务而言, 这条消息其实早就超时了, 这种情况, rocketmq 有两套机制:
-
2.1 定期检查queue, 会检查 RequestTask 的生成时间 和 当前时间的 差值, 如果超过了配置的超时时间, 就会返回
RemotingSysResponseCode.SYSTEM_BUSY
, 提示TIMEOUT_CLEAN_QUEUE
, 参考 BrokerFastFailure#cleanExpiredRequestInQueue -
2.2 对于 send 请求, 会检查当前系统是否
isOSPageCacheBusy
, 如果 true, 就会拿取 queue 的第一个 RequestTask 返回RemotingSysResponseCode.SYSTEM_BUSY
, 提示PCBUSY_CLEAN_QUEUE
, 直到 判断是 false. 那么, isOSPageCacheBusy 的判断逻辑是什么呢? 如下:
-
@Override
public boolean isOSPageCacheBusy() {
long begin = this.getCommitLog().getBeginTimeInLock();
long diff = this.systemClock.now() - begin;
return diff < 10000000
&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();
}
getOsPageCacheBusyTimeOutMills默认是1000ms, 可以发现, 这里diff 需要在 (1000, 10_000_000) 的区间, 也就是commitlog 最近的一条写入大于1s的情况 (beginTimeInLock 在写入的时候会被设置当前时间戳, 但是在写入成功后会被重置为0), diff 的 上界为了区别于 没有写入的情况(没有写入, beginTimeInLock就是0, diff=this.systemClock.now() ).
-
写入逻辑之前会进行一次
isOSPageCacheBusy
的检查, true 则设置 PutMessageResult 状态OS_PAGECACHE_BUSY
, 返回ResponseCode.SYSTEM_ERROR
, 提示[PC_SYNCHRONIZED]
-
为了避免 pageCache busy 场景下请求的无效投递到queue, 在提交queue之前, 会检查条件
rejectRequest
, 判断条件是this.brokerController.getMessageStore().isOSPageCacheBusy() ||this.brokerController.getMessageStore().isTransientStorePoolDeficient()
, 其中,isTransientStorePoolDeficient
已经被弃用, isOSPageCacheBusy 参见上文.
综合上面的分析, 因为我们业务的qps很高, 导致了 TIMEOUT_CLEAN_QUEUE
的提示, 是因为 处理写入时间 过长
解决
扩容, 降低broker的热点负载