Golang的channel
目录
基本使用
channel 数据结构
阻塞的协程队列
协程节点
构建 channel
写流程
读流程
非阻塞与阻塞
closechan(关闭)
基本使用
创建无缓存 channel
c := make(chan int) //创建无缓冲的通道 cc := make(chan int,0) //创建无缓冲的通道 c
创建有缓存 channel
c := make(chan int, 3) //创建无缓冲的通道 c
例子:
package mainimport ("fmt""time"
)func main() {c := make(chan int, 3) //创建有缓冲的通道 c//内置函数 len 返回未被读取的缓冲元素数量,cap 返回缓冲区大小fmt.Printf("len(c)=%d, cap(c)=%d\n", len(c), cap(c))go func() {defer fmt.Println("子go程结束")for i := 0; i < 3; i++ {c <- ifmt.Printf("子go程正在运行[%d]: len(c)=%d, cap(c)=%d\n", i, len(c), cap(c))}}()time.Sleep(2 * time.Second) //延时2sfor i := 0; i < 3; i++ {num := <-c //从c中接收数据,并赋值给numfmt.Println("num = ", num)}fmt.Println("main进程结束")
}
channel 数据结构
type hchan struct {qcount uint // total data in the queuedataqsiz uint // size of the circular queuebuf unsafe.Pointer // points to an array of dataqsiz elementselemsize uint16closed uint32elemtype *_type // element typesendx uint // send indexrecvx uint // receive indexrecvq waitq // list of recv waiterssendq waitq // list of send waiterslock mutex
}
hchan:channel 数据结构
• qcount:当前 channel 中存在多少个元素;
• dataqsize: 当前 channel 能存放的元素容量;
• buf:channel 中用于存放元素的环形缓冲区;
• elemsize:channel 元素类型的大小;
• closed:标识 channel 是否关闭;
• elemtype:channel 元素类型;
• sendx:发送元素进入环形缓冲区的 index;
• recvx:接收元素所处的环形缓冲区的 index;
• recvq:因接收而陷入阻塞的协程队列;
• sendq:因发送而陷入阻塞的协程队列;
lock mutex 锁
阻塞的协程队列
type waitq struct {first *sudoglast *sudog
}
waitq:阻塞的协程队列
• first:队列头部
• last:队列尾部
协程节点
sudog:用于包装协程的节点
type sudog struct {g *gnext *sudogprev *sudogelem unsafe.Pointer // data element (may point to stack)isSelect boolc *hchan
}
• g:goroutine,协程;
• next:队列中的下一个节点;
• prev:队列中的前一个节点;
• elem: 读取/写入 channel 的数据的容器;
• isSelect:标识当前协程是否处在 select 多路复用的流程中;
• c:标识与当前 sudog 交互的 chan.
构建 channel
func makechan(t *chantype, size int) *hchan {elem := t.elem// ...mem, overflow := math.MulUintptr(elem.size, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}var c *hchanswitch {case mem == 0:// Queue or element size is zero.c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.ptrdata == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)return
}
写流程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// ...//加锁lock(&c.lock)// ...//写时存在阻塞读协程if sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}//写时不存在阻塞读协程,且缓冲区不满仍有空间if c.qcount < c.dataqsiz {// Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++unlock(&c.lock)return true}//写时不存在阻塞读协程,且缓冲区满了没有空间// ...gp := getg()mysg := acquireSudog()mysg.elem = epmysg.g = gpmysg.c = cgp.waiting = mysgc.sendq.enqueue(mysg)atomic.Store8(&gp.parkingOnChan, 1)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)gp.waiting = nilclosed := !mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)return true
}
总结:
1.当写时存在阻塞读协程,我们直接用
2.当写时不存在阻塞读协程,且缓冲区不满仍有空间时,我们直接加入环形缓冲区中
3.当写时不存在阻塞读协程,且缓冲区满了没用空间时,加入阻塞写协程队列中
注意:
1.有阻塞读协程和缓冲区满之间只有一个条件符合
2.对于未初始化的 chan,写入操作会引发死锁
3.对于已关闭的 chan,写入操作会引发 panic.
读流程
读流程与写流程差不多,不同点:
1.加入的是阻塞读队列
2.当环形缓冲区有和无数据时会有不同的操作
注意:
1.读空channel, park挂起,引起死锁
2.channel 已关闭且内部无元素,直接解锁返回
非阻塞与阻塞
区别:
非阻塞模式下,读/写 channel 方法通过一个 bool 型的响应参数,用以标识是否读取/写入成功.
• 所有需要使得当前 goroutine 被挂起的操作,在非阻塞模式下都会返回 false;
• 所有是的当前 goroutine 会进入死锁的操作,在非阻塞模式下都会返回 false;
• 所有能立即完成读取/写入操作的条件下,非阻塞模式下会返回 true.
何时进入非阻塞
默认情况下,读/写 channel 都是阻塞模式,只有在 select 语句组成的多路复用分支中,
与 channel 的交互会变成非阻塞模式:
在sudog:用于包装协程的节点
• isSelect:标识当前协程是否处在 select 多路复用的流程中;
closechan(关闭)
func closechan(c *hchan) {if c == nil {panic(plainError("close of nil channel"))}lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}c.closed = 1var glist gList// release all readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseglist.push(gp)}// release all writers (they will panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilgp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseglist.push(gp)}unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)
关闭未初始化过的 channel 会 panic;
• 加锁;
• 重复关闭 channel 会 panic;
• 将阻塞读协程队列中的协程节点统一添加到 glist;
• 将阻塞写协程队列中的协程节点统一添加到 glist;
• 唤醒 glist 当中的所有协程.
防止还有协程挂起,没有被唤醒的资源浪费
参考:小徐先生1212 -- Golang Channel实现原理