当前位置: 首页 > news >正文

Python sanic框架钉钉和第三方打卡机实现

同样还是需要开通钉钉应用这里就不错多说了

第一步:梳理逻辑流程

        前提:打卡的机器是使用postgres数据库,由于因为某些原因,钉钉userId 我已经提前获取到了存放到数据库里。

        1.用户打卡成功后,我们应该监听数据库进行查询,然后获取到打卡的时间,在通过钉钉的工作消息接口,发送消息给当前考勤打卡的用户,这样用户就可以知道我上班的打卡时间!

现在我们看看应该怎么去实现

先定义钉钉接口先Token:

appkey = ""
appsecret = ""
ding_url = "https://oapi.dingtalk.com/"async def dingTalkToken():async with httpx.AsyncClient() as client:response = await client.get(ding_url + 'gettoken', params={"appkey": appkey, "appsecret": appsecret})# 解析响应JSONresult = response.json()# 提取Access Tokenaccess_token = result.get("access_token")return access_token

钉钉工作消息接口

async def dingTalkTokenAsyncsend_v2(access_token, kqTime, userid):params = {"agent_id": ,"msg": {"msgtype": "text","text": {"content": "打卡成功:" + kqTime}},"userid_list": userid}async with httpx.AsyncClient() as client:response = await client.post(ding_url + 'topapi/message/corpconversation/asyncsend_v2?access_token=' + access_token, params=params)# 解析响应JSONresult = response.json()# 提取Access Tokenerrcode = result.get("errcode")return errcode

以上向钉钉工作发送消息的接口已经完成了

接下来就是核心代码:

async def check_notifications():print('执行')try:# 连接到数据库with psycopg2.connect(dbname=posql.dbname, user=posql.user, password=posql.password, host=posql.host,port=posql.port) as connection:# 创建一个游标对象,用于执行 SQL 语句with connection.cursor() as cursor:# 执行查询cursor.execute("LISTEN punch_event_channel")while True:connection.commit()  # 提交事务await asyncio.sleep(1)# 检查是否有通知connection.poll()  # 从服务器获取通知if connection.notifies:notify = connection.notifies[0]# 执行查询cursor.execute("这填写打卡系统的数据库使用ID去查询最新
SELECT * FROM table WHERE id = %s" % int(notify.payload))# 获取查询结果result = cursor.fetchone()columns = [desc[0] for desc in cursor.description]result_dict = dict(zip(columns, result))# 处理 datetime 对象的序列化result_dict['timestamp'] = result_dict.get('timestamp', None)if result_dict['timestamp']:result_dict['timestamp'] = datetime.fromisoformat(result_dict['timestamp'])kqTime = result_dict['att_date'] + ' ' + result_dict['att_time']# 获取用户IDuser_name = result_dict['person_name']sql_str = """EXEC GetUserDingByName @UserName = N'%s';""" % user_nameuser_id = query_user_info(sql_str)if user_id != 'null':# 发送HTTP POST请求获取Access Tokenaccess_token = await dingTalkToken()codes = await dingTalkTokenAsyncsend_v2(access_token, kqTime, user_id)print(f"考勤时间: " + result_dict['att_date'] + ' ' + result_dict['att_time'])breakelse:break# 去执行 钉钉推送模块# messages = '发送成功'else:print({"notify_payload": '没有消息'})await asyncio.sleep(1)except Exception as e:print({"error": str(e)})

解释一下代码:

 # 执行查询
cursor.execute("LISTEN punch_event_channel")

这里我是在考勤机器的数据库里做了一个punch_event_channel 的频道,而这个频道是我创建了一个触发函数用来触发最新数据库里的数据

接下来是创建触发函数

-- 创建触发器函数
CREATE OR REPLACE FUNCTION notify_punch_event()
RETURNS TRIGGER AS
$$
BEGINPERFORM pg_notify('punch_event_channel', 'new_punch_event');RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- 创建触发器,关联到表的 AFTER INSERT 事件上CREATE TRIGGER 触发器名称XXXXX
AFTER INSERT ON 表名
FOR EACH ROW 
EXECUTE PROCEDURE notify_punch_event();

在sql 工具执行这两句就可以了,替换成你自己的数据库

另外这里是我内部拿取钉钉Userid的数据库,我就不放代码了:

# 获取用户ID
user_name = result_dict['person_name']
sql_str = """EXEC GetUserDingByName @UserName = N'%s';""" % user_name
user_id = query_user_info(sql_str)

你们可以根据自己方式,来获取

最后,就是使用定时任务来

async def periodic_task():while True:await check_notifications()await asyncio.sleep(1)  # 1秒钟检查一次,可以根据需要调整间隔if __name__ == '__main__':# 启动定时任务app.add_task(periodic_task())# 启动 Sanic 应用app.run(host='0.0.0.0', port=8089, workers=8)

最后记得导入包

import psycopg2
import httpx
import pymssql # 这是sql server 数据库连接

如果写的好动动你们发财的小手点赞,对你有帮助也可以打赏请我喝杯咖啡,提提神,感谢各位兄弟了

http://www.lryc.cn/news/268493.html

相关文章:

  • 微信小程序性能优化
  • java并发编程六 ReentrantLock,锁的活跃性
  • 深度学习 | DRNN、BRNN、LSTM、GRU
  • 代理模式:中间者的故事
  • 中间件系列 - Redis入门到实战(高级篇-多级缓存)
  • 是德科技E9304A功率传感器
  • 视频格式网络地址转换视频到本地,获取封面、时长,其他格式转换成mp4
  • 企业私有云容器化架构运维实战
  • Baumer工业相机堡盟工业相机如何通过NEOAPI SDK使用UserSet功能保存和载入相机的各类参数(C++)
  • STM32的以太网外设+PHY(LAN8720)使用详解(3):PHY寄存器详解
  • 缓存和缓冲的区别
  • C++高级-STL库概述
  • uniapp 统一获取授权提示和48小时间隔授权
  • Halcon点云重建
  • docker学习(二十一、network使用示例container、自定义)
  • 【Python机器学习系列】一文带你了解机器学习中的Pipeline管道机制(理论+源码)
  • 算法基础之整数划分
  • 关于“Python”的核心知识点整理大全47
  • Android 8.1 设置USB传输文件模式(MTP)
  • 模型量化 | Pytorch的模型量化基础
  • adb和logcat常用命令
  • 千巡翼X4轻型无人机 赋能智慧矿山
  • 【Android 13】使用Android Studio调试系统应用之Settings移植(一):编译服务器的配置、AOSP源码的下载、编译、运行
  • 【1】Docker详解与部署微服务实战
  • C# JsonString转Object以及Object转JsonString
  • 华为OD机试真题-中文分词模拟器-2023年OD统一考试(C卷)
  • 【并发设计模式】聊聊 基于Copy-on-Write模式下的CopyOnWriteArrayList
  • OpenCV中使用Mask R-CNN实现图像分割的原理与技术实现方案
  • 论文阅读《Rethinking Efficient Lane Detection via Curve Modeling》
  • Leetcode—2660.保龄球游戏的获胜者【简单】