当前位置: 首页 > article >正文

FastAPI集成APsecheduler的BackgroundScheduler+mongodb(精简)

项目架构:

    FastAPI(folder)

                 >app(folder)

                 >core(folder)

                 >models(folder)

                 >routers(folder)

                 >utils(folder)

                   main.py(file)

1 utils文件夹下新建schedulers.py   

from apscheduler.schedulers.background import BackgroundScheduler

from apscheduler.jobstores.mongodb import MongoDBJobStore

#特殊说明:此处显示指定DB,会在DB里创建数据库apscheduler_db

jobstores={

        'default':MongoDBJobStore(

              database='apscheduler_db',

              collection='custom_jobs',  

              host='localhost',

              port=27017

        )

}

#特殊说明:replace_existing=True会覆盖同名的JOB,但不影响数据库中的,仅处理job_id相同的冲突

scheduler=BackgroundScheduler(jobstores=jobstores,replace_existing=True)

2  main.py中在lifespan上下文初始化和关闭scheduler

import uvicorn

from contextlib import asynccontextmanager

from app.utils.schedulers import jobstores

scheduler=None

#特殊说明:yield中可以监控到正常结束比如ctrl+c,异常结束不能执行yield后代码

@asynccontextmanager

async def lifespan(app:FastAPI):

    jobstores['default'].remove_all_jobs()

    from app.utils.schedulers import scheduler

    yield

    scheduler.remove_all_jobs()

    scheduler.shutdown(wait=False)

app=FastAPI(lifespan=lifespan)

 3 models文件夹新建scheduler.py文件配置基础参数类和默认值

from pydantic import BaseModel

class job_config(BaseModel):

        job_id:str="default"

        job_name:str="default"

        trigger_type:str="interval"

        trigger_kwargs:dict={}

        seconds:int=30

        pass

4 api文件夹下添加sechedulers.py配置添加创建job方法

from app.utils.schedulers import scheduler

from app.models.schedulers import job_config

from fastapi import APIRouter

from typing import Coroutine,Callable

from datetime import datetime

@router.get("create_job")

def create_job(jobconfig,func):

    try:

        if jobconfig is None:

           jobconfig=job_config()

        scheduler.add_job(

            func,

            trigger=jobconfig.trigger_type,

            kwargs=jobconfig.trigger_kwargs,#特殊说明:这里可以添加自定义参数

            id=jobconfig.job_id,

            name=jobconfig.job_name,

            seconds=jobconfig.seconds

        )

       

        scheduler.start()

    except Exception as e:

        raise e

    except (KeyboardInterrupt, SystemExit):

        scheduler.shutdown()

 5 测试,调用test方法

@router.get("/function")

def function1():

    try:

        with open("D:\\demo.txt", "a") as file:

             print("写入文件"+ str(datetime.now()), file=file)

    except:

        pass

@router.get("/test")

def test():

    try:

        create_job(None,function1)

    except:

        pass

http://www.lryc.cn/news/2394040.html

相关文章:

  • 本地部署FreeGPT+内网穿透公网远程访问,搞定ChatGPT外网访问难题
  • linux 1.0.3
  • 基于RK3588的智慧农场系统开发|RS485总线|华为云IOT|node-red|MQTT
  • 解锁程序人生学习成长密码,从目标设定开始
  • 简单cnn
  • C#集合循环删除某些行
  • 相机定屏问题分析四:【cameraserver 最大request buffer超标】后置视频模式预览定屏闪退至桌面
  • 【Linux 学习计划】-- 进程地址空间
  • 告别重复 - Ansible 配置管理入门与核心价值
  • 3D Gaussian splatting 04: 代码阅读-提取相机位姿和稀疏点云
  • CTFHub-RCE 命令注入-过滤空格
  • 卫生间改造翻新怎么选产品?我在瑞尔特找到了解决方案
  • C++ list数据删除、list数据访问、list反转链表、list数据排序
  • Express教程【002】:Express监听GET和POST请求
  • mysql安装教程--笔记
  • C++ 观察者模式:设计与实现详解
  • 【PostgreSQL 03】PostGIS空间数据深度实战:从地图服务到智慧城市
  • HIT-csapp大作业:程序人生-HELLO‘s P2P
  • 深入探讨redis:主从复制
  • 帕金森常见情况解读
  • 清华大学发Nature!光学工程+神经网络创新结合
  • 【android bluetooth 案例分析 04】【Carplay 详解 3】【Carplay 连接之车机主动连手机】
  • C++学习-入门到精通【11】输入/输出流的深入剖析
  • NW969NW978美光闪存颗粒NW980NW984
  • 使用 ssld 提取CMS 签名并重签名
  • 前端基础之《Vue(17)—路由集成》
  • 大厂前端研发岗位PWA面试题及解析
  • 第十四章 MQTT订阅
  • element ui 表格 勾选复选框后点击分页不保存之前的数据问题
  • DataAgent产品经理(数据智能方向)