【Go 原理】Channel 读写特性和 CSP 思想

Channel

Channel 的读写特性

channel的读写特性是什么?会发生painc的情况是有几种,分别是什么?下面的代码输出什么?

image-20211112231007294

channel的读写特性(空读写阻塞,写关闭异常,读关闭空零):

  • 从一个 nil channel 接收数据,造成永远阻塞。
  • 给一个 nil channel 发送数据,造成永远阻塞。
  • 给一个nil channel关闭,引起painc
  • 从一个empty channel接收数据,会造成阻塞。
  • 给一个full channel发送数据,会造成阻塞。
  • 从一个closed channel接收数据,会返回未读的元素,如果缓冲区为空,则读完后返回零值。
  • 给一个已经关闭的 closed channel 发送数据,引起 panic
  • 关闭一个已经关闭的closed channel,引起painc
  • 无缓冲的channel是同步的,而有缓冲的channel是非同步的。

panic 的情况,总共有 3 种:

  • closenilchan;
  • send 已经 closechan;
  • close 已经 closechan

block的情况,总共有 4 种:

  • 从一个 nil channel 接收数据,造成永远阻塞。
  • 给一个 nil channel 发送数据,造成永远阻塞。
  • 从一个empty channel接收数据,会造成阻塞。
  • 给一个full channel发送数据,会造成阻塞。

执行下面的代码发生什么?

package main

import (
    "fmt"
    "runtime"
    "time"
)

// 结果:一段时间后总是输出 #goroutines: 2
// 解析:因为 ch 未初始化,写和读都会阻塞,之后被第一个协程重新赋值,导致写的 ch 阻塞。
func main() {
    var ch chan int // nil
    // ch = make(chan int, 1)
    go func() {
        ch = make(chan int, 1)
        ch <- 1
    }()
    go func(ch chan int) {
        time.Sleep(time.Second)
        <-ch
    }(ch)
    // panic: close of nil channel
    // panic: send on closed channel
    //close(ch)
    c := time.Tick(1 * time.Second)
    for range c {
        fmt.Printf("#goroutines: %d\n", runtime.NumGoroutine())
    }
}

执行下面的代码发生什么?

package main

import (
    "fmt"
    "time"
)

// panic: send on closed channel

func main() {
    ch := make(chan int, 1000)
    go func() {
        for i := 0; i < 10; i++ {
            ch <- i
        }
    }()
    go func() {
        for {
            a, ok := <-ch
            if !ok {
                fmt.Println("close")
                return
            }
            fmt.Println("a: ", a)
        }
    }()
    close(ch)
    fmt.Println("ok")
    time.Sleep(time.Second * 100)
}

解析:给一个已经关闭的 channel 发送数据,引起 panic

CSP 模型思想

是否了解golangCSP并发模型的思想?谈谈你对channel的理解?

CSP 模型是上个世纪七十年代提出的,不同于传统的多线程通过共享内存来通信,CSP 讲究的是以通信的方式来共享内存。用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。CSPchannel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的 channel

channel 的经典思想:不要通过共享内存来通信,而是通过通信来实现内存共享JAVA/C++等语言倡导共享内存来通信,而Go倡导以通信的方式来共享内存。

Do not communicate by sharing memory; instead,share memory by communicating.

channelgoroutine 之间通信(读写)的通道。因为它的存在,显得 Golang(或者说CSP)与传统的共享内存型的并发模型截然不同,用 Effective Go 里的话来说就是:

Do not communicate by sharing memory; instead, share memory by communicating.

Golang 的并发模型中,我们并不关心是哪个 goroutine(匿名性)在用 channel,只关心 channel 的性质:

  • 是只读还是只写?
  • 传递的数据类型?
  • 是否有缓冲区?

CSPActor之间的区别:

  • CSP 解耦发送方和接收方,注重消息传递方式。
  • Actor Model之间直接通讯,注重处理单元。

image-20211029121333099

gochannel 是被单独创建并且可以在进程之间传递,它的通信模式类似于 boss-worker 模式的,一个实体通过将消息发送到 channel 中,然后又监听这个 channel的实体处理,两个实体之间是匿名的,这个就实现实体中间的解耦,其中 channel是同步的一个消息被发送到 channel 中,最终是一定要被另外的实体消费掉的,在实现原理上其实类似一个阻塞的消息队列。

CSP(Communicating Sequential Process) 描述这样一种并发模型:多个Process 使用一个 Channel 进行通信, 这个 Channel连结的 Process 通常是匿名的,消息传递通常是同步的(有别于 Actor Model)。

CSP 最早是由 Tony Hoare 在 1977 年提出一个理论模型,也是一本书的名字,有兴趣可以查阅电子版本:http://www.usingcsp.com/cspbook.pdf。

Golang 只用到了 CSP 的很小一部分,即理论中的 Process/Channelgoroutine/channel):这两个并发之间没有从属关系, Process 可以订阅任意 ChannelChannel也并不关心是哪个Process在利用它进行通信;Process 围绕 Channel进行读写,形成一套有序阻塞和可预测的并发模型。

image-20211029152454752

无缓冲的 channel(同步通道)

什么是无缓冲的channel?什么是有缓冲的channel?无缓冲channel的发送和接收是否同步?它们之间有什么区别?

无缓冲的channel:无缓冲的通道指的是通道大小为0,发送和接收方需要同时准备好,才可以完成发送和接收操作。(无缓冲的channel由于没有缓冲发送和接收需要同步。)

有缓冲的channel:有缓冲的通道指的是有缓冲大小大于1,不需要发送方和接收方同时准备好,都可以进行发送和接收操作。(有缓冲channel不要求发送和接收操作同步。)

区别:无缓冲的通道保证进行发送和接收的 goroutine会在同一时间进行数据交换;而有缓冲的通道只有在通道中没有要接收的值时,接收动作才会阻塞,只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

无缓冲的通道指的是通道的大小为0,也就是说,这种类型的通道在接收前没有能力保存任何值,它要求发送 goroutine 和接收 goroutine 同时准备好,才可以完成发送和接收操作。

image-20211108204842890

从上面无缓冲的通道定义来看,发送 goroutine 和接收 gouroutine 必须是同步的,同时准备后,如果没有同时准备好的话,先执行的操作就会阻塞等待,直到另一个相对应的操作准备好为止。这种无缓冲的通道我们也称之为同步通道。

① 不可以在同一个 goroutine 中既读又写,否则将会死锁。

示例:

package main

import "fmt"

// 结果:fatal error: all goroutines are asleep - deadlock!

// 解析:不可以在同一个 goroutine 中既读又写,否则将会死锁。
func main() {
    ch := make(chan int)

    ch <- 2
    x := <-ch
    fmt.Println(x)
}

② 两个goroutine中使用无缓冲的channel,则读写互为阻塞,即双方代码的执行都会阻塞在<-chch <- 处,直到双方读写完成在 ch 中的传递,各自继续向下执行,此处借用CSP 图例说明:

image-20211101112249275

示例代码:

// 结果:
// after write
// after read: 2

// 解析:两个 goroutine 中使用无缓冲的channel,则读写互为阻塞。
// 即双方代码的执行都会阻塞在 <-ch 和 ch <- 处,直到双方读写完成在 ch 中的传递,各自继续向下执行。
func main1() {
    ch := make(chan int)

    go func() {
        ch <- 2
        fmt.Println("after write")
    }()

    x := <-ch
    fmt.Println("after read:", x)
}

有缓冲的 channel

带缓冲的channel(buffered channel) 是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine之间必须同时完成发送和接收。通道会阻塞发送和接收动作的条件也会不同。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

这导致有缓冲的通道和无缓冲的通道之间的一个很大的不同:无缓冲的通道保证进行发送和接收的 goroutine会在同一时间进行数据交换;有缓冲的通道没有这种保证。

image-20211108204813244

make时传递第二参 capacity,即为有缓冲的 channel

ch := make(chan int, 1)

这样的 channel 无论是否在同一 goroutine 中,均可读写而不致死锁,看看下面的代码输出什么内容:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int, 1)
    for i := 0; i < 10; i++ {
        select {
        case x := <-ch:
            fmt.Println(x) // 0 2 4 6 8
        case ch <- i:
        }
    }
}

有无缓冲 channel的演示代码如下:

// 无缓冲的 channel 由于没有缓冲发送和接收需要同步。
ch1 := make(chan int)
// 缓冲区为 3, 有缓冲 channel 不要求发送和接收操作同步。
ch2 := make(chan int, 3)
  • 无缓冲的 channel(unbuffered channel),其缓冲区大小则默认为 0。在功能上其接收者会阻塞等待并阻塞应用程序,直至收到通信和接收到数据。
  • 有缓冲的 channel(buffered channel),其缓存区大小是根据所设置的值来调整。在功能上,若缓冲区未满则不会阻塞,会源源不断的进行传输。当缓冲区满了后,发送者就会阻塞并等待。而当缓冲区为空时,接收者就会阻塞并等待,直至有新的数据。

close channel 读写数据

对已经关闭的chan进行读写会怎么样?为什么?

写已经关闭的 chanpanic。报错信息:panic:send on closed channel

读已经关闭的 chan 能一直读到东⻄,但是读到的内容根据通道内关闭前是否有元素而不同。
① 如果 chan 关闭前,buffer内有元素还未读 , 会正确读到 chan 内的值,且返回的第二个 bool值(是否读成功)为 true
② 如果 chan 关闭前,buffer内有元素已经被读完,chan 内无值,接下来所有接收的值都会非阻塞直接成功,返回 channel 元素的零值,第二个 bool 值一直为 false

package main

import "fmt"

func main() {
    fmt.Println("以下是数值的chan")
    ci := make(chan int, 3)
    ci <- 1
    close(ci)
    num, ok := <-ci
    fmt.Printf("第一次读chan的协程结束,num=%v, ok=%v\n", num, ok)
    num1, ok1 := <-ci
    fmt.Printf("第二次读chan的协程结束,num=%v, ok=%v\n", num1, ok1)
    num2, ok2 := <-ci
    fmt.Printf("第三次读chan的协程结束,num=%v, ok=%v\n", num2, ok2)
    fmt.Println()

    fmt.Println("以下是字符串chan")
    cs := make(chan string, 3)
    cs <- "aaa"
    close(cs)
    str, ok := <-cs
    fmt.Printf("第一次读chan的协程结束,str=%v, ok=%v\n", str, ok)
    str1, ok1 := <-cs
    fmt.Printf("第二次读chan的协程结束,str=%v, ok=%v\n", str1, ok1)
    str2, ok2 := <-cs
    fmt.Printf("第三次读chan的协程结束,str=%v, ok=%v\n", str2, ok2)
    fmt.Println()

    fmt.Println("以下是结构体chan")
    type MyStruct struct {
        Name string
    }
    cst := make(chan MyStruct, 3)
    cst <- MyStruct{Name: "ha"}
    close(cst)
    struct1, ok := <-cst
    fmt.Printf("第一次读chan的协程结束,struct=%v, ok=%v\n", struct1, ok)
    struct2, ok1 := <-cst
    fmt.Printf("第二次读chan的协程结束,struct=%v, ok=%v\n", struct2, ok1)
    struct3, ok2 := <-cst
    fmt.Printf("第三次读chan的协程结束,struct=%v, ok=%v\n", struct3, ok2)
}

运行结果:

以下是数值的chan
第一次读chan的协程结束,num=1, ok=true
第二次读chan的协程结束,num=0, ok=false
第三次读chan的协程结束,num=0, ok=false

以下是字符串chan
第一次读chan的协程结束,str=aaa, ok=true
第二次读chan的协程结束,str=, ok=false
第三次读chan的协程结束,str=, ok=false

以下是结构体chan
第一次读chan的协程结束,struct={ha}, ok=true
第二次读chan的协程结束,struct={}, ok=false
第三次读chan的协程结束,struct={}, ok=false

ring buffer 实现

Channelring buffer 实现?

channel的缓冲区通过ring buffer实现,同时存在两个标记sendxrecvx分别来标识写入位置和读取位置。当发生写入是sendx会加1,当达到最大位置时,sendx会回到起始位置。

hchan 中有两个与 buffer 相关的变量:recvxsendx。其中 sendx 表示 buffer 中可写的 indexrecvx 表示 buffer 中可读的 index。 从 recvxsendx 之间的元素,表示已正常存放入 buffer 中的数据。

image-20211117162805702

上图展示的是一个缓冲区为8的channel bufferrecvx指向最早被读取的数据,sendx指向再次写入时插入的位置。

goroutine 泄露

关于goroutine泄露,下面的代码有什么问题?

package main

import (
    "fmt"
    "time"
)

// 知识点:关于 goroutine 泄漏,下面代码有什么问题?
// process 函数会启动一个 goroutine,去处理需要长时间处理的业务,处理完之后,会发送 true 到 chan 中,
// 目的是通知其它等待的 goroutine,可以继续处理了。主 goroutine 接收到任务处理完成的通知,或者超时后就返回了。这段代码有问题吗?
// 如果发生超时,process 函数就返回了,这就会导致 unbuffered 的 chan 从来就没有被读取。
// unbuffered chan 必须等 reader 和 writer 都准备好了才能交流,否则就会阻塞。
// 超时导致未读,结果就是子 goroutine 就阻塞在写永远结束不了,进而导致 goroutine 泄漏。
// 解决这个 Bug 的办法就是将 unbuffered chan 改成容量为 1 的 chan,这样写就不会被阻塞了。

func process(timeout time.Duration) bool {
    // ch := make(chan bool, 1)
    ch := make(chan bool)
    go func() {
        // 模拟处理耗时的业务
        // time.Sleep((timeout + time.Second))
        ch <- true // block
        fmt.Println("exit goroutine")
    }()
    select {
    case result := <-ch:
        return result
    case <-time.After(timeout):
        return false
    }
}

func main() {
    res := process(1 * time.Second)
    fmt.Println(res)
}

相关推荐

微信扫一扫,分享到朋友圈

【Go 原理】Channel 读写特性和 CSP 思想
返回顶部

显示

忘记密码?

显示

显示

获取验证码

Close