周末上午, 突然接到业务方反馈 nsq-consumer 发生了 netty 内存泄露. 通过heap dump文件分析文件, 发下了大内存占用. 如下图

oom-ana overall 界面

oom-list object-list 界面

通过点击 “overall 界面” 图的list object功能, 会进入 “object-list 界面”, 可以发现 nsqj 的内存占用最高, 问题定位到了, 那么, nsqj 线程是做什么的呢? nsqj实例化如下:

public AbstractNSQClient(BootstrapConfig config, String topic, String channel, int rdy,
      int workerNumber) {
    connections = new Connections();
    executor = Executors.newFixedThreadPool(workerNumber,
        new ThreadFactoryBuilder().setNameFormat("nsqj-").setDaemon(true).build());
    ......
}

可以发现, executor 使用了 fixed线程池, 这里的队列在sdk中使用的是 LinkedBlockingQueue, 队列是无限增长的. 这样看, 稳定是定位了, 就是因为 executor的队列一直在增长, 但是, 是谁再往队列中投递数据的呢? 还是无限投递! review代码如下:

public class NSQHandler extends SimpleChannelInboundHandler<NSQFrame> {
  .....
  protected void channelRead0(ChannelHandlerContext ctx, NSQFrame msg) throws Exception {
    final Connection con = Channels.getConnection(ctx.channel());
    if (con != null) {
      con.getParent().getExecutor().execute(() -> con.incoming(msg));
    } else {
      if (!(msg instanceof ResponseFrame)) {
        throw new IllegalStateException(
            "no connection attachment for channel : " + ctx.channel().id());
      }
      logger.warn("unknow message: {}", msg);
    }
  }
  ......

通过查看引用关系, 发现是 nsq的consumer不断处理消息, 将收到的消息不断放入 executor 的队列. 但是nsq本身是有很好的 流控策略的, nsq 的 机制参考官方文档, 结合现在发现的现象, 我直接怀疑是不是 rdy 计算上出现了问题, 这里看下rdy的流控的实现, 如下:

public void incoming(NSQFrame frame) {
    ......
    if (frame instanceof MessageFrame) {
      MessageFrame msg = (MessageFrame) frame;
      long tot = this.totalMessages.incrementAndGet();
      if (tot % messagesPerBatch > (messagesPerBatch / 2)) {
        this.write(new RDYCommand(this.messagesPerBatch));
      }
    .........
}

在上面的代码可以发现, 每次消费点位在 (messagesPerBatch / 2, messagesPerBatch) 的时候, 每消费一个消息, 都会触发向 服务端请求 messagesPerBatch 数量的消息. 这样, 消息量会叠加的很多, 如果消息体 还很大的话, oom现象就会很明显. 询问了业务方, 消息体确实很大, 为了快速修复问题, 先修改了新的 rdy策略 上线, 问题消息.