什么是 Json Web Tokens(JWT)
请求流程
首先 浏览器
向服务器
发送请求
得到 token
然后 浏览器每次请求
都自动在头部
带上token
token 的组成
token 是一些信息
(用户名,过期时间 … )的加密结果
,加密的秘钥
保存在 服务器
,因此只要 秘钥 不被泄漏
,就认为是安全
的。
FastAPI 的 JWT
哈希密码
把用户的
明文密码
,加密
保存在数据库
,即使密码泄漏
,也不知道
用户真正的密码
。
推荐的算法是 「Bcrypt」 :pip3 install passlib[bcrypt]
from passlib.context import CryptContext # 创建 哈希 上下文 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # 校验 密文 明文 def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 加密明文 def get_password_hash(password): return pwd_context.hash(password) if __name__ == '__main__': password_hash = get_password_hash('123456') # 加密结果是动态的 每次不一样,但是验证是一样的 print(password_hash) print(verify_password('123456', password_hash))
token 生成 & 校验 逻辑说明
需要 加密使用的
秘钥
加密算法
token过期时间
# openssl rand -hex 32 SECRET_KEY = "12b2e365a1b19051f115e46e8dfd7200e63510319a791bcb2dcf605626e1aa0c" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30
token 生成 与 校验
安装模块:pip install python-jose[cryptography]
from typing import Optional from jose import JWTError, jwt from datetime import datetime, timedelta # openssl rand -hex 32 SECRET_KEY = "12b2e365a1b19051f115e46e8dfd7200e63510319a791bcb2dcf605626e1aa0c" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 def create_access_token(user: dict, expires_delta: Optional[timedelta] = None): to_encode = user.copy() # 浅拷贝:深拷贝父对象(一级目录),子对象(二级目录)不拷贝,还是引用 if expires_delta: # 能通过参数指定 过期时间 expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) # 加入过期时间 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # 加密信息 得到 token return encoded_jwt if __name__ == '__main__': access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # 生成时间格式 0:30:00 token = create_access_token( # 生成 Token {'username': 'Bob'}, expires_delta=access_token_expires) print(f'user --> token: {token}') to_decode = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # 解密 token print(f'token --> user {to_decode}')
案例
此案例结合上篇的
基于 PasswordBearer Token 的 OAuth2 认证
获取 Token
import uvicorn from typing import Optional from pydantic import BaseModel from datetime import datetime, timedelta from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt # token 使用 from passlib.context import CryptContext # 哈希密码 # openssl rand -hex 32 SECRET_KEY = "12b2e365a1b19051f115e46e8dfd7200e63510319a791bcb2dcf605626e1aa0c" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 fake_user_db = { # 模拟用户数据库 'Bob': {'username': 'Bob', 'hash_password': '$2b$12$Qm6i.pPxCM/Kc672T0GJZOr6Wnq2YkjZm.UMk1O9abq.8fx3fas52'}, # 明文 123456 'Mary': {'username': 'Mary', 'hash_password': '$2b$12$Qm6i.pPxCM/Kc672T0GJZOr6Wnq2YkjZm.UMk1O9abq.8fx3fas52'}, # 明文 123456 } class User(BaseModel): username: str hash_password: str class Token(BaseModel): access_token: str token_type: str = 'bearer' pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # 实例化 OAuth2PasswordBearer 类,指明请求 token 接口地址 oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/jwt/token') app = FastAPI() # 验证密码 def verify_password(plain_password: str, hash_password) -> bool: return pwd_context.verify(plain_password, hash_password) def get_password_hash(password: str): return pwd_context.hash(password) def get_user(db, username: str) -> User: if username in db: user_dict = db[username] return User(**user_dict) def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hash_password): return False return user def create_access_token(user: dict, expires_delta: Optional[timedelta] = None): to_encode = user.copy() # 浅拷贝:深拷贝父对象(一级目录),子对象(二级目录)不拷贝,还是引用 if expires_delta: # 能通过参数指定 过期时间 expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) # 加入过期时间 encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # 加密信息 得到 token return encoded_jwt @app.post('/jwt/token') # 获取 token 的接口具体方法 async def get_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user( # 获取用户对象 fake_db=fake_user_db, username=form_data.username, password=form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail='Incorrect username or password', headers={'WWW-Authenticate': 'Bearer'}) # 规范 access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( # 生成 Token user=user.dict(), expires_delta=access_token_expires ) return Token(access_token=access_token, token_type='bearer') if __name__ == '__main__': uvicorn.run(app)
获取当前用户信息
async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: # 解密 token 获取用户 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get('username') if username is None: raise credentials_exception token_data = username except JWTError: return credentials_exception user = get_user(fake_user_db, token_data) if user is None: raise credentials_exception return user @app.post('/jwt/me') async def read_users_me(current_user: User = Depends(get_current_user)): return current_user
获取当前用户信息
版权声明:《 【FastAPI】基于 Json Web Tokens 的认证 》为明妃原创文章,转载请注明出处!
最后编辑:2022-2-22 12:02:47