FastAPI 使用示例
相关的代码上都有注释,其中前端代码是用来提交表单的
此代码进行了跨域处理,允许前端直接提交表单,并正常返回
完整后端代码(Python / FastAPI):
import asyncio
from typing import Optional, Union

from fastapi import (
    Cookie,
    Depends,
    FastAPI,
    File,
    Form,
    Header,
    HTTPException,
    UploadFile,
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, RedirectResponse
from pydantic import BaseModel, Field

# Application instance on which the middleware and routes below are registered.
app = FastAPI()

# Configure the allowed cross-origin sources
# Every entry carries its own trailing comma: adjacent string literals without
# a comma are silently concatenated into a single, bogus origin.
origins = [
    "http://127.0.0.1",  # local front end
    "http://127.0.0.1:3000",  # when using VS Code Live Server
    "http://127.0.0.1:8000",  # when using VS Code Live Server
    "http://localhost",  # local front end
    "http://localhost:3000",  # local development URL
    "http://localhost:8000",  # local development URL
]

# Add the CORS middleware
# Register CORS handling; each option sits on its own line so that no inline
# comment can swallow the arguments that follow it.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,  # permitted origins
    allow_credentials=True,  # allow cookies to be sent
    allow_methods=["*"],  # allow every HTTP method
    allow_headers=["*"],  # allow every request header
)


# Custom middleware that handles OPTIONS requests
@app.middleware("http")
async def options_middleware(request, call_next):
    """Answer OPTIONS requests directly and ensure other responses carry a CORS origin header.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that passes the request on and returns the
            downstream response.

    Returns:
        For OPTIONS requests, a 200 JSON response carrying the CORS headers
        built here. Otherwise the downstream response, with
        ``Access-Control-Allow-Origin`` set to ``*`` only when it is missing.
    """
    if request.method == "OPTIONS":
        headers = {
            "Access-Control-Allow-Origin": "*",  # allow every origin
            "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE",  # allowed HTTP methods
            "Access-Control-Allow-Headers": "Content-Type",  # allowed request headers
        }
        return JSONResponse(content=None, status_code=200, headers=headers)

    response = await call_next(request)
    print(response)
    # Do not clobber an origin already chosen downstream (e.g. by
    # CORSMiddleware): browsers refuse a wildcard origin on credentialed
    # requests, so the wildcard is only a fallback.
    if "Access-Control-Allow-Origin" not in response.headers:
        response.headers["Access-Control-Allow-Origin"] = "*"  # or a specific front-end address
    return response


# Entity model definition
class Item(BaseModel):
    """Payload model for an item, used by the create and update routes."""

    # Required name, at most 100 characters.
    name: str = Field(..., title="Item Name", max_length=100)
    # Optional description, at most 255 characters. Declared Optional so the
    # None default matches the annotation.
    description: Optional[str] = Field(None, title="Item Description", max_length=255)
    # Required price, must be strictly greater than zero.
    price: float = Field(..., title="Item Price", gt=0)
    # Optional tax. Declared Optional so an explicit JSON null is accepted
    # (a bare `float = None` rejects null under Pydantic v2).
    tax: Optional[float] = None


# Dependency function
def common_parameters(q: str = None, skip: int = 0, limit: int = 100):
    """Bundle the shared query parameters into a single dictionary.

    Args:
        q: Optional free-text value; defaults to None.
        skip: Offset value; defaults to 0.
        limit: Limit value; defaults to 100.

    Returns:
        A dict with the keys "q", "skip" and "limit" holding the arguments.
    """
    shared = dict(q=q, skip=skip, limit=limit)
    return shared


# Root path
@app.get("/")
def read_root():
    """Serve the root path with a fixed greeting payload."""
    greeting = {"Hello": "World"}
    return greeting


# GET
@app.get("/items/{item_id}")
def read_item(item_id: int, q: Union[str, None] = None):
    """Return the requested item id together with a custom response header.

    Args:
        item_id: Identifier taken from the URL path.
        q: Optional query string; accepted but not used by this handler.

    Returns:
        A JSONResponse whose body is ``{"item_id": item_id}`` and which
        carries the ``X-Custom-Header`` header.

    Raises:
        HTTPException: With status 404 when ``item_id`` is 42.
    """
    if item_id == 42:
        raise HTTPException(status_code=404, detail="Item not found")

    content = {"item_id": item_id}
    headers = {"X-Custom-Header": "custom-header-value"}
    return JSONResponse(content=content, headers=headers)


# PUT
@app.put("/items/{item_id}")
def update_item(item_id: int, item: Item):
    """Accept an item payload for the given id and echo the id back.

    Args:
        item_id: Identifier taken from the URL path.
        item: Item payload from the request body; not used in the response.

    Returns:
        A dict containing only "item_id".
    """
    result = {"item_id": item_id}
    return result


# POST
@app.post("/items/")
def create_item(item: Item):
    """Log the received item's fields and return the item unchanged.

    Args:
        item: Item payload from the request body.

    Returns:
        The same ``item`` object that was received.
    """
    print(f"{item.name}--{item.description}--{item.price}--{item.tax}")
    return item


# Request with dependencies resolved up front
@app.get("/items/")
def read_item(
    commons: dict = Depends(common_parameters),
    user_agent: str = Header(None),
    session_token: str = Cookie(None),
):
    """Return the shared query parameters produced by the dependency.

    Args:
        commons: Result of the ``common_parameters`` dependency.
        user_agent: Value declared via ``Header``; not returned.
        session_token: Value declared via ``Cookie``; not returned.

    Returns:
        The ``commons`` dict as provided by the dependency.
    """
    # Alternative response kept from the original for reference:
    # return {"User-Agent": user_agent, "Session-Token": session_token}
    return commons


# Post-processing function
async def after_request():
    """Print a marker line; intended for use as a route dependency.

    NOTE(review): despite the name, a plain ``Depends`` target is presumably
    resolved before the route handler body runs; true post-processing would
    need a ``yield``-style dependency -- confirm the intent.

    Returns:
        None.
    """
    print("after_request")


# Post-processing dependency
# NOTE(review): GET "/items/" is already registered earlier in this file; if
# routing picks the first matching registration, this handler is never
# reached -- confirm.
@app.get("/items/", response_model=dict)
async def read_items_after(request: dict = Depends(after_request)):
    """Return a fixed success message once the ``after_request`` dependency has run.

    Args:
        request: Value produced by the ``after_request`` dependency; unused.

    Returns:
        A dict with a single "message" key.
    """
    outcome = {"message": "Items returned successfully"}
    return outcome


# Async dependency function
async def get_token():
    """Return a placeholder token after a simulated asynchronous delay.

    Returns:
        The string "fake-token".
    """
    # Simulate an asynchronous operation
    await asyncio.sleep(2)
    return "fake-token"


# Async route handler
@app.get("/async/")
async def read_items(token: Optional[str] = Depends(get_token)):
    """Return the token supplied by the ``get_token`` dependency.

    Args:
        token: Value produced by the ``get_token`` dependency.

    Returns:
        A dict with a single "token" key.
    """
    body = {"token": token}
    return body


# Parameters posted directly from the front end
@app.post("/login/")
async def login(username: str = Form(), password: str = Form()):
    """Accept form-encoded credentials and echo the username back.

    Args:
        username: Form field "username".
        password: Form field "password"; accepted but not used or returned.

    Returns:
        A dict containing only "username".
    """
    reply = {"username": username}
    return reply


# Redirect
@app.get("/redirect")
def redirect():
    """Redirect the client to the "/items/" path."""
    target = "/items/"
    return RedirectResponse(url=target)


# Route handler
@app.post("/files/")
async def create_file(file: UploadFile = File(...)):
    """Accept an uploaded file and report its filename.

    Args:
        file: The required uploaded file.

    Returns:
        A dict with a single "filename" key.
    """
    uploaded_name = file.filename
    return {"filename": uploaded_name}


# Query-parameter handling
# NOTE(review): GET "/items/" is already registered earlier in this file; if
# routing picks the first matching registration, this handler is never
# reached -- confirm.
@app.get("/items/")
def read_item(skip: int = 0, limit: int = 10):
    """Echo the paging query parameters.

    Args:
        skip: Offset value; defaults to 0.
        limit: Limit value; defaults to 10.

    Returns:
        A dict with the keys "skip" and "limit".
    """
    paging = {"skip": skip, "limit": limit}
    return paging
前端代码:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Create Item</title>
</head>
<body>
    <h1>Create Item</h1>
    <!-- Form whose fields mirror the backend Item model: name, description, price, tax. -->
    <form id="itemForm">
        <label for="name">Item Name:</label><br>
        <input type="text" id="name" name="name" required maxlength="100"><br><br>

        <label for="description">Item Description:</label><br>
        <textarea id="description" name="description" maxlength="255"></textarea><br><br>

        <label for="price">Item Price:</label><br>
        <input type="number" id="price" name="price" step="0.01" required min="0.01"><br><br>

        <label for="tax">Item Tax (optional):</label><br>
        <input type="number" id="tax" name="tax" step="0.01" min="0"><br><br>

        <input type="submit" value="Submit">
    </form>

    <script>
        // Send the form as JSON via fetch instead of a regular form submission.
        document.getElementById('itemForm').addEventListener('submit', async function(event) {
            event.preventDefault();

            // Gather the form data
            const formData = new FormData(event.target);

            // An empty input is still present in FormData (as ''), so test the
            // value itself rather than formData.has('tax') to decide on null.
            const taxValue = formData.get('tax');
            const data = {
                name: formData.get('name'),
                description: formData.get('description'),
                price: parseFloat(formData.get('price')),
                tax: taxValue ? parseFloat(taxValue) : null
            };

            try {
                const response = await fetch('http://127.0.0.1:8000/items/', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json'
                    },
                    body: JSON.stringify(data)
                });

                if (response.ok) {
                    const jsonResponse = await response.json();
                    alert('Item created: ' + JSON.stringify(jsonResponse));
                } else {
                    alert('Error: ' + response.statusText);
                }
            } catch (error) {
                alert('Network error: ' + error);
            }
        });
    </script>
</body>
</html>