【Go 原理】Goroutine 及调度源码分析

Goroutine

goroutine的理解

谈谈你对goroutine的理解?

goroutine是来自协程coroutine的概念,它属于用户态的线程,主要解决操作(内核)系统线程占用内存太大和创建、切换开销性能消耗较大的问题。用户态线程goroutine是一个非常轻量级的,其创建和切换都在用户代码中完成而无需进入操作系统内核,所以其开销要远远小于系统线程的创建和切换;另外一个优势在于goroutine只占2-4KB内存空间,可以在程序轻易的创建成千上万甚至上百万的goroutine出来并发的执行任务而不用太担心性能和内存等问题。其他程序如C/JAVA的多线程,往往是内核态的,比较重量级,几千个线程可能就会耗光CPU

Go为了提供更容易使用的并发方法,使用了goroutinechannelgoroutine来自协程的概念,让一组可复用的函数运行在一组线程之上,即使有协程阻塞,该线程的其他协程也可以被runtime调度,转移到其他可运行的线程上(hand off机制)。

Go中,协程被称为goroutine,它非常轻量,一个goroutine只占几KB,并且这几KB就足够goroutine运行完,这就能在有限的内存空间内支持大量goroutine,支持了更多的并发。虽然一个goroutine的栈只占几KB,但实际是可伸缩的,如果需要更多内容,runtime会自动为goroutine分配。

goroutine特点:

  • 占用内存更小(几KB)。
  • 调度更灵活(runtime调度)。

goroutine是 Go 语言实现的轻量级的用户态线程,主要用来解决操作系统线程太重的问题,所谓的太重,主要表现在以下两个方面:

  • 创建和切换太重:操作系统线程的创建和切换都需要进入内核,而进入内核所消耗的性能代价比较高,开销较大;
  • 内存使用太重:一方面,为了尽量避免极端情况下操作系统线程栈的溢出,内核在创建操作系统线程时默认会为其分配一个较大的栈内存(虚拟地址空间,内核并不会一开始就分配这么多的物理内存),然而在绝大多数情况下,系统线程远远用不了这么多内存,这导致了浪费;另一方面,栈内存空间一旦创建和初始化完成之后 其大小就不能再有变化,这决定了在某些特殊场景下系统线程栈还是有溢出的⻛险。

而相对的,用户态线程goroutine则轻量得多:

  • goroutine是用户态线程,其创建和切换都在用户代码中完成而无需进入操作系统内核,所以其开销要远远小于系统线程的创建和切换;
  • goroutine启动时默认栈大小只有2k,这在多数情况下已经够用了,即使不够用,goroutine的栈也会自动扩大,同时,如果栈太大了过于浪费它还能自动收缩,这样既没有栈溢出的⻛险,也不会造成栈内存空间的大量浪费。

正是因为Go语言中实现了如此轻量级的线程(逻辑态的),才使得我们在Go程序中,可以轻易的创建成千上万甚至上百万的goroutine出来并发的执行任务而不用太担心性能和内存等问题。其他程序如C/JAVA的多线程,往往是内核态的,比较重量级,几千个线程可能就会耗光CPU。

以下是 Rob PikeGoogle I/O 2012 上对goroutine给出的描述:

What is a goroutine? It’s an independently executing function, launched by a go statement.
It has its own call stack, which grows and shrinks as required.
It’s very cheap. It’s practical to have thousands, even hundreds of thousands of goroutines.
It’s not a thread.
There might be only one thread in a program with thousands of goroutines.
Instead, goroutines are multiplexed dynamically onto threads as needed to keep all the goroutines running.
But if you think of it as a very cheap thread, you won’t be far off.

― Rob Pike

概括下来其实就一句话:

goroutine 可以视为开销很小的线程(既不是物理线程也不是协程,但它拥有自己的调用栈,并且这个栈的大小是可伸缩的 不是协程,它有自己的栈),很好用,需要并发的地方就用 go 起一个 func。

Golang 中,任何代码都是运行在 goroutine里,即便没有显式的 go func(),默认的 main 函数也是一个 goroutine。但 goroutine 不等于操作系统的线程,它与系统线程的对应关系,牵涉到Golang 运行时的调度器。

goroutine 调度器

什么是 M:N 两级线程模型?什么是goroutine调度器?

M:N 两级线程模型其实是用户态线程(goroutine)和操作系统线程之间的映射关系。

具体理解为,M个goroutine运行在N个操作系统线程之上,内核负责对这N个操作系统线程进行调度,而这N个系统线程又负责对这M个goroutine进行调度和运行。

所谓的goroutine调度器,其实可以理解为GMP模型中的P。它是指程序代码按照一定的算法在适当的时候挑选出合适的goroutine并放到CPU上去运行的过程,这些负责对goroutine进行调度的程序代码我们称之为goroutine调度器。

goroutine是建立在操作系统线程基础之上的用户态线程,它与操作系统线程之间实现了一个多对多(M:N)的两级线程模型。

image-20211028222830640

这里的 M:N 是指M个goroutine运行在N个操作系统线程之上,内核负责对这N个操作系统线程进行调度,而这N个系统线程又负责对这M个goroutine进行调度和运行。

所谓的goroutine调度,是指程序代码按照一定的算法在适当的时候挑选出合适的goroutine并放到CPU上去运行的过,这些负责对goroutine进行调度的程序代码我们称之为goroutine调度器。

goroutine调度器需要解决三大核心问题:

  • 调度时机:什么时候会发生调度?

  • 调度策略:使用什么策略来挑选下一个进入运行的goroutine

  • 切换机制:如何把挑选出来的goroutine放到CPU上运行?

为了帮助我们从宏观上了解goroutine的两级调度模型,简化后goroutine调度器的工作流程伪代码:

// 程序启动时的初始化代码
......
for i := 0; i < N; i++ { // 创建N个操作系统线程(工作线程)执行 schedule 函数
    create_os_thread(schedule) // 创建一个操作系统线程执行 schedule 函数 
}
// schedule 函数实现调度逻辑 
func schedule() {
    for { // 调度循环
        // 根据某种算法从M个 goroutine 中找出一个需要运行的 goroutine
        g := find_a_runnable_goroutine_from_M_goroutines()
        run_g(g) // CPU运行该 goroutine,直到需要调度其它 goroutine 才返回 
        save_status_of_g(g) // 保存 goroutine 的状态,主要是寄存器的值
    } 
}

程序运行起来之后创建了N个由内核调度的操作系统线程 (工作线程)去执行shedule函数。

schedule函数在一个调度循环中反复从M个goroutine中挑选出一个需要运行的goroutine并跳转到该

goroutine去运行,直到需要调度其它goroutine时才返回到schedule函数中。通过 save_status_of_g保存刚刚正在运行的 goroutine 的状态,然后再次去寻找下一个 goroutine

goroutine 的调度策略

关于goroutine的调度策略,当执行代码go func()时都经历了哪些过程?

调度器的设计策略包含以下几个要点:

  • 复用线程work stealinghand off机制):避免频繁的创建、销毁线程,而是通过对线程的复用。方式一、通过work stealing机制,当本线程无可运行的G时,尝试从其他线程绑定的P偷取G,而不是销毁线程。方式二 ,通过hand off机制, 当本线程因为G进行系统调用阻塞时,线程释放绑定的P,把P转移给其他空闲的线程执行。
  • 利用并行GOMAXPROCS设置P的数量,最多有GOMAXPROCS个线程分布在多个CPU上同时运行。GOMAXPROCS也限制了并发的程度,比如GOMAXPROCS = 核数/2,则最多利用了一半的CPU核进行并行。
  • 抢占:在coroutine中要等待一个协程主动让出CPU才执行下一个协程,在Go中,一个goroutine最多占用CPU 10ms,防止其他goroutine被饿死,这就是goroutine不同于coroutine的一个地方。
  • 全局G队列:在新的调度器中依然有全局G队列,但功能已经被弱化了,当M执行work stealing从其他P偷不到G时,它可以从全局G队列获取G。

image-20211105103418528

具体执行流程如下:

1、通过 go func()来创建一个goroutine

2、有两个存储G的队列,一个是局部调度器P的本地队列、一个是全局G队列。新创建的G会先保存在P的本地队列中,如果P的本地队列已经满了就会保存在全局的队列中;

3、G只能运行在M中,一个M必须持有一个P,M与P是1:1的关系。M会从P的本地队列弹出一个可执行状态的G来执行,如果P的本地队列为空;就会从全局队列中获取G来执行,如果全局队列中的G为空;就会想其他的MP组合偷取一个可执行的G来执行;

4、一个M调度G执行的过程是一个循环机制;

5、当M执行某一个G时候如果发生了syscall或其余阻塞操作,M会阻塞,如果当前有一些G在执行行,runtime会把这个线程M从P中摘除(detach),然后再创建一个新的操作系统的线程(如果有空闲的线程可用就复用空闲线程)来服务于这个P;

6、当M系统调用结束时候,这个G会尝试获取一个空闲的P执行,并放入到这个P的本地队列。如果获取不到P,那么这个线程M变成休眠状态, 加入到空闲线程中,然后这个G会被放入全局队列中。

image-20211101122357165

image-20211028225032397

schedule函数分三步分别从各运行队列中寻找可运行的goroutine

  • ① 从本地运行队列中寻找goroutine
  • ② 从全局运行队列中寻找goroutine
  • ③ 从其它运行线程的队列中偷取goroutine

schedule函数源码分析(部分)runtime/proc.go

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    _g_ := getg() // _g_ = m.g0
    ......  
    var gp *g
    ......

    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
    // 为保证调度的公平性,每个工作线程每经过61次调度就优先尝试从全局运行队列中找出一个 goroutine 来运行,
    // 这样才能保证位于全局运行队列中的 goroutine 得到调度的机会。
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
      // 全局运行队列是所有工作线程都可以访问的,所以在访问它之前需要加锁。
            lock(&sched.lock)
      // ① 从全局运行队列中寻找 goroutine。
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }

    if gp == nil {
    // ② 从工作线程本地运行队列中寻找 goroutine。
        gp, inheritTime = runqget(_g_.m.p.ptr())
        // We can see gp != nil here even if the M is spinning,
        // if checkTimers added a local goroutine via goready.
    }

    if gp == nil {
    // ③ 从其它工作线程的运行队列中偷取 goroutine。
        gp, inheritTime = findrunnable() // blocks until work is available
    }
  .....
  // 当前运行的是 runtime 的代码,函数调用栈使用的是 g0 的栈空间
  // 调用 execte 切换到 gp 的代码和栈空间去运行
    execute(gp, inheritTime)
}

① 从本地运行的队列寻找

runqget函数源码分析,runtime/proc.go

type guintptr uintptr

type p struct {
    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    // runnext, if non-nil, is a runnable G that was ready'd by
    // the current G and should be run next instead of what's in
    // runq if there's time remaining in the running G's time
    // slice. It will inherit the time left in the current time
    // slice. If a set of goroutines is locked in a
    // communicate-and-wait pattern, this schedules that set as a
    // unit and eliminates the (potentially large) scheduling
    // latency that otherwise arises from adding the ready'd
    // goroutines to the end of the run queue.
    runnext guintptr
}

// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
    // If there's a runnext, it's the next G to run.
  // 从 runnext 成员中获取 goroutine
    for {
    // 查看 runnext 成员是否为空,不为空则返回该 goroutine。
        next := _p_.runnext
        if next == 0 {
            break
        }
        if _p_.runnext.cas(next, 0) {
            return next.ptr(), true
        }
    }

  // 从循环队列中获取 goroutine
    for {
    // ① 原子读取,不管代码运行在哪种平台,保证在读取过程中不会有其它线程对该变量进行写入;
    // ② 位于 atomic.LoadAcq 之后的代码,对内存的读取和写入必须在 atomic.LoadAcq 读取完成后才能执行,
    // 编译器和 CPU 都不能打乱这个顺序。
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := _p_.runqtail
        if t == h {
            return nil, false
        }
        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
    // ① 原子的执行比较并交换的操作;
    // ② 位于 atomic.CasRel 之前的代码,对内存的读取和写入必须在 atomic.CasRel 对内存的写入之前完成,
    // 编译器和 CPU 都不能打乱这个顺序。
        if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
            return gp, false
        }
    }
}

② 从全局运行队列寻找

globrunqget函数源码分析,runtime/proc.go

var (
  gomaxprocs int32
    sched      schedt
)

type schedt struct {
    // Global runnable queue.
    runq     gQueue
    runqsize int32
}

// Try get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
  // 全局运行队列为空。
    if sched.runqsize == 0 {
        return nil
    }

  // 计算全局运行队列中 goroutine 的数量。
  // 注意:应该从全局运行队列中拿走多少个 goroutine 时根据 p 的数量(gomaxprocs)做了负载均衡。
    n := sched.runqsize/gomaxprocs + 1
  // 计算n的方法可能导致n大于全局运行队列中的 goroutine 数量。
    if n > sched.runqsize {
        n = sched.runqsize
    }
  // 最多取函数参数 max 个 goroutine。
    if max > 0 && n > max {
        n = max
    }
  // 最多只能取本地队列容量的一半
    if n > int32(len(_p_.runq))/2 {
        n = int32(len(_p_.runq)) / 2
    }

  // 剩余全局队列个数计算
    sched.runqsize -= n

  // 先直接通过函数返回 一个 gp(pop 从全局运行队列的队列头取)
    gp := sched.runq.pop()
    n--
    for ; n > 0; n-- {
    // pop 从全局运行队列的队列头取
        gp1 := sched.runq.pop()
     // 其它的 goroutines 通过 runqput 放入本地运行队列
        runqput(_p_, gp1, false)
    }
    return gp
}

③ 从其他线程运行的队列中偷取

findrunnable函数源码分析,runtime/proc.go

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()
  ......
  // ① 先从本地运行的队列中获取 goroutine
  // local runq
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

  // ② 再从全局运行的队列中获取 goroutine
    // global runq
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }
  ......
  for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            ......
      // ③ 从其他线程运行的队列中偷取 goroutine
            if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
                return gp, false
            }
      ......
}

相关推荐

微信扫一扫,分享到朋友圈

【Go 原理】Goroutine 及调度源码分析
返回顶部

显示

忘记密码?

显示

显示

获取验证码

Close