Preface

最近使用sarama(kafka go client) 发现大量的报错: write: broken pipe, 并且还触发了我们的日志报警, 感到奇怪, 研究了一下

报错类型

除了 broken pipe, 还有 reset by peerEOF 两种报错. 根据查阅资料, 最终整理如下:

  • Broken pipe: 是 第二次向 closed tcp pipe(收到了rst报文) 写入数据导致的报错
  • reset by peer: 是 在写入 closed tcp pipe(收到了rst报文) 之后读取操作 报错
  • io.EOF: 如果对端的写端关闭连接,读端继续读,报错EOF

这里 reset by peerio.EOF 存在一定的雷同, 下面针对这三种情况进行测试:

program

broken pipeEOF 的测试中, 使用的server 和 client 代码是一个, 如下:

package main

import (
	"log"
	"net"
	"time"
	"unsafe"
)

func main() {
	doClient()
}

func doClient() {
	d := &net.Dialer{
		KeepAlive: 3 * time.Second,
	}
	conn, err := d.Dial("tcp", "127.0.0.1:8090")
	if err != nil {
		panic(err)
	}

	defer conn.Close()


	buffer := []byte("hello world")
	for {
		n, err := conn.Write(buffer)
		if err != nil {
			log.Printf("err: %v\n", err)
			return
		}
		log.Printf("i: %v successfully send: %v\n", i, n)
		time.Sleep(5 * time.Second)
	}
}
package main

import (
	"log"
	"net"
	"syscall"
	"time"
	"unsafe"
)

func main() {
	setRlimitNOFile(5)
	serve()
}

func serve() error {
	addr := "127.0.0.1:8090"
	listner, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}

	log.Println("server listen...")
	for {
		conn, err := listner.Accept()
		if err != nil {
			log.Printf("listener accept err: %v\n", err)
			continue
		}
		time.Sleep(5 * time.Second)
		go serveConn(conn)
	}
}

func serveConn(conn net.Conn) {
	defer conn.Close()

	// just for test, not for production
	buffer := make([]byte, 256)
	for {
		n, err := conn.Read(buffer)
		if err != nil {
			log.Printf("conn read err: %v\n", err)
			return
		}
		log.Printf("recv msg: %v with len: %v\n", BytesToString(buffer), n)
	}
}

func BytesToString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func setRlimitNOFile(nofile uint64) error {
	if nofile == 0 {
		return nil
	}
	var lim syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
		return err
	}
	log.Printf("lim.Cur: %v", lim.Cur)
	lim.Cur = nofile
	lim.Max = nofile
	return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &lim)
}

但是测试 reset by peer 的场景就比较复杂了, 需要 ulimit 限制, 因为是 mac 机器, 我这里采用的是 代码指定 limit大小, 而没有通过 文件配置. 如下代码

package main

import (
	"log"
	"net"
	"time"
	"unsafe"
)

func main() {
	for i := 0; i < 10; i++ {
		go doClient(i)
	}
	doClient(10)
}

func doClient(i int) {
	d := &net.Dialer{
		KeepAlive: 3 * time.Second,
	}
	conn, err := d.Dial("tcp", "127.0.0.1:8090")
	if err != nil {
		panic(err)
	}

	defer conn.Close()

	go func() {
		buffer := []byte("hello world")
		for {
			n, err := conn.Write(buffer)
			if err != nil {
				log.Printf("err: %v\n", err)
				return
			}
			log.Printf("i: %v successfully send: %v\n", i, n)
			time.Sleep(5 * time.Second)
		}
	}()
	go func() {
		time.Sleep(1 * time.Second)
		rbuffer := make([]byte, 256)
		for {
			n, err := conn.Read(rbuffer)
			if err != nil {
				log.Printf("i:%v conn read err: %v\n", i, err)
				return
			}
			log.Printf("i: %v client recv msg: %v with len: %v\n", i, BytesToString(rbuffer), n)
		}
	}()
	stop := make(chan struct{})
	<-stop
}

func BytesToString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}
package main

import (
	"log"
	"net"
	"syscall"
	"time"
	"unsafe"
)

func main() {
	setRlimitNOFile(5)
	serve()
}

func serve() error {
	addr := "127.0.0.1:8090"
	listner, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}

	log.Println("server listen...")
	for {
		conn, err := listner.Accept()
		if err != nil {
			log.Printf("listener accept err: %v\n", err)
			continue
		}
		time.Sleep(5 * time.Second)
		go serveConn(conn)
	}
}

func serveConn(conn net.Conn) {
	defer conn.Close()

	// just for test, not for production
	buffer := make([]byte, 256)
	for {
		n, err := conn.Read(buffer)
		if err != nil {
			log.Printf("conn read err: %v\n", err)
			return
		}
		log.Printf("recv msg: %v with len: %v\n", BytesToString(buffer), n)
		conn.Close()
		return
	}
}

func BytesToString(b []byte) string {
	return *(*string)(unsafe.Pointer(&b))
}

func setRlimitNOFile(nofile uint64) error {
	if nofile == 0 {
		return nil
	}
	var lim syscall.Rlimit
	if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &lim); err != nil {
		return err
	}
	log.Printf("lim.Cur: %v", lim.Cur)
	lim.Cur = nofile
	lim.Max = nofile
	return syscall.Setrlimit(syscall.RLIMIT_NOFILE, &lim)
}

broken pipe

demo中, 只要server端先关闭, 那么client端就会显示 err: write tcp 127.0.0.1:62221->127.0.0.1:8090: write: broken pipe

通过tcpdump观察, 简化如下:

-> % sudo tcpdump -i lo0  port 8090 -X
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on lo0, link-type NULL (BSD loopback), capture size 262144 bytes

22:45:49.526730 IP localhost.64918 > localhost.8090: Flags [S], seq 2559638747, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 899979569 ecr 0,sackOK,eol], length 0

22:45:49.526866 IP localhost.8090 > localhost.64918: Flags [S.], seq 3624627785, ack 2559638748, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 899979569 ecr 899979569,sackOK,eol], length 0

22:45:49.526880 IP localhost.64918 > localhost.8090: Flags [.], ack 1, win 6379, options [nop,nop,TS val 899979569 ecr 899979569], length 0

22:45:49.526892 IP localhost.8090 > localhost.64918: Flags [.], ack 1, win 6379, options [nop,nop,TS val 899979569 ecr 899979569], length 0

22:45:49.526937 IP localhost.64918 > localhost.8090: Flags [P.], seq 1:12, ack 1, win 6379, options [nop,nop,TS val 899979569 ecr 899979569], length 11
	0x0030:  35a4 9931 6865 6c6c 6f20 776f 726c 64    5..1hello.world  // 第一轮 发送数据

22:45:49.526954 IP localhost.8090 > localhost.64918: Flags [.], ack 12, win 6379, options [nop,nop,TS val 899979569 ecr 899979569], length 0

22:45:51.485368 IP localhost.8090 > localhost.64918: Flags [F.], seq 1, ack 12, win 6379, options [nop,nop,TS val 899981520 ecr 899979569], length 0

22:45:51.485398 IP localhost.64918 > localhost.8090: Flags [.], ack 2, win 6379, options [nop,nop,TS val 899981520 ecr 899981520], length 0

22:45:54.530274 IP localhost.64918 > localhost.8090: Flags [P.], seq 12:23, ack 2, win 6379, options [nop,nop,TS val 899984540 ecr 899981520], length 11
	0x0030:  35a4 a0d0 6865 6c6c 6f20 776f 726c 64    5...hello.world   // 第二轮 发送的数据, 底层返回rst, 数据其实没有写出

22:45:54.530340 IP localhost.8090 > localhost.64918: Flags [R], seq 3624627787, win 0, length 0

// 第三轮发送 数据 报错 Broken Pipe

可以发现 在 client 处于 CLOSE_WAIT 状态, server处于 FIN_WAIT_2 的时候, 这个时候 突然有发送了第二轮 hello world, 触发了server端响应 reset 报文, 导致了 broken pipe.

reset

通过增大client的连接并发度超过server的处理能力(accept队列大小), 那么client read端就会触发 connection reset by peer, 但是从日志上来看, 其实会先发送一个写命令, 虽然server端并没有提取出来. 从 tcpdump 的角度来看:

-> % sudo tcpdump  -i lo0 src port 8090 -X
Password:
Sorry, try again.
Password:
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on lo0, link-type NULL (BSD loopback), capture size 262144 bytes
11:15:59.200150 IP localhost.8090 > localhost.59394: Flags [S.], seq 3079437778, ack 787178163, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 912764939 ecr 912764939,sackOK,eol], length 0
	0x0000:  4500 0040 0000 4000 4006 0000 7f00 0001  E..@..@.@.......
	0x0010:  7f00 0001 1f9a e802 b78c 7dd2 2eeb 62b3  ..........}...b.
	0x0020:  b012 ffff fe34 0000 0204 3fd8 0103 0306  .....4....?.....
	0x0030:  0101 080a 3667 b00b 3667 b00b 0402 0000  ....6g..6g......
11:15:59.200187 IP localhost.8090 > localhost.59394: Flags [.], ack 1, win 6379, options [nop,nop,TS val 912764939 ecr 912764939], length 0
	0x0000:  4500 0034 0000 4000 4006 0000 7f00 0001  E..4..@.@.......
	0x0010:  7f00 0001 1f9a e802 b78c 7dd3 2eeb 62b3  ..........}...b.
	0x0020:  8010 18eb fe28 0000 0101 080a 3667 b00b  .....(......6g..
	0x0030:  3667 b00b                                6g..
11:15:59.200304 IP localhost.8090 > localhost.59394: Flags [F.], seq 1, ack 1, win 6379, options [nop,nop,TS val 912764939 ecr 912764939], length 0
	0x0000:  4500 0034 0000 4000 4006 0000 7f00 0001  E..4..@.@.......
	0x0010:  7f00 0001 1f9a e802 b78c 7dd3 2eeb 62b3  ..........}...b.
	0x0020:  8011 18eb fe28 0000 0101 080a 3667 b00b  .....(......6g..
	0x0030:  3667 b00b                                6g..
11:15:59.200441 IP localhost.8090 > localhost.59394: Flags [R], seq 3079437780, win 0, length 0
	0x0000:  4500 0028 0000 4000 4006 0000 7f00 0001  E..(..@.@.......
	0x0010:  7f00 0001 1f9a e802 b78c 7dd4 0000 0000  ..........}.....
	0x0020:  5004 0000 fe1c 0000                      P.......

可以明显观察到, 连握手都没有完成, client 发送的sync, server 只返回了 ack 的内容, 但是没有 sync, 接着 server 发送了 fin 包关闭连接, 当 client 发起写入动作的时候, 触发了 server 端发送 reset 报文, client 再次读取的时候就发生了 reset by peer 的报错.

但是从 java的复现角度, https://imroc.io/posts/kubernetes/no-route-to-host/, 看上去java在 select closed conn 也会有这种情况 (待确认)

从目前找到的资料而言, 比较合理的解释如下:

  • 连接被意外复用, 通过 ”connection reset by peer” 达到连接被关闭
  • 请求量过多,服务端直接拒绝了该请求 (可以通过 ulimit 复现)

关于 limit:

代码中通过设置 resource limit 实现了效果, resource limit 有 soft 和 hard 两种, 分别如下:

  • soft limit 程序的进程可以自行修改, 内核支持的资源上限.
  • hard limit 非特权程序的上线, soft limit小于 hard limit. 特权用户进程使用 soft limit的限制

可以修改 /etc/sysctl.conf 相当于修改 hard limit, /etc/security/limit.conf ?

参考:

io.eof

demo中, 只需要 client 进程先关闭, 那么server端就会显示 conn read err: EOF. 对应简化的tcpdump

-> % sudo tcpdump -i lo0  port 8090 -X
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on lo0, link-type NULL (BSD loopback), capture size 262144 bytes

22:33:30.265967 IP localhost.64599 > localhost.8090: Flags [S], seq 455781167, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 899251467 ecr 0,sackOK,eol], length 0

22:33:30.266046 IP localhost.8090 > localhost.64599: Flags [S.], seq 2848968819, ack 455781168, win 65535, options [mss 16344,nop,wscale 6,nop,nop,TS val 899251467 ecr 899251467,sackOK,eol], length 0

22:33:30.266056 IP localhost.64599 > localhost.8090: Flags [.], ack 1, win 6379, options [nop,nop,TS val 899251467 ecr 899251467], length 0

22:33:30.266064 IP localhost.8090 > localhost.64599: Flags [.], ack 1, win 6379, options [nop,nop,TS val 899251467 ecr 899251467], length 0

22:33:30.266120 IP localhost.64599 > localhost.8090: Flags [P.], seq 1:12, ack 1, win 6379, options [nop,nop,TS val 899251467 ecr 899251467], length 11

22:33:30.266131 IP localhost.8090 > localhost.64599: Flags [.], ack 12, win 6379, options [nop,nop,TS val 899251467 ecr 899251467], length 0

22:33:33.377102 IP localhost.64599 > localhost.8090: Flags [F.], seq 12, ack 1, win 6379, options [nop,nop,TS val 899254528 ecr 899251467], length 0

22:33:33.377164 IP localhost.8090 > localhost.64599: Flags [.], ack 13, win 6379, options [nop,nop,TS val 899254528 ecr 899254528], length 0

22:33:40.273472 IP localhost.8090 > localhost.64599: Flags [F.], seq 1, ack 13, win 6379, options [nop,nop,TS val 899261312 ecr 899254528], length 0

22:33:40.273521 IP localhost.64599 > localhost.8090: Flags [.], ack 2, win 6379, options [nop,nop,TS val 899261312 ecr 899261312], length 0

怎么多了一个ack

三次握手 sync-send -> senc-recvd -> establised 本质上就创建好了连接, 对应的tcp报文是 sync、sync(+ack)、ack 就可以了. 但是可以发现上面多了一个ack, 这是因为tcp报文对每个相应都会有一个 ack ?

源码

根据网上小伙伴的排查, reset by peer 和 broken pipe 在 glibc 中, 位置: sysdeps/gnu/errlist.c

 354 /*
 355 TRANS There is no process reading from the other end of a pipe.
 356 TRANS Every library function that returns this error code also generates a
 357 TRANS @code{SIGPIPE} signal; this signal terminates the program if not handled
 358 TRANS or blocked.  Thus, your program will never actually see @code{EPIPE}
 359 TRANS unless it has handled or blocked @code{SIGPIPE}. */
 360     [ERR_REMAP (EPIPE)] = N_("Broken pipe"),

 611 /*
 612 TRANS A network connection was closed for reasons outside the control of the
 613 TRANS local host, such as by the remote machine rebooting or an unrecoverable
 614 TRANS protocol violation. */
 615     [ERR_REMAP (ECONNRESET)] = N_("Connection reset by peer"),

在golang实现中, 对应操作系统, 也梳理了一下向上的报错, 对应如下:

{104, "ECONNRESET", "connection reset by peer"},
{13, "SIGPIPE", "broken pipe"},
{32, "EPIPE", "broken pipe"},

但是在linux源码中, 处理rst报文的逻辑 根据不同状态的操作是不一样的:

/* When we get a reset we do this. */
void tcp_reset(struct sock *sk)
{
	trace_tcp_receive_reset(sk);

	/* We want the right error as BSD sees it (and indeed as we do). */
	switch (sk->sk_state) {
	case TCP_SYN_SENT:
		sk->sk_err = ECONNREFUSED;
		break;
	case TCP_CLOSE_WAIT:
		sk->sk_err = EPIPE;
		break;
	case TCP_CLOSE:
		return;
	default:
		sk->sk_err = ECONNRESET;
	}
	/* This barrier is coupled with smp_rmb() in tcp_poll() */
	smp_wmb();

	tcp_write_queue_purge(sk);
	tcp_done(sk);

	if (!sock_flag(sk, SOCK_DEAD))
		sk->sk_error_report(sk);
}

可以发现, 只要不是 TCP_SYN_SENT、TCP_CLOSE_WAIT 和 TCP_CLOSE, 只要收到 rst 报文, 都会显示 ECONNRESET. 我们的demo演示 client处于 ESTABLISHED 的状态.

补充

too many open files

常见问题, 不赘述

类似

这个用户的场景不是很理解: https://testerhome.com/articles/23296

面试题

用什么工具观察 tcpip 的交互流程?

tcp有哪些包(packet)的类型, 分别代表什么意思, 什么场景用: SYN(建立连接), FIN(关闭连接), ACK(响应), PSH(DATA数据传输), RST(连接重置), URG(紧急); RST一般是在FIN之后才会出现, 一般出现FIN包或RST包时,我们便认为客户端与服务器端断开了连接.

意外

查阅资料的时候, 无意间竟然发现了 寒泉子 的blog记录了相关的post, 参考: http://lovestblog.cn/blog/2014/05/20/tcp-broken-pipe/, 寒泉子目前创业, 之前在 jvm 工作.

另外, Broken pipereset by peer 竟然在 glic 中有, 也是很奇怪的, 看了资料才知道, 很多linux逻辑其实也是封装在了glibc