Безопасность — наше всё. Особенно, когда речь идёт о веб-приложении, где пользователи регистрируются, авторизуются и хранят свои данные. В этой статье разберёмся с тем, как защитить ваше API с помощью JWT-токенов, настроить OAuth2 и реализовать регистрацию с авторизацией.

Почему JWT?

JWT (JSON Web Token) — это компактный и безопасный способ передачи информации между сторонами в виде JSON-объекта. Он содержит три части:

  • Header (заголовок) — тип токена и алгоритм подписи.
  • Payload (полезная нагрузка) — данные пользователя, такие как user_id.
  • Signature (подпись) — для защиты от подделки.

Устанавливаем зависимости 📦

Для работы с JWT нам понадобятся дополнительные библиотеки:

pip install python-jose[cryptography] passlib bcrypt
  • python-jose — для создания и проверки JWT.
  • passlib — для безопасного хеширования паролей.
  • bcrypt — хеширование паролей по алгоритму bcrypt.

Настраиваем модели пользователей 👤

Продолжим работать с нашей базой данных и добавим таблицу для пользователей с полями для логина и пароля.

from sqlalchemy import Column, Integer, String
from database import Base

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)  # Сохраняем хешированный пароль

Добавили поле hashed_password — пароль будет храниться в базе в зашифрованном виде. Никогда не храните пароли в открытом виде!

Хеширование паролей 🔒

Перед сохранением пароля пользователя, его нужно хешировать. Для этого создадим пару функций:

from passlib.context import CryptContext

# Настраиваем контекст для хеширования паролей
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Функция для хеширования пароля
def get_password_hash(password):
    return pwd_context.hash(password)

# Функция для проверки пароля
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

Алгоритм bcrypt медленнее других хеш-функций (например, MD5 или SHA). Это хорошо, потому что усложняет атаку на базу данных паролей.

Генерация и проверка JWT-токенов 🛡️

Создадим функции для работы с JWT:

from jose import JWTError, jwt
from datetime import datetime, timedelta

# Секретный ключ для подписи токенов
SECRET_KEY = "your_secret_key"
ALGORITHM = "HS256"  # Алгоритм подписи
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # Время жизни токена

# Функция для создания JWT-токена
def create_access_token(data: dict, expires_delta: timedelta):
    to_encode = data.copy()
    expire = datetime.utcnow() + expires_delta
    to_encode.update({"exp": expire})  # Добавляем время истечения
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# Функция для проверки токена
def decode_token(token: str):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except JWTError:
        return None  # Если токен недействителен или истёк

Регистрация пользователей 📝

Добавим маршрут для регистрации:

from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy.orm import Session
from database import SessionLocal
from models import User
from auth import get_password_hash

app = FastAPI()

# Зависимость для получения сессии БД
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# Регистрация пользователя
@app.post("/register/")
def register_user(username: str, email: str, password: str, db: Session = Depends(get_db)):
    existing_user = db.query(User).filter(User.username == username).first()
    if existing_user:
        raise HTTPException(status_code=400, detail="Пользователь уже существует")

    hashed_password = get_password_hash(password)
    new_user = User(username=username, email=email, hashed_password=hashed_password)
    
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return {"msg": "Пользователь успешно зарегистрирован"}

Проверяем, существует ли пользователь с таким логином. Если нет — хешируем пароль и создаём нового пользователя.

Авторизация и защита маршрутов 🔐

Добавим маршрут для логина и защиты маршрута:

from fastapi.security import OAuth2PasswordBearer
from fastapi import Depends, HTTPException
from jose import JWTError, jwt

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Авторизация пользователя и создание токена
@app.post("/token")
def login(username: str, password: str, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.username == username).first()
    if not user or not verify_password(password, user.hashed_password):
        raise HTTPException(status_code=401, detail="Неверный логин или пароль")

    token_data = {"sub": user.username}
    access_token = create_access_token(data=token_data, expires_delta=timedelta(minutes=30))
    return {"access_token": access_token, "token_type": "bearer"}

# Защищённый маршрут
@app.get("/protected/")
def protected_route(token: str = Depends(oauth2_scheme)):
    payload = decode_token(token)
    if payload is None:
        raise HTTPException(status_code=401, detail="Неверный токен или срок действия истёк")

    return {"msg": f"Добро пожаловать, {payload['sub']}!"}

Заключение: безопасность в кармане 🛡️

Теперь вы можете создавать API с регистрацией, авторизацией и защищёнными маршрутами. JWT-токены надёжно хранят информацию о пользователе, а FastAPI позволяет легко интегрировать их в приложение.