Go runtime 02: netpoll
最近重看 Go 的 runtime 的时候发现了 netpoll.go. 以为时最近加入的内容, 但 git blame 下却发现时很早以前就存在了. 非常佩服自己的主动过滤系统.
netpoll 是 Go 是对不同平台 IO 的封装, 比如 Linux’s 的 epoll. 一方面是和其他语言的三方库一样, 简化上游使用者的心智负担. 另一方, 通过实现在 runtime, 和 GMP 调度模型高度结合, 尽可能的提高效率.
IO 相关的基础知识
我们首先明确两组属性.
阻塞和非阻塞 IO, 前者指调用阻塞在套接字上直到有数据可以读取/写入, 后者指在没有数据可以读取/无法写入数据时调用直接返回, 需要由调用者采用轮训等方法决定处理方式.
同步和异步 IO, 前者指调用直接生效, 执行完后返回结果, 后者指调用立刻返回, 但具体工作后续执行, 结果通过某种方式通知调用者.
Linux 的套接字(socket) 即支持阻塞模式也支持非阻塞模式, 可以在创建时通过参数指定. 但一般是同步的, 异步 socket 似乎在 windows 中比较流行.
而诸如 Nginx 这类网关需要同时处理成千上万的链接, 创建对等数量的线程阻塞在链接上是不现实的, 他们一般会选择同步非阻塞 IO, 配合轮训来让一个线程处理成百上千个链接.
但 IO 操作一般是系统调用,涉及内核态到系统态的转换, 异常昂贵. 所以我们通过多路复用, 实现一次调用操作多个套接字来减少对系统 API 的调用. 这在 Linux 上对应 select/poll/epoll, 前两者需要在轮训是传入所有关注的套接字, 导致高昂的内存成本. epoll 通过拆分处单独的注册接口, 使得高频的轮训操作不需要传入所有套接字, 进而取得更高的效率, 成为 Linux 下的主流选择.
如何使用 netpoll
Go 中的 TCP 使用 netpoll 实现, 我们以 Read 操作为例来观察如何使用 netpoll.
首先 Read 操作会被委托给 internal/poll.FD
:
// TCPConn is an implementation of the Conn interface for TCP network
// connections.
type TCPConn struct {
conn
}
...
type conn struct {
fd *netFD
}
...
// Network file descriptor.
type netFD struct {
pfd poll.FD
// immutable until Close
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
...
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError(readSyscallName, err)
}
internal/poll.FD
是对 runtime/netpoll.go
的抽象和封装,
FD.Read
内部首先尝试直接读取数据, 如果没有则阻塞在套接字上, 等待数据到达后触发下一次尝试.
阻塞这个操作通过 runtime.pollWait
实现.
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
...
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}
...
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
Read 的前提条件是通过 pd.init 将套接字注册到 netpoll.
func (fd *netFD) accept() (netfd *netFD, err error) {
d, rsa, errcall, err := fd.pfd.Accept()
if err != nil {
if errcall != "" {
err = wrapSyscallError(errcall, err)
}
return nil, err
}
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
poll.CloseFunc(d)
return nil, err
}
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
...
// Init initializes the FD. The Sysfd field should already be set.
// This can be called multiple times on a single FD.
// The net argument is a network name from the net package (e.g., "tcp"),
// or "file".
// Set pollable to true if fd should be managed by runtime netpoll.
func (fd *FD) Init(net string, pollable bool) error {
fd.SysFile.init()
// We don't actually care about the various network types.
if net == "file" {
fd.isFile = true
}
if !pollable {
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd)
if err != nil {
// If we could not initialize the runtime poller,
// assume we are using blocking mode.
fd.isBlocking = 1
}
return err
}
netpoll 的实现
在上一节中, 我们通过 Read 已经明确 netpoll 的使用方式:
- 通过 runtime.pollOpen 将套接字注册到 netpoll
- 通过 runtime.pollWait 让出 CPU 的使用权直到数据到达
这些函数都在 runtime/netpoll.go, 包含了 netpoll 的逻辑代码. runtime/netpoll_epoll.go 则是对 epoll 相关的系统调用的简单封装.
netpoll 的核心逻辑在 pollDesc 的状态字段 rg&wg. 字段可能有四种值:
- pdReady, 代表此时句柄可读
- pdWait, 即将有 goroutine 阻塞在句柄上
- 指向 goroutine 的指针, 代表阻塞在句柄上的 goroutine
- pdNil, 即没有阻塞的 goroutine, 也没有数据可读
此处主要的奥秘在于:
- 向 epoll 注册句柄时传入的 epoll_event, 除了可以传递关注的事件, 还可以保存自定义的事件, 此处为指向 pollDesc 的指针
- goroutine 阻塞在句柄上时, 会将 pollDesc 的 rg/wg 修改指向自身的指针
- 所以当某个句柄就绪时, 可以找到对应的 pollDesc, 并判断是否有 goroutine 阻塞与此句柄
在这个基础上, Go 的调度器在调度时会查看是否有之前阻塞在 epoll 上当现在就绪的 goroutine, 如果存在则将其加入调度中.