当前位置: 首页 > article >正文

重拾GMP

目录

  • GMP总结
    • 线程
    • 协程
    • 三家对比
    • GMP调度模型
      • m
      • g
      • p
      • 过一遍流程
    • g
      • 一个G的生命周期
    • m
    • p
    • schedt全局队列
    • g0视角看看G的调度流程
      • 四大调度类型
        • 主动调度
        • 被动调度
        • 正常调度
        • 抢占调度
    • 宏观的调度流程
    • 上面流程的具体细节
      • schedule()
      • findRunnable()
      • execute()
      • gosched_m()
      • park_m()与ready()
      • goexit0()
      • retake()
      • reentersyscall 和 exitsyscall

GMP总结

线程

线程是跑在操作系统内核上的一段程序,是内核态的一段程序,是操作系统调度的基本单位。

可以举个例子,比如一个网站的程序整体就是一个线程。这个线程对于用户是不透明的

协程

协程是线程的一个子集,是一种逻辑上的概念,在《计算机操作系统》这门课中只有线程的概念,
然而在具体的工程实践中,工程师们为了方便创建了协程这一概念。

协程是一种用户态的程序,一般与线程形成N:1的关系,也就是说一个线程下面可以绑定多个协程

举个例子,一个网站中有用户管理的部分和文章管理的部分,这两个部分是等级的协程,共同
被绑定在这个网站的线程之下。(实际上可能更加复杂)。

但是这就造成了一个大问题:因为用户态的协程对于操作系统来说是不可见的,因此当多个协程中由于某个协程
阻塞导致整个线程发生阻塞时,操作系统无法找到对应的协程,其他协程也无法运行。这个问题在并发编程中
十分常见:

比如小米YU7即将上市,在抢购的网站服务器中,会启动多个协程用于接收用户的订购请求,但是当一个用户可能因为
网络问题无法访问服务器,则与这个用户的协程绑在一起的整个线程就会发生阻塞,导致整个线程上的用户集体卡顿。
这就是传统协程,线程N:1模式的问题。

为了解决这个问题GMP调度模型应运而生。

三家对比

GMP模型与协程和线程相比:

  1. 它和协程一样是一种用户态的协程。
  2. 它兼顾线程的可并行特性,并且可以应对协程阻塞并且在这个方面比线程更加优秀。
  3. 额外的,goroutine的栈相对于传统线程是可以动态扩展的,增加了栈空间的利用率。

刚才讲过,传统协程与线程的关系是N:1的,而GMP调度模型是N:M的。

  1. 而GMP调度模型是N:M
  2. 创建,销毁,调度在用户态完成,对内核透明,足够轻便。
  3. 通过调度器的斡旋,实现线程间动态绑定和灵活调度
  4. 栈空间大小动态扩展,降低了内存消耗

GMP调度模型

gmp=goroutine+machine+processor

m

m就是线程在golang中的一个抽象概念。
通常就是指线程!!!

  1. m不能和传统线程一样直接执行g,而是要先和p绑定后才能调度g。
  2. 注意,m不需要和p绑死,因此可以实现动态解绑和绑定。

g

  1. g就是goroutine(go协程)
  2. go有自己的运行栈,状态和要执行的函数
  3. g需要绑定到p才能执行,从g的视角来看,p就是g的cpu

p

  1. p是processor,就是go协程调度器,他是gmp调度模型的核心。
  2. 对于m而言,p就是一个执行代理,p要为m提供g和内存等情况,m只有和p结合才能执行调度。
  3. p的数量决定了g的数量,可以由GOMAXPROCS来设置。

在这里插入图片描述

过一遍流程:

现在假如已经创建了很多G,已经形成了一定的规模。

  1. 首先p的数量通过GOMAXPROCS来设置,设置后会创建一定数量的m,然后将m和p绑定。(m和p的数量并不是一一对应的!!
  2. 然后讲G和P绑定放到P的本地队列中。
  3. 然后M会从绑定的P的本地对立中取出G来执行。
  4. 当其中某个P的本地队列执行完的时候,会从全局队列中取G
  5. 当全局队列和本地队列都为空的时候,会从其他P的本地队列中偷取G(work stealing机制)

因为所有的P都有可能从globel queue中取G,所以需要对全局队列加锁。

因为P的本地队列之间会有work stealing机制,所以P队列之间也存在并发安全的问题,只不过出现的频率比较小,所以接近无锁化而不是真正的无锁化

g

在这里插入图片描述

  1. m 是一个指向m的指针

一个G的生命周期

  1. G被创建,状态标记为_Gidle
  2. 然后会被搁置为_Gdead
  3. 这时所有环境被准备好,状态为_Grunnable
  4. 当G正在被执行时状态变成_Grunning
  5. G在被执行的时候如果逻辑代码需要进入内核态则状态变成_Gsyscall
  6. G在被执行时遇到阻塞,状态变成_Gwaiting
  7. 然后G从_Gwaiting或者_Gsyscall状态返回_Grunnable
  8. 等待从新被执行变成_Grunning
  9. G执行完成状态标记为_Gdead,然后被销毁。
    在这里插入图片描述

在这里插入图片描述

m

在这里插入图片描述

g0又叫做始祖goroutine,用来帮助m进行g之间的切换调度,与m关系为1:1.

p

在这里插入图片描述

可以看到p的数据结构中存在队列的数据结构。

本地goroutine队列最大长度为256。

  1. runqhead指向队列头
  2. runqtail指向队列尾部
  3. runqnext指向下一个要执行的goroutine

schedt全局队列

在这里插入图片描述

明显的看到有一把互斥锁

g0视角看看G的调度流程

上文讲过,g0是众多g中最特殊的一个,它和m是1:1的关系,用于调度权的切换

他与普通G的调度关系如下:
在这里插入图片描述

func gogo:g0将调度权交由普通g执行
func mcall:普通g将调度权还给g0

四大调度类型:

  1. 主动调度

  2. 被动调度

  3. 正常调度

  4. 抢占调度
    在这里插入图片描述

    通常,调度指的是由 g0 按照特定策略找到下一个可执行 g 的过程. 而本小节谈及的调度类型是广义上的“调度”,指的是调度器 p 实现从执行一个 g 切换到另一个 g 的过程.

主动调度

主动调度就是由用户主动将调度权交给g0,g0再去寻找可以执行的下一个g0.用户通过调用runtime.Gosched()来实现,让当前g让出执行权
主动进入队列等待下次被调度。

func Gosched() {checkTimeouts()mcall(gosched_m)//交出执行权
}

被动调度

被动调度一般是指,当前g不满足某种执行条件,g可能会陷入阻塞,直到条件达成g才能从阻塞中被唤醒,重新进入执行队列等待被调度。

一般的:满的channel加入队列,或者互斥锁等操作会让底层走进gopark方法。

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte,traceskip int){//... mcall(park_m)//交出执行权
}

goready 方法通常与 gopark 方法成对出现,能够将 g 从阻塞态中恢复,重新进入等待执行的状态.

func goready(gp *g, traceskip int) {systemstack(func() {ready(gp,traceskip,true)})
}

正常调度

g 中的执行任务已完成,g0 会将当前 g 置为死亡状态,发起新一轮调度.

抢占调度

倘若 g 执行系统调用超过指定的时长(有一个全局监控器),且全局的 p 资源比较紧缺,此时将 p 和 g 解绑,抢占出来用于其他 g 的调度.
等 g 完成系统调用后,会重新进入可执行队列中等待被调度.

值得一提的是,前 3 种调度方式都由 m 下的 g0 完成,唯独抢占调度不同.

因为发起系统调用时需要打破用户态的边界进入内核态,此时 m 也会因系统调用而陷入僵直,无法主动完成抢占调度的行为.

因此,在 Golang 进程会有一个全局监控协程 monitor g 的存在,这个 g 会越过 p 直接与一个 m 进行绑定,不断轮询对所有
p 的执行状况进行监控. 倘若发现满足抢占调度的条件,则会从第三方的角度出手干预,主动发起该动作.

宏观的调度流程

在这里插入图片描述

上面图的流程:

  1. 以g0->g->g0的一轮循环举例
  2. g0 执行 schedule() 函数,寻找到用于执行的 g;
  3. g0 执行 execute() 方法,更新当前 g、p 的状态信息,并调用 gogo() 方法,将执行权交给 g;
  4. g 因主动让渡( gosche_m() )、被动调度( park_m() )、正常结束( goexit0() )等原因,调用 m_call 函数,执行权重新回到 g0 手中;
  5. g0 执行 schedule() 函数,开启新一轮循环.

上面流程的具体细节

schedule()

当执行这个函数时,执行权尚在g0手中。

func schedule() {// ... gp,inheritTime,tryWakeP:=findRunnable() //todo 寻找下一个可执行g// ...execute(gp, inheritTime)//todo 执行找到的g
}

findRunnable()

在这里插入图片描述

调度流程中,一个非常核心的步骤,就是为 m 寻找到下一个执行的 g,这部分内容位于 runtime/proc.go 的 findRunnable 方法中

func findRunnable() (gp *g, inheritTime, tryWakeP bool) {_g_ := getg()top:_p_ := _g_.m.p.ptr()// ...if _p_.schedtick%61 == 0 && sched.runqsize > 0 {//todo 每执行61次,从全局队列中获取一个glock(&sched.lock)gp = globrunqget(_p_, 1)unlock(&sched.lock)if gp != nil {return gp, false, false}}// ...if gp, inheritTime := runqget(_p_); gp != nil {return gp, inheritTime, false}// ...if sched.runqsize != 0 {lock(&sched.lock)gp := globrunqget(_p_, 0)unlock(&sched.lock)if gp != nil {return gp, false, false}}if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {if list := netpoll(0); !list.empty() { // non-blockinggp := list.pop()injectglist(&list)casgstatus(gp, _Gwaiting, _Grunnable)return gp, false, false}}// ...procs := uint32(gomaxprocs)if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {if !_g_.m.spinning {_g_.m.spinning = trueatomic.Xadd(&sched.nmspinning, 1)}gp, inheritTime, tnow, w, newWork := stealWork(now)now = tnowif gp != nil {// Successfully stole.return gp, inheritTime, false}if newWork {// There may be new timer or GC work; restart to// discover.goto top}if w != 0 && (pollUntil == 0 || w < pollUntil) {// Earlier timer to wait for.pollUntil = w}}// 

除了获取一个 g 用于执行外,还会额外将一个 g 从全局队列转移到 p 的本地队列,让全局队列中的 g 也得到更充分的执行机会.

func globrunqget(_p_ *p, max int32) *g {if sched.runqsize == 0 {return nil}n := sched.runqsize/gomaxprocs + 1if n > sched.runqsize {n = sched.runqsize}if max > 0 && n > max {n = max}if n > int32(len(_p_.runq))/2 {n = int32(len(_p_.runq)) / 2}sched.runqsize -= ngp := sched.runq.pop()n--for ; n > 0; n-- {gp1 := sched.runq.pop()runqput(_p_, gp1, false)}return gp

在这里插入图片描述

将一个 g 由全局队列转移到 p 本地队列的执行逻辑位于 runqput 方法中:

func runqput(_p_ *p, gp *g, next bool) {// ...retry:h := atomic.LoadAcq(&_p_.runqhead) //todo  取得 p 本地队列队首的索引,同时对本地队列加锁t := _p_.runqtailif t-h < uint32(len(_p_.runq)) {_p_.runq[t%uint32(len(_p_.runq))].set(gp)atomic.StoreRel(&_p_.runqtail, t+1) //todo 倘若 p 的局部队列未满,则成功转移 g,将 p 的对尾索引 runqtail 值加 1 并解锁队列.return}if runqputslow(_p_, gp, h, t) {return}// the queue is not full, now the put above must succeedgoto retry 

在这里插入图片描述

III 倘若发现本地队列 runq 已经满了,则会返回来将本地队列中一半的 g 放回全局队列中,帮助当前 p 缓解执行压力,这部分内容位于 runqputslow 方法中.

  • 尝试从p本地队列中获取一个可以执行的goroutine,核心逻辑位于 runqget 方法中:
func runqget(_p_ *p) (gp *g, inheritTime bool) {if next != 0 && _p_.runnext.cas(next, 0) {return next.ptr(), true}for {h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumerst := _p_.runqtailif t == h {return nil, false}gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consumereturn gp, false}}

execute()

在这里插入图片描述

当 g0 为 m 寻找到可执行的 g 之后,接下来就开始执行 g. 这部分内容位于 runtime/proc.go 的 execute 方法中:

func execute(gp *g, inheritTime bool) {_g_ := getg()_g_.m.curg = gpgp.m = _g_.mcasgstatus(gp, _Grunnable, _Grunning)// todo 更新G的状态信息gp.waitsince = 0gp.preempt = falsegp.stackguard0 = gp.stack.lo + _StackGuardif !inheritTime {_g_.m.p.ptr().schedtick++//  todo 更新 p 的调度计数器}gogo(&gp.sched)// todo 跳转到 gp 的调度地址,开始执行 g.

gosched_m()

g 执行主动让渡时,会调用 mcall 方法将执行权归还给 g0,并由 g0 调用 gosched_m 方法,位于 runtime/proc.go 文件中:
在这里插入图片描述

func gosched_m(gp *g) {goschedImpl(gp)
}func goschedImpl(gp *g) {status := readgstatus(gp)if status&^_Gscan != _Grunning {dumpgstatus(gp)throw("bad g status")}casgstatus(gp, _Grunning, _Grunnable)// todo 更新 g 的状态信息为_Grunnabledropg()// todo 将当前g与m解绑lock(&sched.lock)globrunqput(gp)// todo 将 g 放入全局队列中unlock(&sched.lock)schedule()// todo 开始新一轮调度

park_m()与ready()

g 需要被动调度时,会调用 mcall 方法切换至 g0,并调用 park_m 方法将 g 置为阻塞态,执行流程位于 runtime/proc.go 的 gopark 方法当中:

在这里插入图片描述

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {// ...mcall(park_m)// todo 将当前 g 与 m 解绑,并调用 park_m 方法
}
func park_m(gp *g) {_g_ := getg()casgstatus(gp, _Grunning, _Gwaiting)// todo 更新 g 的状态信息为_Gwaitingdropg()// todo 将当前g与m解绑// ...schedule()// todo 开始一轮调度

当因被动调度陷入阻塞态的 g 需要被唤醒时,会由其他协程执行 goready 方法将 g 重新置为可执行的状态,方法位于 runtime/proc.go .

被动调度如果需要唤醒,则会其他 g 负责将 g 的状态由 waiting 改为 runnable,然后会将其添加到唤醒者的 p 的本地队列中:

func goready(gp *g, traceskip int) {systemstack(func() {ready(gp, traceskip, true)})
}
func ready(gp *g, traceskip int, next bool) {// ..._g_ := getg()// ...casgstatus(gp, _Gwaiting, _Grunnable)// todo 更新 g 的状态信息为_Grunnablerunqput(_g_.m.p.ptr(), gp, next)// todo 调用 runqput 将当前 g 添加到唤醒者 p 的本地队列中,如果队列满了,会连带 g 一起将一半的元素转移到全局队列.// ...
}

goexit0()

在这里插入图片描述

当 g 执行完成时,会先执行 mcall 方法切换至 g0,然后调用 goexit0 方法,内容为 runtime/proc.go:

// Finishes execution of the current goroutine.
func goexit1() {// ...mcall(goexit0)
}
func goexit0(gp *g) {_g_ := getg()_p_ := _g_.m.p.ptr()casgstatus(gp, _Grunning, _Gdead)// todo 更新 g 的状态信息为_Gdead// ...gp.m = nil// ...dropg()// todo 将当前g与m解绑// ...schedule()// todo 开始一轮调度

retake()

在这里插入图片描述

与 4.7-4.9 小节的区别在于,抢占调度的执行者不是 g0,而是一个全局的 monitor g,代码位于 runtime/proc.go 的 retake 方法中:

func retake(now int64) uint32 {n := 0lock(&allpLock)// todo 因为是全局队列,所以需要加锁//加锁后,遍历全局的 p 队列,寻找需要被抢占的目标:for i := 0; i < len(allp); i++ {_p_ := allp[i]if _p_ == nil {// This can happen if procresize has grown// allp but not yet created new Ps.continue}pd := &_p_.sysmontick// 倘若某个 p 同时满足下述条件,则会进行抢占调度:if s == _Psyscall {            // ...if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {//执行系统调用超过 10 ms//p 本地队列有等待执行的 g//或者当前没有空闲的 p 和 mcontinue}unlock(&allpLock)if atomic.Cas(&_p_.status, s, _Pidle) {n++_p_.syscalltick++handoffp(_p_)}incidlelocked(1)lock(&allpLock)}}unlock(&allpLock)return uint32(n)
}
  1. 加锁后,遍历全局的p队列,寻找需要被抢占的目标:
    lock(&allpLock)for i := 0; i < len(allp); i++ {_p_ := allp[i]// ...}unlock(&allpLock)
  1. 倘若某个 p 同时满足下述条件,则会进行抢占调度:
  • I 执行系统调用超过 10 ms;
  • II p 本地队列有等待执行的 g;
  • III 或者当前没有空闲的 p 和 m.
  1. 抢占调度的步骤是,先将当前 p 的状态更新为 idle,然后步入 handoffp 方法中,判断是否需要为 p 寻找接管的 m(因为其原本绑定的 m 正在执行系统调用):
     if atomic.Cas(&_p_.status, s, _Pidle) {n++_p_.syscalltick++handoffp(_p_)}
  1. 当以下四个条件满足其一时,则需要为 p 获取新的 m:
  • I 当前 p 本地队列还有待执行的 g;
  • II 全局繁忙(没有空闲的 p 和 m,全局 g 队列为空)
  • III 需要处理网络 socket 读写请求
func handoffp(_p_ *p) {if !runqempty(_p_) || sched.runqsize != 0 {startm(_p_, false)return}if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) {startm(_p_, true)return}lock(&sched.lock)// ...if sched.runqsize != 0 {unlock(&sched.lock)startm(_p_, false)return}// If this is the last running P and nobody is polling network,// need to wakeup another M to poll network.if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {unlock(&sched.lock)startm(_p_, false)return}// ...
  1. 获取 m 时,会先尝试获取已有的空闲的 m,若不存在,则会创建一个新的 m
func startm(_p_ *p, spinning bool) {mp := acquirem()lock(&sched.lock)// ...nmp := mget()if nmp == nil {id := mReserveID()unlock(&sched.lock)var fn func()// ...newm(fn, _p_, id)// ...return}unlock(&sched.lock)// ...
}

reentersyscall 和 exitsyscall

在这里插入图片描述在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

func exitsyscall0(gp *g) {casgstatus(gp, _Gsyscall, _Grunnable)dropg()lock(&sched.lock)var _p_ *pif schedEnabled(gp) {_p_, _ = pidleget(0)}var locked boolif _p_ == nil {globrunqput(gp)} unlock(&sched.lock)if _p_ != nil {acquirep(_p_)execute(gp, false) // Never returns.}// ...stopm()schedule() // Never returns.
}

在这里插入图片描述
在这里插入图片描述

func exitsyscall0(gp *g) {casgstatus(gp, _Gsyscall, _Grunnable)dropg()lock(&sched.lock)var _p_ *pif schedEnabled(gp) {_p_, _ = pidleget(0)}var locked boolif _p_ == nil {globrunqput(gp)} unlock(&sched.lock)if _p_ != nil {acquirep(_p_)execute(gp, false) // Never returns.}// ...stopm()schedule() // Never returns.
}
http://www.lryc.cn/news/2382141.html

相关文章:

  • 实验分享|基于千眼狼sCMOS科学相机的流式细胞仪细胞核成像实验
  • C++学习:六个月从基础到就业——C++11/14:其他语言特性
  • 【Linux笔记】——线程池项目与线程安全单例模式
  • 数据驱动的社会舆情监测与分析——用算法洞察世界脉动
  • OD 算法题 B卷 【最佳植树距离】
  • ZooKeeper 原理解析及优劣比较
  • 实战5:个性化数字艺术生成与销售
  • 是德科技 | 单通道448G未来之路:PAM4? PAM6? PAM8?
  • OceanBase 开发者大会,拥抱 Data*AI 战略,构建 AI 数据底座
  • STM32IIC协议基础及Cube配置
  • CNN vs ViT:图像世界的范式演进
  • cocos creator使用jenkins打包微信小游戏,自动上传资源到cdn,windows版运行jenkins
  • 定时器的两种实现方式
  • Python、Pytorch、TensorFlow、Anconda、PySide、Jupyter
  • [Java实战]Spring Boot整合MinIO:分布式文件存储与管理实战(三十)
  • MacBook Air A2179(Intel版)安装macOS Catalina所需时间
  • AI在人力资源领域的应用:把握时代浪潮
  • 【VxWorks 实时操作系统(RTOS)】常用函数汇总
  • vr制作公司提供什么服务?
  • 下一代电子电气架构(EEA)的关键技术
  • matlab慕课学习3.5
  • 大语言模型(LLM)如何通过“思考时间”(即推理时的计算资源)提升推理能力
  • Ollama 如何在显存资源有限的情况下合理分配给不同的服务?
  • Qt音视频开发过程中一个疑难杂症的解决方法/ffmpeg中采集本地音频设备无法触发超时回调
  • 基于注意力机制与iRMB模块的YOLOv11改进模型—高效轻量目标检测新范式
  • PEFT库PromptTuningConfig 配置
  • 操作系统----软考中级软件工程师(自用学习笔记)
  • SQL 多表关联与分组聚合:解密答题正确率分析
  • 基于 Redis 实现短信验证码登录功能的完整方案
  • 电平匹配电路