JWT를 이용한 FastAPI 사용자 인증 구현하기

2024. 11. 24. 10:00·파이썬/Fast API
반응형

소개

웹 애플리케이션에서 사용자 인증은 매우 중요한 부분입니다. 이번 글에서는 JWT(JSON Web Token)를 사용하여 FastAPI에서 안전한 사용자 인증 시스템을 구현하는 방법을 알아보겠습니다.

필요한 패키지 설치

pip install python-jose[cryptography]  # JWT 토큰 생성 및 검증
pip install passlib[bcrypt]            # 비밀번호 해싱
pip install python-multipart           # 폼 데이터 처리

보안 설정

# security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext

# 보안 관련 상수 정의
SECRET_KEY = "your-secret-key"  # 실제로는 환경 변수로 관리
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# 비밀번호 해싱 설정
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    일반 비밀번호와 해시된 비밀번호를 비교합니다.

    사용 예시:
    is_valid = verify_password("mypassword123", hashed_password)
    """
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """
    비밀번호를 해시화합니다.

    사용 예시:
    hashed = get_password_hash("mypassword123")
    """
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    JWT 토큰을 생성합니다.

    사용 예시:
    token = create_access_token({"sub": user.email})
    """
    to_encode = data.copy()
    # 토큰 만료 시간 설정
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)

    to_encode.update({"exp": expire})
    # JWT 토큰 생성
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

사용자 모델 정의

# models.py
from sqlalchemy import Boolean, Column, Integer, String
from database import Base

class User(Base):
    """사용자 데이터베이스 모델"""
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    is_admin = Column(Boolean, default=False)

Pydantic 스키마 정의

# schemas.py
from pydantic import BaseModel, EmailStr

class UserBase(BaseModel):
    email: EmailStr

class UserCreate(UserBase):
    password: str

class User(UserBase):
    id: int
    is_active: bool
    is_admin: bool

    class Config:
        orm_mode = True

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    email: Optional[str] = None

사용자 인증 기능 구현

# auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from jose import JWTError, jwt

from . import crud, models, schemas
from .security import SECRET_KEY, ALGORITHM

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> models.User:
    """
    현재 인증된 사용자를 가져옵니다.

    사용 예시:
    @app.get("/users/me")
    async def read_users_me(current_user = Depends(get_current_user)):
        return current_user
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        # 토큰 디코드
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        if email is None:
            raise credentials_exception
        token_data = schemas.TokenData(email=email)
    except JWTError:
        raise credentials_exception

    # 사용자 조회
    user = crud.get_user_by_email(db, email=token_data.email)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(
    current_user: models.User = Depends(get_current_user)
) -> models.User:
    """
    활성화된 사용자만 가져옵니다.
    """
    if not current_user.is_active:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

API 엔드포인트 구현

# main.py
from datetime import timedelta
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session

from . import crud, models, schemas
from .auth import get_current_active_user
from .security import (
    ACCESS_TOKEN_EXPIRE_MINUTES,
    create_access_token,
    verify_password
)

app = FastAPI()

@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    """
    로그인 API - 액세스 토큰을 발급합니다.

    사용 예시:
    curl -X POST "http://localhost:8000/token" \
         -H "Content-Type: application/x-www-form-urlencoded" \
         -d "username=user@example.com&password=password123"
    """
    # 사용자 인증
    user = crud.get_user_by_email(db, form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # 토큰 생성
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.email}, expires_delta=access_token_expires
    )

    return {"access_token": access_token, "token_type": "bearer"}

@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    """
    새 사용자를 등록합니다.

    사용 예시:
    curl -X POST "http://localhost:8000/users/" \
         -H "Content-Type: application/json" \
         -d '{"email": "user@example.com", "password": "password123"}'
    """
    # 이메일 중복 체크
    db_user = crud.get_user_by_email(db, email=user.email)
    if db_user:
        raise HTTPException(
            status_code=400,
            detail="Email already registered"
        )
    return crud.create_user(db=db, user=user)

@app.get("/users/me/", response_model=schemas.User)
async def read_users_me(
    current_user: models.User = Depends(get_current_active_user)
):
    """
    현재 로그인한 사용자의 정보를 조회합니다.

    사용 예시:
    curl "http://localhost:8000/users/me" \
         -H "Authorization: Bearer {your_token}"
    """
    return current_user

권한 관리 구현

# permissions.py
from fastapi import HTTPException, status
from functools import wraps

def check_admin_permission(current_user: models.User):
    """
    관리자 권한을 확인합니다.
    """
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not enough privileges"
        )

# 관리자 전용 API 예시
@app.get("/admin/users/", response_model=List[schemas.User])
async def read_all_users(
    current_user: models.User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    """
    모든 사용자 목록을 조회합니다 (관리자 전용).
    """
    check_admin_permission(current_user)
    return crud.get_users(db)

테스트 방법

# 1. 사용자 등록
curl -X POST "http://localhost:8000/users/" \
     -H "Content-Type: application/json" \
     -d '{"email": "test@example.com", "password": "password123"}'

# 2. 로그인하여 토큰 받기
curl -X POST "http://localhost:8000/token" \
     -H "Content-Type: application/x-www-form-urlencoded" \
     -d "username=test@example.com&password=password123"

# 3. 토큰을 사용하여 내 정보 조회
curl "http://localhost:8000/users/me" \
     -H "Authorization: Bearer {your_token}"

보안 모범 사례

비밀번호 요구사항 설정

def validate_password(password: str) -> bool:
    """
    비밀번호 요구사항을 검증합니다.
    - 최소 8자 이상
    - 대문자, 소문자, 숫자, 특수문자 포함
    """
    if len(password) < 8:
        return False
    if not re.search(r"[A-Z]", password):
        return False
    if not re.search(r"[a-z]", password):
        return False
    if not re.search(r"\d", password):
        return False
    if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
        return False
    return True

 

로그인 시도 제한

from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter

@app.post("/token")
@limiter.limit("5/minute")  # 1분당 5회로 제한
async def login_for_access_token():
    ...

마치며

이번 글에서는 FastAPI에서 JWT를 사용한 인증 시스템과 권한 관리를 구현하는 방법을 알아보았습니다. 실제 프로덕션 환경에서는 추가적인 보안 조치가 필요할 수 있으며, 환경 변수 관리와 에러 처리도 중요합니다.

저작자표시 비영리 변경금지 (새창열림)

'파이썬 > Fast API' 카테고리의 다른 글

FastAPI 애플리케이션 성능 최적화와 캐싱  (0) 2024.11.28
FastAPI 애플리케이션 Docker로 배포하기  (6) 2024.11.27
FastAPI 미들웨어와 CORS 설정  (0) 2024.11.26
FastAPI의 데이터베이스 연동 쉽게 이해하기  (5) 2024.11.23
FastAPI 시작하기 - 설치부터 첫 API 만들기까지  (2) 2024.11.21
'파이썬/Fast API' 카테고리의 다른 글
  • FastAPI 애플리케이션 Docker로 배포하기
  • FastAPI 미들웨어와 CORS 설정
  • FastAPI의 데이터베이스 연동 쉽게 이해하기
  • FastAPI 시작하기 - 설치부터 첫 API 만들기까지
코샵
코샵
나의 코딩 일기장
    반응형
  • 코샵
    끄적끄적 코딩 공방
    코샵
    • 분류 전체보기 (687) N
      • 상품 추천 (192) N
      • MongoDB (4)
      • 하드웨어 (12) N
      • 일기장 (4)
      • Unity (138)
        • Tip (41)
        • Project (1)
        • Design Pattern (8)
        • Firebase (6)
        • Asset (2)
      • 파이썬 (12)
        • Basic (41)
        • OpenCV (8)
        • Pandas (15)
        • PyQT (3)
        • SBC(Single Board Computer) (1)
        • 크롤링 (14)
        • Fast API (29)
        • Package (6)
      • Linux (4)
      • C# (97)
        • Algorithm (11)
        • Window (7)
      • TypeScript (50)
        • CSS (10)
      • Git (11)
      • SQL (5)
      • Flutter (10)
        • Tip (1)
      • System (1)
      • BaekJoon (6)
      • Portfolio (2)
      • MacOS (1)
      • 유틸리티 (1)
      • 서비스 (6)
      • 자동화 (3)
      • Hobby (10)
        • 물생활 (10)
        • 식집사 (0)
  • 인기 글

  • 태그

    긴유통기한우유
    unity
    devlife
    스마트스토어리뷰
    셀레니움
    카페24리뷰이관
    cv2
    Python
    유니티
    리스트
    파이썬
    스크립트 실행
    codingtips
    rtsp
    list
    programming101
    카페24리뷰
    쇼핑몰리뷰
    스크립트 실행 순서
    라떼우유
    리뷰관리
    learntocode
    상품 리뷰 크롤링
    믈레코비타멸균우유
    appdevelopment
    ipcamera
    codingcommunity
    C#
    리뷰이관
    programmerlife
  • 최근 글

  • hELLO· Designed By정상우.v4.10.3
코샵
JWT를 이용한 FastAPI 사용자 인증 구현하기
상단으로

티스토리툴바