当前位置: 首页 > news >正文

【go】异步任务解决方案Asynq实战

文章目录

  • 一.Asynq介绍
  • 二.所需工具
  • 三.代码示例
  • 四.Reference

一.Asynq介绍

Asynq 是一个 Go 库,一个高效的分布式任务队列。

Asynq 工作原理:

  • 客户端(生产者)将任务放入队列
  • 服务器(消费者)从队列中拉出任务并为每个任务启动一个工作 goroutine
  • 多个工作人员同时处理任务

git库:https://github.com/hibiken/asynq

二.所需工具

Asynq 使用 Redis 作为消息代理。client 和 server 都需要连接到 Redis 进行写入和读取。

PS:请确保所使用redis >= 5.0

三.代码示例

以记录操作的中间件函数向数据库写数据的情景为例。

  1. 生产者(客户端)函数调用入口:

其中 map 为需向数据库写入的内容

client.Call("audit:opera", map[string]any{"uri":        uri,"method":     method,"params":     string(paramsByte),"headers":    string(headerByte),"code":       codeInt,"model":      model,"action":     action,"user_id":    userId,"company_id": companyId,"user_name":  userName,"company":    companyName,
})
  1. 生产者函数
func Call(t string, payload map[string]any) error {// redis连接client := asynq.NewClient(asynq.RedisClientOpt{Addr:     "127.0.0.1:6379",Password: "",DB:       1,})defer client.Close()switch t {case "audit:opera":// 初始化新任务task, err := server.NewOperateSendTask(payload)if err != nil {return err}// 任务入队_, err = client.Enqueue(task, asynq.Queue("audit"))if err != nil {log.Err(err).Msg(fmt.Sprintf("task: %v\n", task))return err}}return nil
}
func NewOperateSendTask(data map[string]any) (*asynq.Task, error) {payload, err := json.Marshal(data)if err != nil {return nil, err}return asynq.NewTask(consts.TypeAuditOpera, payload), nil
}
  1. 消费者函数
func HandlerAuditOperateTask(ctx context.Context, t *asynq.Task) error {var record ent.OperateRecord// 队列中取任务err := json.Unmarshal(t.Payload(), &record)if err != nil {log.Err(err).Msg("task.json.Unmarshal")return err}// 真正的数据库操作err = dao.OperateRecord.CreateOperateRecord(&record)if err != nil {log.Err(err).Msg("task.dao.OperateRecord.CreateOperateRecord")return err}return nil
}
  1. asynq初始化(消费者启动入口,项目初始化时自动启动)
func InitAsynq(ip string, port int, passwd string) {addr := fmt.Sprintf("%s:%d", ip, port)srv := asynq.NewServer(asynq.RedisClientOpt{Addr:     "127.0.0.1:6379",Password: "",DB:       1,},// 异步队列asynq.Config{Queues: map[string]int{"audit": 3,},},)mux := asynq.NewServeMux()// 启动消费者mux.HandleFunc("audit:opera", server.HandlerAuditOperateTask)go srv.Run(mux)}

四.Reference

Go异步任务解决方案之Asynq库详解:
https://www.jb51.net/article/275392.htm

http://www.lryc.cn/news/158156.html

相关文章:

  • 掌握 Android 自动化测试框架 UiAutomator UiAutomator2
  • c#抽象类(abstract)
  • 语义分割实践思考记录(个人备忘录)
  • Zebec Protocol 成非洲利比亚展会合作伙伴,并将向第三世界国家布局
  • 随机流-RandomAccessFile
  • 单例和静态类
  • PMP-项目风险管理的重要性
  • 学习的心得
  • Python网络爬虫中这七个li标签下面的属性值,不是固定的,怎样才能拿到他们的值呢?...
  • 白鲸开源 DataOps 平台加速数据分析和大模型构建
  • (其他) 剑指 Offer 65. 不用加减乘除做加法 ——【Leetcode每日一题】
  • RestTemplate 的用法
  • postgresql-使用plpgsql批量插入用户测试数据
  • 通过Siri打造智能爬虫助手:捕获与解析结构化数据
  • 【电源专题】典型设备的接地设计
  • LeetCode-216-组合总和Ⅱ
  • [技术杂谈]几款常用的安装包制作工具
  • 旋转屏幕显示方向-rk3568
  • 07 Linux补充|秋招刷题|9月6日
  • 【JavaGuide学习笔记】Day.1
  • 大数据课程K18——Spark的ALS算法与显式矩阵分解
  • Android Jetpack架构组件库:Hilt
  • 企业帮助中心如何在线搭建,还能多场景使用呢?
  • C++ primer plus第十五章编程练习答案
  • 【精品】商品规格 数据库表 设计
  • 无人机集群路径规划MATLAB:孔雀优化算法POA求解无人机集群三维路径规划
  • Dockerfile创建镜像异常问题解决
  • 使用libcurl请求https的get/post
  • AUTOSAR规范与ECU软件开发(实践篇)7.3 MCAL模块配置方法及常用接口函数介绍之GPT的配置
  • Android 性能优化--内存优化分析总结