【Go 原理】Channel 源码分析

hchan 源码分析

channel本质上是一个有锁的环形队列,外加发送方队列(sendq)、接收方队列(recvq),加上互斥锁 mutex 等结构。

image-20211029160929781

hchan结构体源码:/src/runtime/chan.go go版本:1.15.11

  • 通过buf来保存G之间传输的数据。
  • 通过两个队列recvqsendq来保存发送和接收的 G。
  • 通过mutex来保护数据安全。
type hchan struct {
  // 队列中元素的总数
    qcount   uint           // total data in the queue
  // 循环队列的长度
    dataqsiz uint           // size of the circular queue
  // 指向长度为 dataqsiz 的底层数组,仅有当 channel 为缓冲型的才有意义
    buf      unsafe.Pointer // points to an array of dataqsiz elements 
  // 能够接受和发送的元素大小
    elemsize uint16 // chan中元素的大小
    closed   uint32 // 是否已close 1 表示已关闭 0 表示未关闭
    elemtype *_type // element type
  sendx    uint   // send index (ch <- xxx)
  recvx    uint   // receive index  (ch <- xxx)
    recvq    waitq  // list of recv waiters 
  // 发送者的 sudog 等待队列
    sendq    waitq  // list of send waiters 

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex // map不是线程安全的,但是channel是线程安全的,因为这里有互斥锁
}

type waitq struct {
    first *sudog
    last  *sudog
}

type sudog struct {
    g *g // 指向当前的 goroutine

    next *sudog // 指向下一个 g
    prev *sudog // 指向上一个 g
    elem unsafe.Pointer // data element (may point to stack) 数据元素,可能会指向堆栈
  ....
    c        *hchan // channel
}

实现源码分析

channel 的四大块操作分别是:创建chan、发送数据、接收数据、关闭chan。接下来从源码角度进行分析。

创建chan

创建 channel 的演示代码:

ch := make(chan int , 3) // 初始化环形队列 buf,初始化发送和接收的索引
// 通用创建方法
func makechan(t *chantype, size int) *hchan
// 类型为 int64 的进行特殊处理
func makechan64(t *chantype, size int64) *hchan

创建 channel的逻辑主要分为三大块:

  • 当前 channel 不存在缓冲区,也就是元素大小为 0 的情况下,就会调用 mallocgc 方法分配一段连续的内存空间。
  • 当前 channel 存储的类型存在指针引用,就会连同 hchan 和底层数组同时分配一段连续的内存空间。
  • 通用情况,默认分配相匹配的连续内存空间。

需要注意到一块特殊点,那就是 channel 的创建都是调用的 mallocgc 方法,也就是 channel 都是创建在堆上的。因此 channel 是会被 GC 回收的,自然也不总是需要 close 方法来进行显示关闭了。

makechan 源码路径为:src/runtime/chan.go

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}

makechan 方法的逻辑比较简单,就是创建 hchan 并分配合适的 buf 大小的堆上内存空间。

image-20211029150550896

发送数据

channel 发送数据的演示代码:

go func() {
    ch <- "wangxiong"
}()

其在编译器翻译后对应 runtime/chan.go/chansend1 方法:

// entry point for c <- x from compiled code
// go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

其作为编译后的入口方法,实则指向真正的实现逻辑,也就是 chansend 方法。 chansend 方法主要完成以下几个事情。

  • chan 发送前的前置判断和处理。
  • 在进入发送数据的处理前,channel会进行上锁。
  • 在正式开始发送前,加锁之后,会对 channel进行一次状态判断(是否关闭),未关闭直接发送。
  • 非直接发送,判断 channel 缓冲区中是否还有空间,如果有进行缓冲发送,否则进入阻塞发送。
// src/runtime/chan.go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ① chan 发送前的前置判断和处理。
    if c == nil {
        if !block {
            return false
        }
    // 若为 nil,在逻辑上来讲就是向 nil channel 发送数据。
    // 就会调用 gopark 方法使得当前 Goroutine 休眠,进而出现死锁崩溃,表象就是出现 panic 事件来快速失败。
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
  ......
  // 对非阻塞的 channel 进行一个上限判断,看看是否快速失败。
  // 若非阻塞且未关闭,同时底层数据 dataqsiz 大小为 0(缓冲区无元素),则会返回失败。
  // 若是 qcount 与 dataqsiz 大小相同(缓冲区已满)时,则会返回失败。
    if !block && c.closed == 0 && full(c) {
        return false
    }
  ......
  // ② 在进入发送数据的处理前,channel 会进行上锁,保障并发安全
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

  // ③ 有正在阻塞等待的接收方,则直接发送。
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

  // ④ 对缓冲区进行判定(qcount 和 dataqsiz 字段),以此识别缓冲区的剩余空间。
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
    // 调用 chanbuf 方法,以此获得底层缓冲数据中位于 sendx 索引的元素指针值
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
    // 调用 typedmemmove 方法,将所需发送的数据拷贝到缓冲区中
        typedmemmove(c.elemtype, qp, ep)
    // 数据拷贝后,对 sendx 索引自行自增 1。
        c.sendx++
    // 若 sendx 与 dataqsiz 大小一致,则归 0(环形队列)。
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++ // 自增完成后,队列总数同时自增 1
        unlock(&c.lock) // 解锁互斥锁
        return true // 返回结果
    }
 // 未走进缓冲区处理的逻辑,判断当前是否阻塞 channel,若为非阻塞,将会解锁并直接返回失败。
    if !block {
        unlock(&c.lock)
        return false
    }

  // ⑤ 进入阻塞等待发送
  // 调用 getg 方法获取当前 goroutine 的指针,用于后续发送数据。
    gp := getg()
  // 调用 acquireSudog 方法获取 sudog 结构体,并设置当前 sudog 具体的待发送数据信息和状态。
    mysg := acquireSudog()
    ......
  // 调用 c.sendq.enqueue 方法将刚刚所获取的 sudog 加入待发送的等待队列。
    c.sendq.enqueue(mysg)
  ......
  // 调用 gopark 方法挂起当前 goroutine(会记录执行位置),状态为 waitReasonChanSend,阻塞等待 channel。
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
  // 调用 KeepAlive 方法保证待发送的数据值是活跃状态,也就是分配在堆上,避免被 GC 回收。
    KeepAlive(ep)

    // someone woke us up.
  // 从这里开始唤醒,并恢复阻塞的发送操作
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    ......
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

接收数据

channel 接收数据的演示代码:

msg := <-ch

msg, ok := <-ch

两种方法在编译器翻译后分别对应 runtime.chanrecv1runtime.chanrecv2 两个入口方法,其再在内部再进一步调用 runtime.chanrecv 方法:

// src/runtime/chan.go
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

最终调用的是chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ......
  // ① 若 channel 是非阻塞模式,则直接返回。
  // ② 若 channel 是 nil channel,且为阻塞接收则调用 gopark 方法挂起当前 goroutine。
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    if !block && empty(c) {
        // After observing that the channel is not ready for receiving, we observe whether the
        // channel is closed.
        //
        // Reordering of these checks could lead to incorrect behavior when racing with a close.
        // For example, if the channel was open and not empty, was closed, and then drained,
        // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
        // we use atomic loads for both checks, and rely on emptying and closing to happen in
        // separate critical sections under the same lock.  This assumption fails when closing
        // an unbuffered channel with a blocked send, but that is an error condition anyway.
        if atomic.Load(&c.closed) == 0 {
            // Because a channel cannot be reopened, the later observation of the channel
            // being not closed implies that it was also not closed at the moment of the
            // first observation. We behave as if we observed the channel at that moment
            // and report that the receive cannot proceed.
            return
        }
        // The channel is irreversibly closed. Re-check whether the channel has any pending data
        // to receive, which could have arrived between the empty and closed checks above.
        // Sequential consistency is also required here, when racing with such a send.
        if empty(c) {
            // The channel is irreversibly closed and empty.
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }

  // channel 上有正在阻塞等待的发送方时,则直接进行接收
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

  // 当发现 channel 的缓冲区中有元素时,将会调用 chanbuf 方法,根据 recvx 的索引位置取出数据,找到要接收的元素进行处理。
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
    // 若所接收到的数据和所传入的变量均不为空,则会调用 typedmemmove 方法将缓冲区中的数据拷贝到所传入的变量中。
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    if !block {
        unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

关闭 chan

关闭 channel 主要是涉及到 close 关键字:

close(ch)

其对应的编译器翻译方法为 closechan 方法:

func closechan(c *hchan)

关闭chan源码解析:

func closechan(c *hchan) {
  // 基本检查和关闭标志设置,保证 channel 不为 nil 和未关闭,保证边界。
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
        racerelease(c.raceaddr())
    }

    c.closed = 1

    var glist gList

  // 将接受者的 sudog 等待队列(recvq)加入到待清除队列 glist 中。
    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

  // 将发送方也加入到到待清除队列 glist 中。
    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

相关推荐

微信扫一扫,分享到朋友圈

【Go 原理】Channel 源码分析
返回顶部

显示

忘记密码?

显示

显示

获取验证码

Close