파이썬/Fast API

JWT를 이용한 FastAPI 사용자 인증 구현하기

코샵 2024. 11. 24. 10:00
반응형

소개

웹 애플리케이션에서 사용자 인증은 매우 중요한 부분입니다. 이번 글에서는 JWT(JSON Web Token)를 사용하여 FastAPI에서 안전한 사용자 인증 시스템을 구현하는 방법을 알아보겠습니다.

필요한 패키지 설치

pip install python-jose[cryptography]  # JWT 토큰 생성 및 검증
pip install passlib[bcrypt]            # 비밀번호 해싱
pip install python-multipart           # 폼 데이터 처리

보안 설정

# security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext

# 보안 관련 상수 정의
SECRET_KEY = "your-secret-key"  # 실제로는 환경 변수로 관리
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 비밀번호 해싱 설정
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    일반 비밀번호와 해시된 비밀번호를 비교합니다.

    사용 예시:
    is_valid = verify_password("mypassword123", hashed_password)
    """
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """
    비밀번호를 해시화합니다.

    사용 예시:
    hashed = get_password_hash("mypassword123")
    """
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    JWT 토큰을 생성합니다.

    사용 예시:
    token = create_access_token({"sub": user.email})
    """
    to_encode = data.copy()
    # 토큰 만료 시간 설정
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)

    to_encode.update({"exp": expire})
    # JWT 토큰 생성
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

사용자 모델 정의

# models.py
from sqlalchemy import Boolean, Column, Integer, String
from database import Base

class User(Base):
    """사용자 데이터베이스 모델"""
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    is_admin = Column(Boolean, default=False)

Pydantic 스키마 정의

# schemas.py
from pydantic import BaseModel, EmailStr

class UserBase(BaseModel):
    email: EmailStr

class UserCreate(UserBase):
    password: str

class User(UserBase):
    id: int
    is_active: bool
    is_admin: bool

    class Config:
        orm_mode = True

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    email: Optional[str] = None

사용자 인증 기능 구현

# auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from jose import JWTError, jwt

from . import crud, models, schemas
from .security import SECRET_KEY, ALGORITHM

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> models.User:
    """
    현재 인증된 사용자를 가져옵니다.

    사용 예시:
    @app.get("/users/me")
    async def read_users_me(current_user = Depends(get_current_user)):
        return current_user
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        # 토큰 디코드
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        if email is None:
            raise credentials_exception
        token_data = schemas.TokenData(email=email)
    except JWTError:
        raise credentials_exception

    # 사용자 조회
    user = crud.get_user_by_email(db, email=token_data.email)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(
    current_user: models.User = Depends(get_current_user)
) -> models.User:
    """
    활성화된 사용자만 가져옵니다.
    """
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

API 엔드포인트 구현

# main.py
from datetime import timedelta
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .auth import get_current_active_user
from .security import (
    ACCESS_TOKEN_EXPIRE_MINUTES,
    create_access_token,
    verify_password
)

app = FastAPI()

@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    """
    로그인 API - 액세스 토큰을 발급합니다.

    사용 예시:
    curl -X POST "http://localhost:8000/token" \
         -H "Content-Type: application/x-www-form-urlencoded" \
         -d "username=user@example.com&password=password123"
    """
    # 사용자 인증
    user = crud.get_user_by_email(db, form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # 토큰 생성
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.email}, expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}

@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """
    새 사용자를 등록합니다.

    사용 예시:
    curl -X POST "http://localhost:8000/users/" \
         -H "Content-Type: application/json" \
         -d '{"email": "user@example.com", "password": "password123"}'
    """
    # 이메일 중복 체크
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(
            status_code=400,
            detail="Email already registered"
        )
    return crud.create_user(db=db, user=user)

@app.get("/users/me/", response_model=schemas.User)
async def read_users_me(
    current_user: models.User = Depends(get_current_active_user)
):
    """
    현재 로그인한 사용자의 정보를 조회합니다.

    사용 예시:
    curl "http://localhost:8000/users/me" \
         -H "Authorization: Bearer {your_token}"
    """
    return current_user

권한 관리 구현

# permissions.py
from fastapi import HTTPException, status
from functools import wraps

def check_admin_permission(current_user: models.User):
    """
    관리자 권한을 확인합니다.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough privileges"
        )

# 관리자 전용 API 예시
@app.get("/admin/users/", response_model=List[schemas.User])
async def read_all_users(
    current_user: models.User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    모든 사용자 목록을 조회합니다 (관리자 전용).
    """
    check_admin_permission(current_user)
    return crud.get_users(db)

테스트 방법

# 1. 사용자 등록
curl -X POST "http://localhost:8000/users/" \
     -H "Content-Type: application/json" \
     -d '{"email": "test@example.com", "password": "password123"}'

# 2. 로그인하여 토큰 받기
curl -X POST "http://localhost:8000/token" \
     -H "Content-Type: application/x-www-form-urlencoded" \
     -d "username=test@example.com&password=password123"

# 3. 토큰을 사용하여 내 정보 조회
curl "http://localhost:8000/users/me" \
     -H "Authorization: Bearer {your_token}"

보안 모범 사례

비밀번호 요구사항 설정

def validate_password(password: str) -> bool:
    """
    비밀번호 요구사항을 검증합니다.
    - 최소 8자 이상
    - 대문자, 소문자, 숫자, 특수문자 포함
    """
    if len(password) < 8:
        return False
    if not re.search(r"[A-Z]", password):
        return False
    if not re.search(r"[a-z]", password):
        return False
    if not re.search(r"\d", password):
        return False
    if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
        return False
    return True

 

로그인 시도 제한

from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter

@app.post("/token")
@limiter.limit("5/minute")  # 1분당 5회로 제한
async def login_for_access_token():
    ...

마치며

이번 글에서는 FastAPI에서 JWT를 사용한 인증 시스템과 권한 관리를 구현하는 방법을 알아보았습니다. 실제 프로덕션 환경에서는 추가적인 보안 조치가 필요할 수 있으며, 환경 변수 관리와 에러 처리도 중요합니다.