当前位置: 首页 > news >正文

【最后203篇系列】034 使用SQLite构建简单的任务管理

表数据同步的断点续传

有时候需要将一个表的数据复制到另一个表,循环是常用的方式。当表比较大,执行的时间很长,会有很多因素引起失败。我希望可以比较简单的跑数,所以做一个简单的任务系统。

SQLitre是嵌入式数据库,这样脚本可以不必考虑太多依赖,又可以用到数据库的持久化功能。

1 数据库初始化

给到一个文件路径参数,确定持久化的文件

import sqlite3
import time
import randomdef init_db(DB_FILE = "tasks.db"):"""初始化数据库和表"""conn = sqlite3.connect(DB_FILE)conn.row_factory = sqlite3.Row cur = conn.cursor()cur.execute("""CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,content TEXT, status INT DEFAULT 0  -- 0=未开始, 1=进行中, 2=完成, 3=失败)""")# 可选:给 status 建索引cur.execute("CREATE INDEX IF NOT EXISTS idx_status ON tasks(status)")cur.execute("CREATE INDEX IF NOT EXISTS idx_name ON tasks(name)")conn.commit()return conn, cur

实际中可以以脚本的file path作为参数(结尾改为.db)

# 建立数据库
conn, cur = init_db(DB_FILE = './the_script.db')

2 任务初始化

同步任务,可以按照id分为若干区间,每个区间任务额可以称为lot。

tuple_list = slice_list_by_batch1(min_id,max_id, 10000)
task_df = pd.DataFrame()
task_df['name'] = ['lot_%s' % i for i in range(len(tuple_list)) ]  
task_df['content'] = tuple_list
task_df['content'] = task_df['content'].apply(lambda x: json.dumps(x))
task_lod = df2lod(task_df)
# 写入任务
cur.executemany("INSERT INTO tasks (name, content) VALUES (?,?)", [(t['name'],t['content']) for t in task_lod])
conn.commit()res = cur.execute('select count(*) from tasks').fetchone()
dict(res)

sqlite也可以提供类似 Row Dict的格式,需要

    conn.row_factory = sqlite3.Row cur = conn.cursor()row = cur.execute(new_task_sql).fetchone()

3 处理一次

每次启动时,读出未处理的任务。任务里有tuple参数,根据tuple的起止区间获取数据后进行处理,如果成功就根据任务名将状态更新为2,异常则更新为3。这样每次重复执行这个就可以了,因为数据持久化在文件中,所以即使和服务器断开连接也没关系。

def process_one():new_task_sql  = 'select name, content, status from tasks where status = 0 limit 1'# new_task = cur.execute(new_task_sql).fetchall()conn.row_factory = sqlite3.Row cur = conn.cursor()row = cur.execute(new_task_sql).fetchone()process_code = 1if row:row_dict = dict(row)tem_tuple = json.loads(row_dict['content'])print('>>>> ',row_dict['name'])try:some_tuple = tem_tuple---- DO LOGICcur.execute("UPDATE tasks SET status = 2 WHERE name = ?", (row_dict['name'],))conn.commit()except:cur.execute("UPDATE tasks SET status = 3 WHERE name = ?", (row_dict['name'],))conn.commit()else:process_code = 0return process_code

4 处理直到结束

由于 process_one在没有获取到待处理任务行时会返回0,这个作为结束信号。所以可以给到一个略大的循环次数,当收到结束信号时停止。

for i in range(30000):process_code = process_one()if not process_code:print('无待处理任务')break 
http://www.lryc.cn/news/626914.html

相关文章:

  • 解决Docker 无法连接到官方镜像仓库
  • LINUX 820 shell:shift,expect
  • 49 C++ STL模板库18-类模板-pair
  • 双模式 RTMP H.265 播放器解析:从国内扩展到 Enhanced RTMP 标准的演进
  • 深入理解JVM内存结构:从字节码执行到垃圾回收的全景解析
  • 基于单片机智能加湿器/空气加湿器
  • ubuntu系统上的conda虚拟环境导出方便下次安装
  • 计算机毕设Spark项目实战:基于大数据技术的就业数据分析系统Django+Vue开发指南
  • Typescript入门-数组元组讲解
  • CSS3DRenderer+ CSS3DObject实现在 Three.js 中添加文本内容
  • 监听视频是否加载完毕
  • 次短路P2865 [USACO06NOV] Roadblocks G题解
  • KubeBlocks for ClickHouse 容器化之路
  • 【机器学习深度学习】AI大模型高并发挑战:用户负载部署策略
  • OceanBase DBA实战营2期--SQL 关键字限流学习笔记
  • Angular由一个bug说起之十八:伴随框架升级而升级ESLint遇到的问题与思考
  • 文本智能抽取:如何用NLP从海量文本中“炼“出真金?-告别无效阅读,让AI成为你的“信息炼金师
  • springboot--用户访问系统的增删改查记录
  • 静/动态库 IIC(arm) day58
  • Docker在Linux中安装与使用教程
  • 【Android】Serializable和Parcelable序列化对象:传递自定义类数据
  • 无人机抗噪模块技术概述!
  • AI + 金融领域 + 落地典型案例
  • AI +金融 = 七大核心维度+ 落地典型困难
  • 基于深度学习CenterPoint的3D目标检测部署实战
  • 《GPT-OSS 模型全解析:OpenAI 回归开源的 Mixture-of-Experts 之路》
  • 使用 FastAPI 的 WebSockets 和 Elasticsearch 来构建实时应用
  • shell脚本——搜索某个目录下带指定前缀的文件
  • 标准解读——71页2025《数字化转型管理 参考架构》【附全文阅读】
  • C++11中的互斥锁,条件变量,生产者-消费者示例