errgroup 原理简析
golang.org/x/sync/errgroup
errgroup提供了一组并行任务中错误采集的方案。
先看注释
Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.
Group 结构体
// A Group is a collection of goroutines working on subtasks that are part of the same overall task.// A zero Group is valid and does not cancel on error.
type Group struct {cancel func() // 内部使用的结束方法wg sync.WaitGroup // 内嵌sync.WaitGroup,用来阻塞errOnce sync.Once // 只采集一次errorerr error // 采集到的error
}
WithContext 返回 *Group 与 context.Context
context.Context 会在子任务发生错误,或Wait方法结束阻塞时被取消
// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {ctx, cancel := context.WithCancel(ctx) // 创建可以取消的contextreturn &Group{cancel: cancel}, ctx // 创建并返回句柄
}
Wait 作为阻塞屏障,与WaitGroup的Wai方法作用一样
// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {g.wg.Wait() // 阻塞if g.cancel != nil { // 阻塞结束后,cancel掉整个子Context链,Wait结束阻塞g.cancel()}return g.err // 返回收集的error
}
Go 创建新的协程去执行子任务
// Go calls the given function in a new goroutine.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {g.wg.Add(1) // 令牌+1go func() {defer g.wg.Done() // 方法执行结束后令牌-1, 令牌为0的时候WaitGroup的Wait方法结束阻塞if err := f(); err != nil { //执行传入的f()方法,并检测errg.errOnce.Do(func() { // 如果err不为空,则将err赋值给g.err,并且只赋值一次g.err = errif g.cancel != nil { //如果cancel非空,则执行该方法,通知Context链,做取消操作。g.cancel()}})}}()
}
原理简析
eg, ctx := errgroup.WithContext(context.Background()) // ctx还可以作为父ctx传递给其他函数调用eg.Go(func() error {return // ...})eg.Go(func() error {return // ...})if err := eg.Wait(); err != nil {// ...}
errgroup是通过封装WaitGroup,Context,sync.Once来实现的。它利用了WaitGroup的Add,Done,Wait方法实现阻塞屏障。使用context.WithCancel来实现取消策略,取消策略针对WithContext传递出的ctx。使用sync.Once实现只保存一组任务中第一次出现的error。
errgroup.Wait结束阻塞的时机:wg sync.WaitGroup的令牌归零。
ctx 传播信号的几个时机:
- 一组任务中任何一个子任务产生error,执行了cancel。
- 一组任务顺利执行结束,Wait中执行了cancel。
- 外部传入的context为可取消的context。外层调用了cancel方法。
c, cancel := context.WithCancel(context.Background())eg, ctx := errgroup.WithContext(c)go func() {select {case <-ctx.Done():fmt.Println("结束")return}}()eg.Go(func() error {time.Sleep(time.Second * 10)return nil})eg.Go(func() error {time.Sleep(time.Second * 10)return nil})cancel()if err := eg.Wait(); err != nil {fmt.Println(err)}