Go runtime 01: chan
传统的线程模型通过共享内存来实现多个线程之间的相互交流. 依赖精妙的设计和锁等机制来保证程序的正确性. 而 Go 通过 chan 推广了另外一种模式, 即
Do not communicate by sharing memory; instead, share memory by communicating.
通过汇编代码, 我们可以快速的定位到 chan 相关的函数:
runtime.makechan
用于创建 chanruntime.chansend1
用于向 chan 发送数据runtime.chanrecv1
用于从 chan 接收数据138025 func main() { 138026 457680: 49 3b 66 10 cmp 0x10(%r14),%rsp 138027 457684: 0f 86 c1 00 00 00 jbe 45774b <main.main+0xcb> 138028 45768a: 55 push %rbp 138029 45768b: 48 89 e5 mov %rsp,%rbp 138030 45768e: 48 83 ec 40 sub $0x40,%rsp 138031 457692: 66 44 0f d6 7c 24 38 movq %xmm15,0x38(%rsp) 138032 457699: c6 44 24 17 00 movb $0x0,0x17(%rsp) 138033 ch := make(chan int, 2) 138034 45769e: 48 8d 05 5b 56 00 00 lea 0x565b(%rip),%rax # 45cd00 <type:*+0x4d00> 138035 4576a5: bb 02 00 00 00 mov $0x2,%ebx 138036 4576aa: e8 71 bd fa ff call 403420 <runtime.makechan> 138037 4576af: 48 89 44 24 20 mov %rax,0x20(%rsp) 138038 defer close(ch) 138039 4576b4: 44 0f 11 7c 24 28 movups %xmm15,0x28(%rsp) 138040 4576ba: 48 8d 0d 9f 00 00 00 lea 0x9f(%rip),%rcx # 457760 <main.main.func1> 138041 4576c1: 48 89 4c 24 28 mov %rcx,0x28(%rsp) 138042 4576c6: 48 89 44 24 30 mov %rax,0x30(%rsp) 138043 4576cb: 48 8d 4c 24 28 lea 0x28(%rsp),%rcx 138044 4576d0: 48 89 4c 24 38 mov %rcx,0x38(%rsp) 138045 4576d5: c6 44 24 17 01 movb $0x1,0x17(%rsp) 138046 ch <- 10 138047 4576da: 48 8d 1d f7 68 02 00 lea 0x268f7(%rip),%rbx # 47dfd8 <runtime.egcbss+0x5> 138048 4576e1: e8 1a bf fa ff call 403600 <runtime.chansend1> 138049 ch <- 20 138050 4576e6: 48 8b 44 24 20 mov 0x20(%rsp),%rax 138051 4576eb: 48 8d 1d ee 68 02 00 lea 0x268ee(%rip),%rbx # 47dfe0 <runtime.egcbss+0xd> 138052 4576f2: e8 09 bf fa ff call 403600 <runtime.chansend1> 138053 add(ch) 138054 4576f7: 90 nop 138055 a := <-ch 138056 4576f8: 48 c7 44 24 18 00 00 movq $0x0,0x18(%rsp) 138057 4576ff: 00 00 138058 457701: 48 8b 44 24 20 mov 0x20(%rsp),%rax 138059 457706: 48 8d 5c 24 18 lea 0x18(%rsp),%rbx 138060 45770b: e8 10 cc fa ff call 404320 <runtime.chanrecv1> 138061 b := <-ch 138062 457710: 48 c7 44 24 18 00 00 movq $0x0,0x18(%rsp) 138063 457717: 00 00 138064 457719: 48 8b 44 24 20 mov 0x20(%rsp),%rax 138065 45771e: 48 8d 5c 24 18 lea 0x18(%rsp),%rbx 138066 457723: e8 f8 cb fa ff call 404320 <runtime.chanrecv1> 138067 }
chan 在 runtime 中对应结构体 hchan, 其中比较直接的包括:
- elemtype 对应元素类型信息, 编译时产生
- elemsize 对应元素占用的内存大小
- dataqsiz 是最多可以缓存元素的数量, 即
make(chan int, 2)
中的 2 - closed 非 0 代表 chan 已被关闭
makechan 会直接申请需要的内存, 即变量 buf, 大小由单个元素的大小(elemsize)和缓存数量(dataqsize)决定. 元素会以环形队列的形式保存在 buf 中, qcount 是队列中元素的数量, sendx 是队首元素的坐标, recvx 是队尾元素的坐标.
将发送的元素保存在的缓存队列中代码 chan.go#L216:
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
接收时直接从缓存队列中获取元素的代码 chan.go#L537:
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
sendq 和 recvq 是两个 goroutine 队列, 前者保存了阻塞在发消息上的 goroutine, 后者保存了等待接收消息的 goroutine. 在发送和接收消息时, 都会优先判断是否由等待的 goroutine, 避免需要额外经过缓存中转一次.
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
如果发消息时即没有等待的接受者也没有可用的缓存空间, 则发送消息的 goroutine 会主动让处 CPU, 将自己的状态从 running 变为 waiting, 即对 gopark 的调用. chan.go#L237:
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
阻塞于发消息的 goroutine 需要等到其他接收消息的 goroutine 唤醒自己, 状态从 waiting 变化为 runnable 后, 才有机会继续执行, 即对 goready 的调用.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
gopark 和 goready 我们后续介绍 GMP 的时候在展开.
共享内存和 chan 并没有优劣之分, 或者至少我认为二者没有谁是绝对更优. chan 确实大幅简化了上层使用者的使用难度, 同时也依赖 goroutine 这样的轻量级协程.