当前位置: 首页 > news >正文

Flask框架-流量控制:flask-limiter的使用

一、flask使用flask-limiter存在版本问题

Flask==1.1.4
Flask-Bootstrap==3.3.7.1
Flask-Caching==1.9.0
Flask-Cors==3.0.10
Flask-Limiter==1.4
Flask-Migrate==2.5.3
Flask-RESTful==0.3.8
Flask-Script==2.0.6
Flask-SocketIO==5.0.1
Flask-Sockets==0.2.1
Flask-SQLAlchemy==2.4.4
Jinjia2==2.11.3
Werkzeug==1.0.1

flask1.x.x版本,使用flask-limiter=1.4.0版本的。

二、配置

ext/__init__.py

from flask_sqlalchemy import SQLAlchemy
from flask_restful import Api
from flask_cors import CORS
from flask_socketio import SocketIO
from threading import Lock
from flask_caching import Cache
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address #以ip地址作为流量控制条件db = SQLAlchemy() #数据库拓展
api = Api() #api拓展
# cors = CORS(cors_allowed_origins='*')
cors = CORS()
socketio = SocketIO(cors_allowed_origins='*') #websocket拓展
all_lock = Lock() 
cache = Cache() #缓存limiter = Limiter(key_func=get_remote_addr,default_limits=['120/minute']) 
#流量控制拓展,设置了一个全局的流量控制,所有的接口都生效

apps/__init__.py

from flask import Flask
from ext import limiterapp = Flask(__file__)
limiter.init_app(app) #拓展流量控制

三、简单使用

频率限制语法:

1、'100 per day'、'20 per hour'、'5 per minute'、'1 per second'

2、'100/day'、'20/hour'、'5/minute'、'1/second'    #推荐这个,看起来简单

FBV使用

from ext import limiter
from flask import jsonify,request
from apps import app@limiter.limit('5/minute')  #一分钟5次
@app.route('/test',method=['POST','GET'])
def test():return jsonify({'code':200,'msg':'测试'})

CBV使用

from ext import limiter
from flask import jsonify,request
class TestRateView(views.MethodView):decorators = [limiter.limit("5/minute")]def get(self):return view_response(code=200,msg='测试西安舒')

四、高级使用

在很多时候,我们不仅仅需要对IP进行流量控制,可能需要对用户名,对请求携带的数据进行流量控制,这时候就需要自定义流量控制的条件了

from base.authen import decode_token_for_limiter
from flask import request
#根据用户的id作为限制
def get_user_id():token = request.headers.get('token')if not token:token = request.cookies.get('token')if token:#解析token,拿到用户的iduser = decode_token_for_limiter(token)else:user = 'anonymity'#返回什么,就对什么进行流量控制return user

使用:

from ext import limiter
from flask import jsonify,request
from util.limiter_ import get_user_idclass TestRateView(views.MethodView):decorators = [limiter.limit("5/minute",key_func=get_user_id)]def get(self):return view_response(code=200,msg='测试西安舒') 

五、处理流量控制的状态码

在开发过程中,一般需要统一状态码,或者使用自己的一套规则。一般需要重新修改状态码的。

可以使用flask自带的异常处理装饰器来实现:

在启动文件中:超过频率限制时,状态码 是429,可以捕获这个状态码

from flask import jsonify
from apps import app@app.errorhandler(429)
def handle_rate_limit_exceeded(e):return jsonify({"error": "请求频率超过限制",'code':400}), 200

http://www.lryc.cn/news/107584.html

相关文章:

  • 【机器学习】西瓜书习题3.5Python编程实现线性判别分析,并给出西瓜数据集 3.0α上的结果
  • Elasticsearch:通过动态修剪实现更快的基数聚合
  • Webpack5 生产模式压缩图片ImageMinimizerPlugin
  • 时序预测 | Matlab实现基于BP神经网络的电力负荷预测模型
  • 基于回溯算法实现八皇后问题
  • Linux【网络编程】之深入理解TCP协议
  • 如何克服看到别人优于自己而感到的焦虑和迷茫?
  • 浅谈React中的ref和useRef
  • Linux C 获取主机网卡名及 IP 的几种方法
  • 解密外接显卡:笔记本能否接外置显卡?如何连接外接显卡?
  • list与erase()
  • Arcgis 分区统计majority参数统计问题
  • vue2+wangEditor5富文本编辑器(图片视频自定义上传七牛云/服务器)
  • shell脚本练习--安全封堵脚本,使用firewalld实现
  • 双端冒泡排序
  • 如何在Visual Studio Code中用Mocha对TypeScript进行测试
  • GO中Json的解析
  • chatgpt 提示词-关于数据科学的 75个词语
  • (自控原理)控制系统的数学模型
  • Webpack5 cacheGroups
  • 前端面试的游览器部分(5)每篇10题
  • 数据挖掘七种常用的方法汇总
  • 自然语言处理学习笔记(二)————语料库与开源工具
  • Rust dyn - 动态分发 trait 对象
  • uniapp 中过滤获得数组中某个对象里id:1的数据
  • Django系列之Channels
  • HTTP——五、与HTTP协作的Web服务器
  • pyspark笔记 Timestamp 类型的比较
  • SpringBoot 集成 Redis
  • 黑客学习笔记(网络安全)