当前位置: 首页 > news >正文

Redis后台任务有哪些

Redis后台任务

为了有更好的性能表现,redis对于一些比较耗时的操作会异步执行,不阻塞线上请求。文章从源码(redis7.0)来看,aof、rdb文件的关闭,aof文件的刷盘以及部分内存释放会采用异步方式,在后台线程中执行。接下来我们看下这些具体的任务以及后台任务的具体实现。

Redis后台任务类型

在源码中的bio.h bio.c文件是redis后台线程任务的实现逻辑,包含以下三类任务:

/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */
#define BIO_NUM_OPS       3
  1. 关闭文件,包括socket、数据文件的关闭。redis中主要是aof和rdb文件的close。
    异步close
  2. 刷盘,内容由内存到磁盘的过程,通常是异步写的,首先会写入page cahe,刷盘就是将修改的数据从page cache同步到磁盘。
    异步fsync
  3. 释放内存。
    异步释放

实现思路

异步任务实现逻辑
三类异步任务对应三个任务队列,分别不同的函数进行任务提交,任务队列是一个list,每个list对应一个消费线程。

  • void bioCreateCloseJob(int fd, int need_fsync); 提交异步关闭任务
  • void bioCreateFsyncJob(int fd); 提交异步刷盘任务
  • void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); 提交异步释放任务

源码分析

初始化

在启动初始化完成后,main函数会调用InitServerLast做一些系统准备完毕后的其他初始化工作,其中就包括后台线程任务初始化初void bioInit(void);

  • 初始化每个线程用到的互斥锁、信号量、消费队列(bio_jobs)和队列长度(bio_pending)
  • 计算线程栈大小
  • 初始化每个线程,线程句柄函数bioProcessBackgroundJobs,每个线程会先进行设置名称(redis_set_thread_title)、绑核(redisSetCpuAffinity)、设置信号(pthread_sigmask),然后进入while(1)循环执行。
/* Initialize the background system, spawning the thread. */
void bioInit(void) {pthread_attr_t attr;pthread_t thread;size_t stacksize;int j;/* Initialization of state vars and objects */for (j = 0; j < BIO_NUM_OPS; j++) {pthread_mutex_init(&bio_mutex[j],NULL);pthread_cond_init(&bio_newjob_cond[j],NULL);pthread_cond_init(&bio_step_cond[j],NULL);bio_jobs[j] = listCreate();bio_pending[j] = 0;}/* Set the stack size as by default it may be small in some system */pthread_attr_init(&attr);pthread_attr_getstacksize(&attr,&stacksize);if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;pthread_attr_setstacksize(&attr, stacksize);/* Ready to spawn our threads. We use the single argument the thread* function accepts in order to pass the job ID the thread is* responsible of. */for (j = 0; j < BIO_NUM_OPS; j++) {void *arg = (void*)(unsigned long) j;if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");exit(1);}bio_threads[j] = thread;}
}

线程任务

每个线程的执行的函数都是同一个,在这个函数中根据不同的函数入参进行区别。

  • 首先是等待该线程对应的队列非空
  • 非空时从任务队列中取出头节点任务 ln->value
  • 根据不同的type类型分别执行刷盘、关闭文件、异步释放等操作
  • 执行完毕后删除节点。这里有个细节是广播bio_step_cond,是为了释放查询待执行任务数时加的锁(查询进来时,刚好有任务在执行,此时等待这个任务执行完毕)。
    while(1) {listNode *ln;/* The loop always starts with the lock hold. */if (listLength(bio_jobs[type]) == 0) {pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);continue;}/* Pop the job from the queue. */ln = listFirst(bio_jobs[type]);job = ln->value;/* It is now possible to unlock the background system as we know have* a stand alone job structure to process.*/pthread_mutex_unlock(&bio_mutex[type]);/* Process the job accordingly to its type. */if (type == BIO_CLOSE_FILE) {if (job->fd_args.need_fsync) {redis_fsync(job->fd_args.fd);}close(job->fd_args.fd);} else if (type == BIO_AOF_FSYNC) {/* The fd may be closed by main thread and reused for another* socket, pipe, or file. We just ignore these errno because* aof fsync did not really fail. */if (redis_fsync(job->fd_args.fd) == -1 &&errno != EBADF && errno != EINVAL){int last_status;atomicGet(server.aof_bio_fsync_status,last_status);atomicSet(server.aof_bio_fsync_status,C_ERR);atomicSet(server.aof_bio_fsync_errno,errno);if (last_status == C_OK) {serverLog(LL_WARNING,"Fail to fsync the AOF file: %s",strerror(errno));}} else {atomicSet(server.aof_bio_fsync_status,C_OK);}} else if (type == BIO_LAZY_FREE) {job->free_args.free_fn(job->free_args.free_args);} else {serverPanic("Wrong job type in bioProcessBackgroundJobs().");}zfree(job);/* Lock again before reiterating the loop, if there are no longer* jobs to process we'll block again in pthread_cond_wait(). */pthread_mutex_lock(&bio_mutex[type]);listDelNode(bio_jobs[type],ln);bio_pending[type]--;/* Unblock threads blocked on bioWaitStepOfType() if any. */pthread_cond_broadcast(&bio_step_cond[type]);}

提交任务

任务提交比较简单,通过上述三个Create接口实例化一个bio_job 然后追加到对应的list尾部。
bio_job使用了union,对于close和刷盘来说使用fd_args, 对于异步释放来说使用free_args。

typedef union bio_job {/* Job specific arguments.*/struct {int fd; /* Fd for file based background jobs */unsigned need_fsync:1; /* A flag to indicate that a fsync is required before* the file is closed. */} fd_args;struct {lazy_free_fn *free_fn; /* Function that will free the provided arguments */void *free_args[]; /* List of arguments to be passed to the free function */} free_args;
} bio_job;

总结

redis 针对大文件关闭、大内存释放、刷盘这些操作,分别使用对应的后台线程防止其阻塞线上请求,保证线上请求的高性能。其实现方式比较清晰,每种后台任务对应一个链表实现的消费队列和一个后台线程作为消费者,前台请求只需要通过提交函数向队列中追加待执行任务即可。实现清晰、简洁,也有一些细节,例如后台线程要屏蔽 watchdog, 这里不再深究。

http://www.lryc.cn/news/471402.html

相关文章:

  • TPair<TKey, TValue> 键值对
  • 【杂谈】城市规划教育的危与机
  • 金融工程--pine-script 入门
  • Vue3 跨标签页或跨窗口通信
  • Ollama: 使用Langchain的OllamaFunctions
  • java质数的判断 C语言指针变量的使用
  • TensorFlow面试整理-TensorFlow 数据处理
  • vue路由的基本使用
  • 数据结构分类
  • 【STM32】 TCP/IP通信协议--LwIP介绍
  • 一些面试题整理
  • 端口号和ip地址一样吗?区别是什么
  • 深入探讨全流量回溯分析与网络性能监控系统
  • python机器人编程——一种3D骨架动画逆解算法的启示(上)
  • Flutter开发者必备面试问题与答案02
  • 拥抱真实:深度思考之路,行动力的源泉
  • 【Python爬虫实战】深入理解Python异步编程:从协程基础到高效爬虫实现
  • OpenCV图像处理方法:腐蚀操作
  • PG数据库之流复制详解
  • Python酷库之旅-第三方库Pandas(174)
  • 【Linux网络】基于TCP的全连接队列与文件、套接字、内核之间的关系
  • IDE(集成开发环境)
  • 一键导入Excel到阿里云PolarDB-MySQL版
  • Oracle有哪些版本
  • 先来先服务(FCFS,First-Come, First-Served)调度算法
  • Windows操作系统忘记密码怎么办 这个方法屡试不爽 还不来试一下
  • 基于java的山区环境监督管理系统(源码+定制+开发)环境数据可视化、环境数据监测、 环境保护管理 、污染防治监测系统 大数据分析
  • jQuery Mobile 表单输入
  • IoC详解
  • 基于 ThinkPHP+Mysql 灵活用工_灵活用工系统_灵活用工平台