【FastAPI】基于 Json Web Tokens 的认证

什么是 Json Web Tokens(JWT)

请求流程

首先 浏览器服务器发送请求得到 token

然后 浏览器每次请求都自动在头部带上token

mark

token 的组成

token 是一些信息(用户名,过期时间 … )的加密结果,加密的秘钥保存在 服务器,因此只要 秘钥 不被泄漏,就认为是安全的。

FastAPI 的 JWT

哈希密码

把用户的明文密码加密保存在数据库,即使密码泄漏,也不知道用户真正的密码

推荐的算法是 「Bcrypt」 :pip3 install passlib[bcrypt]

from passlib.context import CryptContext

# 创建 哈希 上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 校验 密文 明文
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

# 加密明文
def get_password_hash(password):
    return pwd_context.hash(password)

if __name__ == '__main__':
    password_hash = get_password_hash('123456')  # 加密结果是动态的 每次不一样,但是验证是一样的
    print(password_hash)
    print(verify_password('123456', password_hash))

mark

token 生成 & 校验 逻辑说明

需要 加密使用的 秘钥 加密算法 token过期时间

# openssl rand -hex 32
SECRET_KEY = "12b2e365a1b19051f115e46e8dfd7200e63510319a791bcb2dcf605626e1aa0c"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

token 生成 与 校验

安装模块:pip install python-jose[cryptography]

from typing import Optional
from jose import JWTError, jwt
from datetime import datetime, timedelta

# openssl rand -hex 32
SECRET_KEY = "12b2e365a1b19051f115e46e8dfd7200e63510319a791bcb2dcf605626e1aa0c"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(user: dict, expires_delta: Optional[timedelta] = None):
    to_encode = user.copy()  # 浅拷贝:深拷贝父对象(一级目录),子对象(二级目录)不拷贝,还是引用
    if expires_delta:  # 能通过参数指定 过期时间
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})  # 加入过期时间
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # 加密信息 得到 token
    return encoded_jwt

if __name__ == '__main__':
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)  # 生成时间格式 0:30:00
    token = create_access_token( # 生成 Token
        {'username': 'Bob'}, expires_delta=access_token_expires)
    print(f'user --> token: {token}')
    to_decode = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])  # 解密 token
    print(f'token --> user {to_decode}')

mark

案例

此案例结合上篇的 基于 PasswordBearer Token 的 OAuth2 认证

获取 Token

import uvicorn
from typing import Optional
from pydantic import BaseModel
from datetime import datetime, timedelta
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt  # token 使用
from passlib.context import CryptContext  # 哈希密码

# openssl rand -hex 32
SECRET_KEY = "12b2e365a1b19051f115e46e8dfd7200e63510319a791bcb2dcf605626e1aa0c"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

fake_user_db = {  # 模拟用户数据库
    'Bob': {'username': 'Bob',
            'hash_password': '$2b$12$Qm6i.pPxCM/Kc672T0GJZOr6Wnq2YkjZm.UMk1O9abq.8fx3fas52'},  # 明文 123456
    'Mary': {'username': 'Mary',
             'hash_password': '$2b$12$Qm6i.pPxCM/Kc672T0GJZOr6Wnq2YkjZm.UMk1O9abq.8fx3fas52'},  # 明文 123456
}

class User(BaseModel):
    username: str
    hash_password: str

class Token(BaseModel):
    access_token: str
    token_type: str = 'bearer'

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 实例化 OAuth2PasswordBearer 类,指明请求 token 接口地址
oauth2_scheme = OAuth2PasswordBearer(tokenUrl='/jwt/token')
app = FastAPI()

# 验证密码
def verify_password(plain_password: str, hash_password) -> bool:
    return pwd_context.verify(plain_password, hash_password)

def get_password_hash(password: str):
    return pwd_context.hash(password)

def get_user(db, username: str) -> User:
    if username in db:
        user_dict = db[username]
        return User(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hash_password):
        return False
    return user

def create_access_token(user: dict, expires_delta: Optional[timedelta] = None):
    to_encode = user.copy()  # 浅拷贝:深拷贝父对象(一级目录),子对象(二级目录)不拷贝,还是引用
    if expires_delta:  # 能通过参数指定 过期时间
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})  # 加入过期时间
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)  # 加密信息 得到 token
    return encoded_jwt

@app.post('/jwt/token')  # 获取 token 的接口具体方法
async def get_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(  # 获取用户对象
        fake_db=fake_user_db,
        username=form_data.username,
        password=form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail='Incorrect username or password',
            headers={'WWW-Authenticate': 'Bearer'})  # 规范
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(  # 生成 Token
        user=user.dict(), expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type='bearer')

if __name__ == '__main__':
    uvicorn.run(app)

mark

获取当前用户信息

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # 解密 token 获取用户
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get('username')
        if username is None:
            raise credentials_exception
        token_data = username
    except JWTError:
        return credentials_exception
    user = get_user(fake_user_db, token_data)
    if user is None:
        raise credentials_exception
    return user

@app.post('/jwt/me')
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

获取Token
mark

获取当前用户信息

mark

发表评论 / Comment

用心评论~