1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
| import sys import jwt from pydantic import BaseModel import uvicorn,asyncio,signal,os from fastapi import FastAPI, HTTPException, Depends from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from tortoise import fields from tortoise.models import Model from tortoise.contrib.pydantic import pydantic_model_creator from tortoise.contrib.fastapi import register_tortoise from fastapi import FastAPI, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.middleware.cors import CORSMiddleware
# The ASGI application object that every route and middleware below attaches to.
app = FastAPI()


async def startup_event():
    """Report on stdout that the application has started."""
    banner = "Application has started."
    print(banner)
async def shutdown_event():
    """Report on stdout that the application is shutting down."""
    farewell = "Application is shutting down."
    print(farewell)
# Hook the lifecycle callbacks into the application, startup first.
for _event_name, _callback in (
    ("startup", startup_event),
    ("shutdown", shutdown_event),
):
    app.add_event_handler(_event_name, _callback)
# Login details for the MySQL server used by the default connection.
_mysql_credentials = dict(
    host='localhost',
    port='3306',
    user='root',
    password='root',
    database='fiber',
)

# Tortoise ORM configuration: one MySQL connection and one app named "models".
db_config = dict(
    connections=dict(
        default=dict(
            engine='tortoise.backends.mysql',
            credentials=_mysql_credentials,
        ),
    ),
    apps=dict(
        models=dict(
            models=['main'],  # module(s) in which the model classes live
            default_connection='default',  # must name an entry of "connections"
        ),
    ),
)
# Attach Tortoise ORM to the app using db_config; schema generation is switched off.
register_tortoise(app, generate_schemas=False, config=db_config)
# JWT verification middleware
async def jwt_middleware(request: Request, call_next):
    """Reject requests that do not carry a valid bearer JWT.

    The login endpoint ``/token`` is let through untouched. For every other
    path the ``Authorization: Bearer <token>`` header is read, the token is
    decoded with ``JWT_SECRET`` (HS256) and the user named by its ``id`` claim
    is looked up. On success the decoded payload is stored on
    ``request.state.payload`` and the request continues down the stack.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The downstream response, or a 401 JSON response when the token is
        missing or invalid.
    """
    # Function-scope import: failures are answered with a response object.
    # An HTTPException raised from inside an HTTP middleware is not turned
    # into a 401 by FastAPI; it surfaces as a server error instead.
    from fastapi.responses import JSONResponse

    def _unauthorized(detail):
        return JSONResponse(
            status_code=401,
            content={"detail": detail},
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The login endpoint must stay reachable without a token.
    # NOTE(review): every other path, including POST /users, requires a
    # token - confirm that registration is meant to be protected.
    if request.url.path == "/token":
        return await call_next(request)

    # auto_error=False makes HTTPBearer return None rather than raise when the
    # header is absent or malformed, so the missing-token case is handled here.
    http_bearer = HTTPBearer(auto_error=False)
    credentials: HTTPAuthorizationCredentials = await http_bearer(request)
    if credentials is None or not credentials.credentials:
        return _unauthorized("Token missing")

    try:
        payload = jwt.decode(
            credentials.credentials, JWT_SECRET, algorithms=['HS256']
        )
        # The lookup only proves that the user still exists; the row is unused.
        await User.get(id=payload.get('id'))
    except Exception:
        # Any decode or lookup failure is reported uniformly as a bad token.
        return _unauthorized("Token invalid")

    # Expose the decoded claims to the route handlers.
    request.state.payload = payload
    return await call_next(request)
# Register the JWT verification middleware with the application.
app.middleware("http")(jwt_middleware)


# Application-wide fallback for exceptions that nothing else handled.
@app.exception_handler(Exception)
async def handle_exceptions(request: Request, exc: Exception):
    """Turn an unhandled exception into a generic HTTP 500 JSON response.

    Extra handling such as logging or a custom error body can be added here.

    Args:
        request: The request being processed when the exception occurred.
        exc: The exception that was raised.

    Returns:
        A JSON response with status 500 and a ``code``/``message`` body.
    """
    # An exception handler has to return a response object; a plain dict is
    # not converted the way a route handler's return value is.
    from fastapi.responses import JSONResponse

    return JSONResponse(
        status_code=500,
        content={
            "code": 500,
            "message": "Internal Server Error",
        },
    )
# Add the CORS middleware: any origin, any method and any header is accepted.
_cors_settings = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
class User(Model):
    """ORM model for an account row in the ``users`` table."""

    id = fields.IntField(pk=True, generated=True)
    username = fields.CharField(max_length=50, unique=True)
    # NOTE(review): the value is compared verbatim in verify_password, so it
    # appears to be stored unhashed - confirm and migrate to a password hash.
    password = fields.CharField(max_length=128)

    def __str__(self):
        return self.username

    class Meta:
        table = 'users'

    @classmethod
    def get_user(cls, username):
        """Return the (un-awaited) lookup of the user named ``username``."""
        return cls.get(username=username)

    def verify_password(self, password):
        """Report whether ``password`` equals this user's stored password.

        No hashing is involved: the candidate is compared with the stored
        value directly.

        Args:
            password: The candidate password supplied by the caller.

        Returns:
            True when the two values are equal, otherwise False.
        """
        import hmac

        stored = self.password
        if not isinstance(password, str) or not isinstance(stored, str):
            # Non-string values keep the plain equality semantics.
            return password == stored
        # Constant-time comparison, so the time taken does not reveal how many
        # leading characters matched. Encoding to bytes is required because
        # compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(
            password.encode("utf-8"), stored.encode("utf-8")
        )
# Pydantic schema generated from the User model.
UserPydantic = pydantic_model_creator(User, name='User')
# Input variant of the schema, generated without the read-only fields.
UserInPydantic = pydantic_model_creator(
    User,
    exclude_readonly=True,
    name='UserIn',
)


# A hand-written JSON body model like the one below is actually the more
# common choice for request input.
class UserInPydantic2(BaseModel):
    """Request body carrying a username and a password."""

    username: str
    password: str
oauth2_sceheme = OAuth2PasswordBearer(tokenUrl="token")
# NOTE(review): the signing secret is hard-coded in the source; consider
# loading it from the environment or a secrets store.
JWT_SECRET = 'myeasypeasyjwtsecret'

# Dependency-based way of resolving the current user, kept for reference:
# async def get_current_user(token: str = Depends(oauth2_sceheme)):
#     try:
#         payload = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
#         user = await User.get(id=payload.get('id'))
#     except:
#         raise HTTPException(
#             status_code=401,
#             detail='Invalid username or password'
#         )
#     return await UserPydantic.from_tortoise_orm(user)


async def authenticate_user(username: str, password: str):
    """Look up ``username`` and check ``password`` against the stored one.

    Args:
        username: Login name to look up.
        password: Candidate password, checked with ``User.verify_password``.

    Returns:
        The matching ``User`` on success, otherwise ``False``.
    """
    # get_or_none yields None for an unknown username, whereas get raises
    # DoesNotExist and would make the "no such user" branch unreachable.
    user = await User.get_or_none(username=username)
    if user is None:
        return False
    if not user.verify_password(password):
        return False
    return user
# Earlier form-based login handler (OAuth2PasswordRequestForm), kept for reference:
# @app.post('/token')
# async def generate_token(form_data: OAuth2PasswordRequestForm = Depends()):
#     user = await authenticate_user(form_data.username, form_data.password)
#     if not user:
#         return {"Error": "Invalid username or password"}
#     user_obj = await UserPydantic.from_tortoise_orm(user)
#     token = jwt.encode(user_obj.dict(), JWT_SECRET)
#     return {
#         "access_token": token,
#         "token_type": "Bearer",
#     }
@app.post('/token')
async def generate_token(form_data: UserInPydantic):
    """Log a user in and issue a signed JWT.

    Args:
        form_data: JSON body carrying ``username`` and ``password``.

    Returns:
        A dict with ``access_token`` (the encoded JWT) and ``token_type``.

    Raises:
        HTTPException: 401 when the credentials are not accepted.
    """
    user = await authenticate_user(form_data.username, form_data.password)
    if not user:
        # A failed login is an authentication error, not a 200 with an
        # ad-hoc error body.
        raise HTTPException(
            status_code=401,
            detail="Invalid username or password",
        )

    # Only identifying claims go into the token. A JWT payload is merely
    # encoded, not encrypted, so the password must never be placed in it.
    claims = {"id": user.id, "username": user.username}
    # The algorithm is spelled out to match the HS256 used when decoding.
    token = jwt.encode(claims, JWT_SECRET, algorithm='HS256')
    return {
        "access_token": token,
        "token_type": "Bearer",
    }
@app.post('/users')
async def create_user(user: UserInPydantic):
    """Create a user row from the request body and return it.

    Args:
        user: JSON body carrying ``username`` and ``password``.

    Returns:
        The stored user serialised through ``UserPydantic``.

    Raises:
        HTTPException: 409 when the database rejects the row on an integrity
            constraint (``username`` is declared unique); 500 for any other
            failure while saving.
    """
    from tortoise.exceptions import IntegrityError

    try:
        # NOTE(review): the password is persisted exactly as received; no
        # hashing happens here even though an earlier comment said so.
        user_obj = await User.create(
            username=user.username,
            password=user.password,
        )
        return await UserPydantic.from_tortoise_orm(user_obj)
    except IntegrityError as exc:
        # A constraint violation is a client-side conflict, not a server fault.
        raise HTTPException(
            status_code=409,
            detail="Username already exists",
        ) from exc
    except Exception as e:
        print(f"An error occurred while saving the user: {e}")
        raise HTTPException(
            status_code=500,
            detail="Internal server error",
        ) from e
# Dependency-based variant, kept for reference:
# @app.get('/users/me')
# async def get_user(current_user: UserPydantic = Depends(get_current_user)):
#     return current_user


@app.get('/users/me')
async def get_user(request: Request):
    """Return the payload stored on ``request.state`` for this request."""
    state = request.state
    return state.payload
if __name__ == "__main__":
    # Main program logic: serve the app locally with auto-reload enabled.
    try:
        uvicorn.run('main:app', host="127.0.0.1", port=8000, reload=True)
    except KeyboardInterrupt:
        # Ctrl-C: leave with a success status; sys.exit raises SystemExit,
        # so nothing after this point runs on that path.
        sys.exit(0)
    # Reached only when the server returned without raising.
    print("Program exited normally")
|