반응형
소개
웹 애플리케이션에서 사용자 인증은 매우 중요한 부분입니다. 이번 글에서는 JWT(JSON Web Token)를 사용하여 FastAPI에서 안전한 사용자 인증 시스템을 구현하는 방법을 알아보겠습니다.
필요한 패키지 설치
pip install python-jose[cryptography] # JWT 토큰 생성 및 검증
pip install passlib[bcrypt] # 비밀번호 해싱
pip install python-multipart # 폼 데이터 처리
보안 설정
# security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
# 보안 관련 상수 정의
SECRET_KEY = "your-secret-key" # 실제로는 환경 변수로 관리
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 비밀번호 해싱 설정
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
일반 비밀번호와 해시된 비밀번호를 비교합니다.
사용 예시:
is_valid = verify_password("mypassword123", hashed_password)
"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""
비밀번호를 해시화합니다.
사용 예시:
hashed = get_password_hash("mypassword123")
"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""
JWT 토큰을 생성합니다.
사용 예시:
token = create_access_token({"sub": user.email})
"""
to_encode = data.copy()
# 토큰 만료 시간 설정
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
# JWT 토큰 생성
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
사용자 모델 정의
# models.py
from sqlalchemy import Boolean, Column, Integer, String
from database import Base
class User(Base):
"""사용자 데이터베이스 모델"""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
Pydantic 스키마 정의
# schemas.py
from pydantic import BaseModel, EmailStr
class UserBase(BaseModel):
email: EmailStr
class UserCreate(UserBase):
password: str
class User(UserBase):
id: int
is_active: bool
is_admin: bool
class Config:
orm_mode = True
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
email: Optional[str] = None
사용자 인증 기능 구현
# auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from jose import JWTError, jwt
from . import crud, models, schemas
from .security import SECRET_KEY, ALGORITHM
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> models.User:
"""
현재 인증된 사용자를 가져옵니다.
사용 예시:
@app.get("/users/me")
async def read_users_me(current_user = Depends(get_current_user)):
return current_user
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# 토큰 디코드
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
email: str = payload.get("sub")
if email is None:
raise credentials_exception
token_data = schemas.TokenData(email=email)
except JWTError:
raise credentials_exception
# 사용자 조회
user = crud.get_user_by_email(db, email=token_data.email)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: models.User = Depends(get_current_user)
) -> models.User:
"""
활성화된 사용자만 가져옵니다.
"""
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
API 엔드포인트 구현
# main.py
from datetime import timedelta
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from . import crud, models, schemas
from .auth import get_current_active_user
from .security import (
ACCESS_TOKEN_EXPIRE_MINUTES,
create_access_token,
verify_password
)
app = FastAPI()
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
"""
로그인 API - 액세스 토큰을 발급합니다.
사용 예시:
curl -X POST "http://localhost:8000/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=user@example.com&password=password123"
"""
# 사용자 인증
user = crud.get_user_by_email(db, form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
# 토큰 생성
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.email}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
"""
새 사용자를 등록합니다.
사용 예시:
curl -X POST "http://localhost:8000/users/" \
-H "Content-Type: application/json" \
-d '{"email": "user@example.com", "password": "password123"}'
"""
# 이메일 중복 체크
db_user = crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(
status_code=400,
detail="Email already registered"
)
return crud.create_user(db=db, user=user)
@app.get("/users/me/", response_model=schemas.User)
async def read_users_me(
current_user: models.User = Depends(get_current_active_user)
):
"""
현재 로그인한 사용자의 정보를 조회합니다.
사용 예시:
curl "http://localhost:8000/users/me" \
-H "Authorization: Bearer {your_token}"
"""
return current_user
권한 관리 구현
# permissions.py
from fastapi import HTTPException, status
from functools import wraps
def check_admin_permission(current_user: models.User):
"""
관리자 권한을 확인합니다.
"""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough privileges"
)
# 관리자 전용 API 예시
@app.get("/admin/users/", response_model=List[schemas.User])
async def read_all_users(
current_user: models.User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
"""
모든 사용자 목록을 조회합니다 (관리자 전용).
"""
check_admin_permission(current_user)
return crud.get_users(db)
테스트 방법
# 1. 사용자 등록
curl -X POST "http://localhost:8000/users/" \
-H "Content-Type: application/json" \
-d '{"email": "test@example.com", "password": "password123"}'
# 2. 로그인하여 토큰 받기
curl -X POST "http://localhost:8000/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=test@example.com&password=password123"
# 3. 토큰을 사용하여 내 정보 조회
curl "http://localhost:8000/users/me" \
-H "Authorization: Bearer {your_token}"
보안 모범 사례
비밀번호 요구사항 설정
def validate_password(password: str) -> bool:
"""
비밀번호 요구사항을 검증합니다.
- 최소 8자 이상
- 대문자, 소문자, 숫자, 특수문자 포함
"""
if len(password) < 8:
return False
if not re.search(r"[A-Z]", password):
return False
if not re.search(r"[a-z]", password):
return False
if not re.search(r"\d", password):
return False
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", password):
return False
return True
로그인 시도 제한
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
@app.post("/token")
@limiter.limit("5/minute") # 1분당 5회로 제한
async def login_for_access_token():
...
마치며
이번 글에서는 FastAPI에서 JWT를 사용한 인증 시스템과 권한 관리를 구현하는 방법을 알아보았습니다. 실제 프로덕션 환경에서는 추가적인 보안 조치가 필요할 수 있으며, 환경 변수 관리와 에러 처리도 중요합니다.
'파이썬 > Fast API' 카테고리의 다른 글
FastAPI 애플리케이션 성능 최적화와 캐싱 (0) | 2024.11.28 |
---|---|
FastAPI 애플리케이션 Docker로 배포하기 (6) | 2024.11.27 |
FastAPI 미들웨어와 CORS 설정 (0) | 2024.11.26 |
FastAPI의 데이터베이스 연동 쉽게 이해하기 (5) | 2024.11.23 |
FastAPI 시작하기 - 설치부터 첫 API 만들기까지 (2) | 2024.11.21 |