Безопасность — наше всё. Особенно, когда речь идёт о веб-приложении, где пользователи регистрируются, авторизуются и хранят свои данные. В этой статье разберёмся с тем, как защитить ваше API с помощью JWT-токенов, настроить OAuth2 и реализовать регистрацию с авторизацией.
Почему JWT?
JWT (JSON Web Token) — это компактный и безопасный способ передачи информации между сторонами в виде JSON-объекта. Он содержит три части:
- Header (заголовок) — тип токена и алгоритм подписи.
- Payload (полезная нагрузка) — данные пользователя, такие как user_id.
- Signature (подпись) — для защиты от подделки.
Устанавливаем зависимости 📦
Для работы с JWT нам понадобятся дополнительные библиотеки:
pip install python-jose[cryptography] passlib bcrypt
- python-jose — для создания и проверки JWT.
- passlib — для безопасного хеширования паролей.
- bcrypt — хеширование паролей по алгоритму bcrypt.
Настраиваем модели пользователей 👤
Продолжим работать с нашей базой данных и добавим таблицу для пользователей с полями для логина и пароля.
from sqlalchemy import Column, Integer, String
from database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String) # Сохраняем хешированный пароль
Хеширование паролей 🔒
Перед сохранением пароля пользователя, его нужно хешировать. Для этого создадим пару функций:
from passlib.context import CryptContext
# Настраиваем контекст для хеширования паролей
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Функция для хеширования пароля
def get_password_hash(password):
return pwd_context.hash(password)
# Функция для проверки пароля
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
Генерация и проверка JWT-токенов 🛡️
Создадим функции для работы с JWT:
from jose import JWTError, jwt
from datetime import datetime, timedelta
# Секретный ключ для подписи токенов
SECRET_KEY = "your_secret_key"
ALGORITHM = "HS256" # Алгоритм подписи
ACCESS_TOKEN_EXPIRE_MINUTES = 30 # Время жизни токена
# Функция для создания JWT-токена
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.utcnow() + expires_delta
to_encode.update({"exp": expire}) # Добавляем время истечения
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# Функция для проверки токена
def decode_token(token: str):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
return None # Если токен недействителен или истёк
Регистрация пользователей 📝
Добавим маршрут для регистрации:
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.orm import Session
from database import SessionLocal
from models import User
from auth import get_password_hash
app = FastAPI()
# Зависимость для получения сессии БД
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# Регистрация пользователя
@app.post("/register/")
def register_user(username: str, email: str, password: str, db: Session = Depends(get_db)):
existing_user = db.query(User).filter(User.username == username).first()
if existing_user:
raise HTTPException(status_code=400, detail="Пользователь уже существует")
hashed_password = get_password_hash(password)
new_user = User(username=username, email=email, hashed_password=hashed_password)
db.add(new_user)
db.commit()
db.refresh(new_user)
return {"msg": "Пользователь успешно зарегистрирован"}
Авторизация и защита маршрутов 🔐
Добавим маршрут для логина и защиты маршрута:
from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends, HTTPException
from jose import JWTError, jwt
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Авторизация пользователя и создание токена
@app.post("/token")
def login(username: str, password: str, db: Session = Depends(get_db)):
user = db.query(User).filter(User.username == username).first()
if not user or not verify_password(password, user.hashed_password):
raise HTTPException(status_code=401, detail="Неверный логин или пароль")
token_data = {"sub": user.username}
access_token = create_access_token(data=token_data, expires_delta=timedelta(minutes=30))
return {"access_token": access_token, "token_type": "bearer"}
# Защищённый маршрут
@app.get("/protected/")
def protected_route(token: str = Depends(oauth2_scheme)):
payload = decode_token(token)
if payload is None:
raise HTTPException(status_code=401, detail="Неверный токен или срок действия истёк")
return {"msg": f"Добро пожаловать, {payload['sub']}!"}
Заключение: безопасность в кармане 🛡️
Теперь вы можете создавать API с регистрацией, авторизацией и защищёнными маршрутами. JWT-токены надёжно хранят информацию о пользователе, а FastAPI позволяет легко интегрировать их в приложение.