当前位置: 首页 > news >正文

singlefligt使用方法和源码解读

singlefligt使用方法和源码解读

介绍

  • sync.once保证其整个生命周期内只调用一次;而singleflight则可以保证在一定范围内其只调用一次。

背景|使用场景

  • 应对缓存击穿:加锁可以解决这个问题,但是加锁不太灵活(不能控制访问频率之类的),singlefilght可以通过定时清除的方式限制频率
  • 去除重复请求:当一定时间范围内存在了大量的重复请求,可以考虑使用:一致性hash负载均衡+singlefilght收束请求。用户根据key使用一致性hash请求到特定的服务机器上,服务对请求执行singlefilght后,再去请求下游,以此收束重复请求

用法

基础用法:合并请求,合并第一个请求执行过程中到达的所有请求。

从下面的代码和输出可以看出每次的输出query...​都是基本上每10ms一次,而其中穿插了很多次打印结果,代表着在函数真正执行过程中所有的请求都是被拦截住了,当函数执行完时,会将所有的​**结果共享给这个期间到来的所有请求**。

package mainimport ("fmt""golang.org/x/sync/singleflight""log""math/rand""time"
)func getData(id int64) string {log.Printf("query...%d\n", id)time.Sleep(10 * time.Millisecond) // 模拟一个比较耗时的操作,10msreturn "liwenzhou.com" + fmt.Sprintf("%d", id)
}func main() {log.SetFlags(log.Lmicroseconds)g := new(singleflight.Group)var i int64 = 0for {time.Sleep(time.Duration(rand.Intn(5)) * time.Millisecond)// 调用go func() {icopy := ii++v1, _, shared := g.Do("getData", func() (interface{}, error) {ret := getData(icopy)return ret, nil})log.Printf("call: v1:%v, shared:%v , i:%d\n", v1, shared, i)}()}}/**
query...1
1st call: v1:liwenzhou.com1, shared:true
2nd call: v2:liwenzhou.com1, shared:true
*/
➜  test21 git:(main)go run main.go
20:11:31.995346 query...0
20:11:32.005800 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005799 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005804 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.005807 call: v1:liwenzhou.com0, shared:true , i:4
20:11:32.006386 query...4
20:11:32.016671 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016687 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016691 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016693 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.016694 call: v1:liwenzhou.com4, shared:true , i:9
20:11:32.017366 query...9
20:11:32.027418 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027433 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027436 call: v1:liwenzhou.com9, shared:true , i:16
20:11:32.027437 call: v1:liwenzhou.com9, shared:true , i:16
进阶用法1:超时控制DoChan

在基础方法中,在某一次请求执行过程中,所有到来的新的请求都会阻塞等待这个请求的执行结果。如果真正执行的过程超时出错了,其他并发的请求就只能等待。

考虑到singleflight库通常用于避免缓存击穿,需要查询外部数据库,这样的出错、超时的场景是必须要考虑的。

比如正常是10ms,但是超时时间设置的是50ms。由于并发数量并不需要真正为1,因此想12ms就停止阻塞。

// 使用DoChan进行超时控制 
func CtrTimeout(ctx context.Context, req interface{}){ch := g.DoChan(key, func() (interface{}, error) {return call(ctx, req)})select {case <-time.After(500 * time.Millisecond): returncase <-ctx.Done()returncase ret := <-ch: go handle(ret)}
}
进阶用法2:手动合并请求频率控制

在基础用法中,请求合并的频率是 一个请求的执行时间 ,希望达到自己控制合并的时间,不限制为一个请求的执行时间。

方法:另起一个协程删除对应的key​,一般是在go.Do中另起Forget​一次即可。

// 另外启用协程定时删除key,提高请求下游次数,提高成功率
func CtrRate(ctx context.Context, req interface{}){res, _, shared := g.Do(key, func() (interface{}, error) {// 另外其一个goroutine,等待一段时间后,删除key// 删除key后的调用,会重新执行Dogo func() {time.Sleep(10 * time.Millisecond)g.Forget(key)}()return call(ctx, req)})handle(res)
}

总结

singlefligt可以合并多个请求达到限频率的目的。可以使用DoChan​方法或Forget​来手动控制请求的频率和超时返回。

源码解读

singleflight的源码就一个文件,因此就放在备注里了:


// call is an in-flight or completed singleflight.Do call
type call struct {wg sync.WaitGroup //并发调用的的时候,一个协程执行其它协程阻塞,用于执行完之后通知其他阻塞的协程// These fields are written once before the WaitGroup is done// and are only read after the WaitGroup is done.val interface{} //保存执行的结果值err error //保存执行过程中的错误,只会写入一次// These fields are read and written with the singleflight// mutex held before the WaitGroup is done, and are read but// not written after the WaitGroup is done.dups  int //记录并发数量,用于执行后返回时候共享的结果(Shared)chans []chan<- Result //用于支持结果通过channel返回出去
}// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group struct {mu sync.Mutex       // protects m 保证m的并发安全m  map[string]*call // lazily initialized //m代表任务,一个key-val代表一个任务正在执行// 任务执行完之后会从map里面被删除
}// Result holds the results of Do, so they can be passed
// on a channel.
// 保存结果的结构体,这样才能支持通过channel返回结果
type Result struct { Val    interface{}Err    errorShared bool //是否和其他协程共享的结果
}// Do executes and returns the results of the given function, making
// sure that only one execution is in-flight for a given key at a
// time. If a duplicate comes in, the duplicate caller waits for the
// original to complete and receives the same results.
// The return value shared indicates whether v was given to multiple callers.
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++g.mu.Unlock()c.wg.Wait()if e, ok := c.err.(*panicError); ok {panic(e)} else if c.err == errGoexit {runtime.Goexit()}return c.val, c.err, true}c := new(call)c.wg.Add(1)g.m[key] = cg.mu.Unlock()g.doCall(c, key, fn)return c.val, c.err, c.dups > 0
}// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
//
// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {ch := make(chan Result, 1)g.mu.Lock()if g.m == nil {g.m = make(map[string]*call)}if c, ok := g.m[key]; ok {c.dups++c.chans = append(c.chans, ch)g.mu.Unlock()return ch}c := &call{chans: []chan<- Result{ch}}c.wg.Add(1)g.m[key] = cg.mu.Unlock()go g.doCall(c, key, fn)return ch
}// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {normalReturn := falserecovered := false// use double-defer to distinguish panic from runtime.Goexit,// more details see https://golang.org/cl/134395// 这块设计非常有意思,主要是针对两种异常退出的情况使用了双重defer来保证正确处理,具体见下面注释defer func() {// the given function invoked runtime.Goexitif !normalReturn && !recovered { //结合下面代码分析,一定是出现了exitc.err = errGoexit}g.mu.Lock()defer g.mu.Unlock()c.wg.Done() //通知其他协程可以拿结果了if g.m[key] == c { //删掉任务delete(g.m, key)}if e, ok := c.err.(*panicError); ok {// In order to prevent the waiting channels from being blocked forever,// needs to ensure that this panic cannot be recovered.if len(c.chans) > 0 {go panic(e)select {} // Keep this goroutine around so that it will appear in the crash dump.} else {panic(e)}} else if c.err == errGoexit {// Already in the process of goexit, no need to call again} else {// Normal returnfor _, ch := range c.chans {ch <- Result{c.val, c.err, c.dups > 0}}}}()func() {defer func() {if !normalReturn {//到这里说明:要么发生panic,要么发生exit// Ideally, we would wait to take a stack trace until we've determined// whether this is a panic or a runtime.Goexit.//// Unfortunately, the only way we can distinguish the two is to see// whether the recover stopped the goroutine from terminating, and by// the time we know that, the part of the stack trace relevant to the// panic has been discarded.if r := recover(); r != nil { //panic就赋值c.err = newPanicError(r)}}}()c.val, c.err = fn()normalReturn = true //到这里说明:没有发生panic,任务里面没有go exit}()//执行不到这里的话说明是exitif !normalReturn { //要么发生panic,要么发生exit    recovered = true  //结合分析------> 一定是panic,所以赋值}
}// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {g.mu.Lock()delete(g.m, key)g.mu.Unlock()
}

双重defer来确定是exit​还是发生了panic。
这块设计非常有意思,主要是针对两种异常退出的情况使用了双重defer来保证

总结

singleflight使用map​来隔离不同的任务,map​的key存在性标识任务是否在执行(执行完会马上从map中删除)。

对于一个任务,使用call结构体进行管理,其主要用于:控制并发(waitGroup​),执行,和传递结果(支持channel​传递)。

执行中很有意思的一点是使用了双重defer来判断异常退出是panic退出还是调用了exit()退出

http://www.lryc.cn/news/572003.html

相关文章:

  • MySQL 索引和select优化
  • ​​网络工程师知识点精讲与例题解析:数据链路层技术​​
  • 计算机视觉课程总结
  • 【Node.js 的底层实现机制】从事件驱动到异步 I/O
  • Python Peewee库连接和操作MySQL数据库
  • 条件向量运算与三元表达式
  • C语言——枚举
  • 解决Matplotlib三维图无法旋转的问题
  • AndroidR平台ToastPresenter引出BinderProxy泄漏
  • 实战指南:用DataHub管理Hive元数据
  • SkyWalking 部署与应用(Windows)
  • 7-4 身份证号处理
  • 企业班车出行服务系统的SDK选型、核心功能优化迭代的避坑复盘
  • Android软件适配遥控器需求-案例经验分享
  • WebRTC(六):ICE协议
  • 汇编语言期末快速过手笔记
  • React Native WebView键盘难题:如何让输入框不被键盘遮挡?
  • Alpha WORLD上线在即:首发AIOT,重塑项目价值格局
  • 48-Oracle CDB下的SID-实例名-服务名
  • Transformer-BiGRU、Transformer、CNN-BiGRU、BiGRU、CNN五模型多变量时序预测
  • 【计算机常识】--docker入门+docker desktop的使用(一)
  • MySQL 多表查询、事务
  • 如何使用ChatGPT快速完成一篇论文初稿?
  • Controller Area Network (CAN) 通信机制简介
  • Ubuntu服务器启动jupyter notebook,本地电脑Mobaxterm访问
  • 一个电脑装了多个python哪个生效?在 Windows 系统中修改环境变量 PATH 的优先级
  • Vue.js 按键修饰符详解:提升键盘事件处理效率
  • 筑牢安全防线:电子文件元数据驱动的 AI 知识库可控管理方案
  • TradingAgents:基于多智能体的大型语言模型(LLM)金融交易框架
  • 从零学起VIM