本文阐述了如何基于FastAPI
框架实现 OAuth2
用户认证,其中使用哈希算法对密码进行了加密,使用 JWT
持有令牌。
附带完整的代码,避免大家再次踩坑。
关于 OAuth2
OAuth
是一个关于授权(authorization
)的开放网络标准,在全世界得到广泛应用。比如:微信登录、Facebook,Google,Twitter,GitHub等。
OAuth2
规范要求使用密码流时,客户端或用户必须以表单数据形式发送 username
和 password
字段。这两个字段必须命名为 username
和 password
,不能使用 user-name
或 email
等其它名称。
该规范要求必须以表单数据形式发送 username 和 password,因此,不能使用 JSON 对象。
当然,前端仍可以显示终端用户所需的名称,数据库模型也可以使用自己定义的名称。
关于OAuth的更多知识,您可以参考:理解OAuth2.0
关于 JWT
JWT
即JSON 网络令牌(JSON Web Tokens
)是目前最流行的跨域认证解决方案。它在服务端将用户信息进行签名(如果有保密信息也可以加密后再签名),这样可以防止它被篡改。
每次客户端提交请求时,都附带 JWT
内容,这样服务端可以直接读取用户信息,而不用必须从服务器端的会话中获取。
关于JWT的更多知识,可以参考:JSON Web Token 入门教程
安装依赖
# 安装 PyJWT,在 Python 中生成和校验 JWT 令牌
pip install pyjwt
# Passlib 是处理密码哈希的 Python 包,支持很多安全哈希算法及配套工具。
# 本教程推荐的算法是 Bcrypt。
pip install passlib[bcrypt]
准备用户数据库
这里准备了用户数据和处理用户对象的简单方法,没有真正链接数据库。
from fastapi import Depends, FastAPI,HTTPException,status
from fastapi.security import OAuth2PasswordBearer,OAuth2PasswordRequestForm
from pydantic import BaseModel
from passlib.context import CryptContext
from datetime import datetime, timedelta, timezone
import jwt
from jwt.exceptions import InvalidTokenError
from typing import Union
from typing import Annotated
# 模仿用户数据库
fake_users_db = {
"liu": {
"username": "liu",
"full_name": "Jack Liu",
"email": "liupras@gmail.com",
"hashed_password": "$2b$12$XMT2KGR.3pBUszKSl91I6uJDWVZIncZMyqgXzH1KnWqZcPZ/k5pLu", #12345678
"disabled": False,
},
"wang": {
"username": "wang",
"full_name": "Errin Wang",
"email": "56008507@qq.com",
"hashed_password": "$2b$12$WjyqXlyP/TCyysi0HwLWGenjP668dBswX39aKJzByZTlTDZ9kD.5e", #23456789
"disabled": True,
},
}
# Token实体
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
# 用户实体
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
关于 **user_dict
直接把 user_dict 的键与值当作关键字参数传递,等效于:
UserInDB(
username = user_dict["username"],
email = user_dict["email"],
full_name = user_dict["full_name"],
disabled = user_dict["disabled"],
hashed_password = user_dict["hashed_password"],
)
处理用户密码和token/令牌
在本文中,使用 bcrypt
做密码哈希,使用 JWT
承载token/令牌。
在这里,服务端对用户信息做签名并发给客户端,客户端每次访问时都需要携带该令牌。
# 密钥。用于JWT签名。
SECRET_KEY = "09d25d094faa6ca2556c818155b7a9563b93f7099f6f0f4caa6cf63b88e8d1e7"
'''
注意,不要使用本例所示的密钥,因为它不安全。
'''
# 对JWT编码解码的算法。JWT不加密,任何人都能用它恢复原始信息。
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 使用bcrypt加密密码:每次加密都会生成不同的哈希值。
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 校验密码:校验接收的密码是否匹配存储的哈希值。
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# 加密密码
def get_password_hash(password):
'''
使用hash加密后,即便是数据库被盗,窃贼无法获取用户的明文密码,得到的只是哈希值。
哈希是指把特定内容(本例中为密码)转换为乱码形式的字节序列(其实就是字符串),但这个乱码无法转换回传入的密码。
'''
return pwd_context.hash(password)
# 认证用户
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# 生成JWT
def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 使用 OAuth2 的 Password 流以及 Bearer 令牌(Token)。
# tokenUrl="token" 指向的是暂未创建的相对 URL token。这个相对 URL 相当于 ./token。
# 此设置将会要求客户端把 username 与password 发送至 API 中指定的 URL:http://127.0.0.1:8000/token 。
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 根据token获取当前登录的用户信息
# 该函数接收 str 类型的令牌,并返回 Pydantic 的 User 模型
async def get_current_user(token: str = Depends(oauth2_scheme)):
'''
安全和依赖注入的代码只需要写一次。各个端点可以使用同一个安全系统。
'''
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# 在JWT 规范中,sub 键的值是令牌的主题。
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
# 获取当前登录用户信息,并检查是否禁用
async def get_current_active_user(current_user: User = Depends(get_current_user)):
'''
在端点中,只有当用户存在、通过身份验证、且状态为激活时,才能获得该用户信息。
'''
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
关于hash/哈希
哈希是指:将指定内容(本例中为密码)转换为形似乱码的字节序列(即:字符串)。
每次传入完全相同的内容(比如,完全相同的密码)时,得到的都是完全相同的乱码,但这个乱码无法转换回传入的密码。
为什么使用密码哈希?
原因很简单:假如数据库被盗,窃贼无法获取用户的明文密码,得到的只是哈希值。这样一来,窃贼就无法在其它应用中使用窃取的密码。事实上,很多用户在所有系统中都使用相同的密码,风险超大。
实现登录和获取用户信息接口
# 创建一个FastAPI实例
app = FastAPI()
# 登录方法
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends())-> Token:
'''
OAuth2PasswordRequestForm 是用以下几项内容声明表单请求体的类依赖项:
username
password
scope、grant_type、client_id等可选字段。
'''
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
# 响应返回的内容应该包含 token_type。本例中用的是BearerToken,因此, Token 类型应为bearer。
return Token(access_token=access_token, token_type="bearer")
# 获取用户信息
@app.get("/users/me")
async def read_users_me(current_user: Annotated[User, Depends(get_current_active_user)]):
'''
此处把 current_user 的类型声明为 Pydantic 的 User 模型。
这有助于在函数内部使用代码补全和类型检查。
get_current_user 依赖项从子依赖项 oauth2_scheme 中接收 str 类型的 toke。
'''
return current_user
# 测试用户是否登录
@app.get("/users/me/items")
async def read_own_items(current_user: Annotated[User, Depends(get_current_active_user)]):
'''
Depends 在依赖注入系统中处理安全机制。
FastAPI 校验请求中的 Authorization 请求头,核对请求头的值是不是由 Bearer + 令牌组成, 并返回令牌字符串(str)。
如果没有找到 Authorization 请求头,或请求头的值不是 Bearer + 令牌。FastAPI 直接返回 401 错误状态码(UNAUTHORIZED)。
'''
return [{"item_id": "Foo", "owner": current_user.username}]
关于 OAuth2PasswordRequestForm
OAuth2PasswordRequestForm 是用以下几项内容声明表单请求体的类依赖项:
username
password
scope、grant_type、client_id等可选字段。
在 /token
路径操作中,用 Depends
把该类作为依赖项。由 FastAPI
自动注入。
启动程序
if __name__ == "__main__":
import uvicorn
# 交互式API文档地址:
# http://127.0.0.1:8000/docs/
# http://127.0.0.1:8000/redoc/
uvicorn.run(app, host="0.0.0.0", port=8000)
如果在 VS Code
中启动,启动成功后会出现下面的提示:
测试效果
打开 API 文档:http://127.0.0.1:8000/docs
身份验证
点击 Authorize
按钮。使用以下账号密码:
用户名:liu
密码:12345678
然后点击 Authorize
按钮:
如果登录成功,则会返回登录信息:
获取当前登录用户信息
接下来,点击 /users/me -> Try it out ,再点击 Execute 后,应该返回已登录用户的信息:
用未激活用户测试
测试未激活用户,输入以下信息,进行身份验证:
用户名:wang
密码:23456789
然后,执行 /users/me 路径的 GET 操作,显示下列未激活用户错误信息:
{
"detail": "Inactive user"
}
总结
至此,您可以使用 OAuth2
和 JWT
等标准配置安全的 FastAPI
应用。
🪐祝好运🪐
查看完整代码
参考