当前位置: 首页 > news >正文

golang 函数式编程库samber/mo使用: Future

golang 函数式编程库samber/mo使用: Future

如果您对samber/mo库不了解, 请先阅读第一篇 Option

本节讲述Future的使用,它可以帮助我们处理异步编程问题。

示例

我们先来看看下面代码的示例, 注释解释了每一步的操作。

package mainimport ("fmt""github.com/samber/mo"
)func main() {// resolve 在这里只是一个定义, NewFuture会以一个 goroutine 的方式执行 cb, 并且传递Future 的 resolve 和 rejectvalue, err := mo.NewFuture(func(resolve func(string), reject func(error)) {// do something hereif true { // 这里假定 do something 成功// 如果 do something 成功, 执行 resolve, 并传递一个值, 然后会执行 Thenresolve("foobar")} else {// 告诉 do something 失败, 执行 reject, 并传递一个错误, 然后会执行 Catchreject(fmt.Errorf("failure"))}}).Then(func(s string) (string, error) {// 这里 s 就是 resolve 传递的值return s, nil}).Catch(func(err error) (string, error) {// 这里 err 就是 reject 传递的错误return "foobar", nil}).Finally(func(value string, err error) (string, error) {// 不管发生什么都会执行, value 是 resolve 传递的值, err 是 reject 传递的错误return value, nil}).Collect() // 等待 future 执行完毕, 并返回最终的值fmt.Println(value)fmt.Println(err)// Output:// foobar// <nil>
}

源码解析

根据mo.NewFuture的实现, 可以看出该函数做的事情就是构造一个Future, 然后执行activate函数, activate实际就是用 goroutine 执行cb函数, 并且将 Future 的 resolve 和 reject函数作为参数传递给cb

func NewFuture[T any](cb func(resolve func(T), reject func(error))) *Future[T] {future := Future[T]{cb:       cb,cancelCb: func() {},done:     make(chan struct{}),}future.active()return &future
}func (f *Future[T]) active() {go f.cb(f.resolve, f.reject)
}

resolve的实现如下, 可以看到resolve做的事情就是用mo.OK包装value, 记录到result中,并且关闭f.done, 表明future已经完成。

resolve加锁的目的是为了确保后续Then或Finally不会同时进行。

func (f *Future[T]) resolve(value T) {f.mu.Lock()defer f.mu.Unlock()f.result = Ok(value)if f.next != nil { // 这里如果不为空,表明next先于something注册,需要执行f.next.activeSync()}close(f.done)
}

我们来看看Then的实现,这个函数先对f执行加锁, 然后构造一个新的Future,这个新的Future的cb函数就是为了判断f的执行结果, 如果f的result不是error, 就执行Then注册的回调 cb。 所以如果f.cb函数执行resolve后返回, f.result.IsError()为false, 会执行Then中的回调。

最后的select表示如果f已完成,用goroutine 执行Then中的回调。如果f还没有完成,则留待f.cb的resolve或reject执行Then的回调。两种情况都会直接返回f.next,不会阻塞。这样就实现了Future的串联。

func (f *Future[T]) Then(cb func(T) (T, error)) *Future[T] {f.mu.Lock()defer f.mu.Unlock()f.next = &Future[T]{cb: func(resolve func(T), reject func(error)) {if f.result.IsError() {reject(f.result.Error())return}newValue, err := cb(f.result.MustGet())if err != nil {reject(err)return}resolve(newValue)},cancelCb: func() {f.Cancel()},done: make(chan struct{}),}select {case <-f.done:f.next.active()default:}return f.next
}

CatchThen的区别是如果f.result是error, 就执行Catch中的回调。Finally是不管f.result是什么, 都会执行Finally中的回调。

最后的Collect是用于等待Future执行完毕, 并返回最终的值。

func (f *Future[T]) Collect() (T, error) {<-f.donereturn f.result.Get()
}

还有 ResultEither方法, 用于获取Future的执行结果, 会阻塞直到Future执行完毕(也就是先执行Collect)

http://www.lryc.cn/news/307687.html

相关文章:

  • 【Spring连载】使用Spring Data访问 MongoDB(十四)----Mongodb特有的查询方法
  • 消息中间件篇之RabbitMQ-消息重复消费
  • 常见设计模式之单例模式
  • VL817-Q7 USB3.0 HUB芯片 适用于扩展坞 工控机 显示器
  • 【Android安全】Windows 环境下载 AOSP 源码
  • Vue.js+SpringBoot开发快递管理系统
  • Linux/Spectra
  • C 嵌入式系统设计模式 08:硬件代理模式
  • 【k8s配置与存储--持久化存储(PV、PVC、存储类)】
  • 【Vite】解决Vite http proxy error: Error: connect ECONNREFUSED
  • FPGA领域顶级学术会议
  • 罗技鼠标滚轮模式介绍 | 鼠标滚轮异响 - 解决方案
  • Scrapy与分布式开发(2.2):正则表达式
  • 今年“全国爱耳日”主题确定!立聪堂助听器组织社区义诊
  • 区块链智能合约开发
  • Android 启动流程及 init 进程解析
  • Java设计模式:核心概述(一)
  • 计算机网络:IP
  • CSS中使用变量的两个函数var和calc
  • 了解docker与k8s
  • 服务器防火墙的应用技术有哪些
  • 打开 Camera app 出图,前几帧图像偏暗、偏色该怎样去避免?
  • SD-WAN技术:优化国内外服务器访问的关键
  • 【MySQL】学习和总结标量子查询
  • vue3第三节(v-model 执行原理)
  • RunnerGo UI自动化测试脚本如何配置
  • Android 指南针校准进度计算实现
  • c++学习:Lambda练习和数组练习
  • 数据仓库和数据湖的区别
  • tkinterFrame框架+标签框架LabelFrame+Toplevel窗口的使用