파이썬/Fast API

FastAPI 애플리케이션 성능 최적화와 캐싱

코샵 2024. 11. 28. 10:25
반응형

소개

FastAPI는 이미 높은 성능을 제공하지만, 적절한 최적화와 캐싱 전략을 통해 더 나은 성능을 얻을 수 있습니다. 이번 글에서는 FastAPI 애플리케이션의 성능을 극대화하는 다양한 전략과 캐싱 구현 방법을 알아보겠습니다.

데이터베이스 최적화

SQLAlchemy 세션 관리

# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextmanager

class DatabaseManager:
    def __init__(self, url: str):
        self.engine = create_engine(
            url,
            pool_size=20,               # 동시 연결 수
            max_overflow=10,            # 추가 허용 연결 수
            pool_timeout=30,            # 연결 대기 시간
            pool_recycle=1800          # 연결 재사용 시간
        )
        self.SessionLocal = sessionmaker(
            bind=self.engine,
            autocommit=False,
            autoflush=False
        )

    @contextmanager
    def get_db(self) -> Session:
        """세션 컨텍스트 관리"""
        db = self.SessionLocal()
        try:
            yield db
        finally:
            db.close()

# 사용 예시
db_manager = DatabaseManager("postgresql://user:password@localhost/db")

async def get_items():
    with db_manager.get_db() as db:
        items = db.query(Item).all()
        return items

쿼리 최적화

# crud.py
from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload

async def get_user_with_orders(db: Session, user_id: int):
    """
    N+1 문제를 해결하는 최적화된 쿼리
    """
    # 비효율적인 방식
    # user = db.query(User).get(user_id)
    # orders = user.orders  # 추가 쿼리 발생

    # 최적화된 방식
    user = (
        db.query(User)
        .options(selectinload(User.orders))
        .filter(User.id == user_id)
        .first()
    )
    return user

async def bulk_create_items(db: Session, items: List[dict]):
    """
    대량 데이터 삽입 최적화
    """
    db.bulk_insert_mappings(Item, items)
    db.commit()

Redis 캐싱 구현

Redis 설정

# cache.py
from fastapi import FastAPI
from redis import asyncio as aioredis
import json
from typing import Any, Optional

class RedisCache:
    def __init__(self, redis_url: str):
        self.redis = aioredis.from_url(redis_url)

    async def get(self, key: str) -> Optional[Any]:
        """
        캐시에서 데이터 조회
        """
        data = await self.redis.get(key)
        if data:
            return json.loads(data)
        return None

    async def set(
        self,
        key: str,
        value: Any,
        expire: int = 3600  # 1시간
    ):
        """
        데이터를 캐시에 저장
        """
        await self.redis.set(
            key,
            json.dumps(value),
            ex=expire
        )

    async def delete(self, key: str):
        """
        캐시 데이터 삭제
        """
        await self.redis.delete(key)

    async def clear(self, pattern: str = "*"):
        """
        패턴에 맞는 모든 캐시 삭제
        """
        keys = await self.redis.keys(pattern)
        if keys:
            await self.redis.delete(*keys)

# FastAPI 애플리케이션에 Redis 캐시 통합
cache = RedisCache("redis://localhost")

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    # 애플리케이션 시작 시 Redis 연결
    app.state.cache = cache

@app.on_event("shutdown")
async def shutdown_event():
    # 애플리케이션 종료 시 Redis 연결 닫기
    await app.state.cache.redis.close()

캐시 데코레이터 구현

# decorators.py
from functools import wraps
from fastapi import Request
import hashlib
import pickle

def cache_response(expire: int = 3600):
    """
    API 응답을 캐시하는 데코레이터

    사용 예시:
    @app.get("/items/{item_id}")
    @cache_response(expire=300)  # 5분간 캐시
    async def read_item(item_id: int):
        return {"item_id": item_id}
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 요청에서 FastAPI 앱 가져오기
            request = next((arg for arg in args if isinstance(arg, Request)), None)
            if not request:
                return await func(*args, **kwargs)

            # 캐시 키 생성
            cache_key = _generate_cache_key(func, args, kwargs)

            # 캐시된 데이터 확인
            cached_data = await request.app.state.cache.get(cache_key)
            if cached_data:
                return cached_data

            # 새로운 데이터 생성 및 캐시
            response = await func(*args, **kwargs)
            await request.app.state.cache.set(cache_key, response, expire)

            return response
        return wrapper
    return decorator

def _generate_cache_key(func, args, kwargs) -> str:
    """
    고유한 캐시 키 생성
    """
    # 함수 이름과 인자들을 결합하여 해시 생성
    key_parts = [func.__name__]
    key_parts.extend([str(arg) for arg in args if not isinstance(arg, Request)])
    key_parts.extend([f"{k}:{v}" for k, v in sorted(kwargs.items())])

    key_string = "|".join(key_parts)
    return hashlib.md5(key_string.encode()).hexdigest()

API 응답 최적화

응답 압축

# main.py
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)

JSON 직렬화 최적화

# serializers.py
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
import orjson

class CustomJSONResponse(Response):
    """
    최적화된 JSON 직렬화
    """
    media_type = "application/json"

    def render(self, content: Any) -> bytes:
        return orjson.dumps(
            content,
            option=orjson.OPT_SERIALIZE_NUMPY | 
                   orjson.OPT_SERIALIZE_DATETIME
        )

# 사용 예시
@app.get("/items/", response_class=CustomJSONResponse)
async def read_items():
    return {"items": items}

비동기 처리 최적화

비동기 작업 처리

# tasks.py
import asyncio
from typing import List
import httpx

async def fetch_external_data(urls: List[str]):
    """
    여러 외부 API를 동시에 호출
    """
    async with httpx.AsyncClient() as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [r.json() for r in responses]

@app.get("/aggregate/")
async def aggregate_data():
    """
    여러 소스의 데이터를 병렬로 조회
    """
    urls = [
        "http://api1.example.com/data",
        "http://api2.example.com/data",
        "http://api3.example.com/data"
    ]
    data = await fetch_external_data(urls)
    return {"results": data}

메모리 캐싱

인메모리 캐시 구현

# memory_cache.py
from typing import Dict, Any
import time
from threading import Lock

class MemoryCache:
    def __init__(self):
        self._cache: Dict[str, Any] = {}
        self._expire_times: Dict[str, float] = {}
        self._lock = Lock()

    def get(self, key: str) -> Any:
        """
        캐시된 값 조회
        """
        with self._lock:
            if key not in self._cache:
                return None

            # 만료 시간 확인
            if time.time() > self._expire_times[key]:
                del self._cache[key]
                del self._expire_times[key]
                return None

            return self._cache[key]

    def set(self, key: str, value: Any, expire: int = 300):
        """
        값을 캐시에 저장
        """
        with self._lock:
            self._cache[key] = value
            self._expire_times[key] = time.time() + expire

    def clear(self):
        """
        캐시 초기화
        """
        with self._lock:
            self._cache.clear()
            self._expire_times.clear()

# 사용 예시
memory_cache = MemoryCache()

@app.get("/cached-data/{item_id}")
async def get_cached_data(item_id: int):
    cache_key = f"item_{item_id}"

    # 캐시 확인
    cached_data = memory_cache.get(cache_key)
    if cached_data:
        return cached_data

    # 새로운 데이터 생성
    data = await expensive_operation(item_id)
    memory_cache.set(cache_key, data, expire=300)

    return data

성능 모니터링

성능 메트릭 수집

# monitoring.py
from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram
import time

# 메트릭 정의
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint']
)

app = FastAPI()

@app.middleware("http")
async def monitor_requests(request: Request, call_next):
    """
    요청 처리 시간 측정
    """
    start_time = time.time()

    response = await call_next(request)

    duration = time.time() - start_time
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)

    return response

마치며

FastAPI 애플리케이션의 성능 최적화는 데이터베이스 쿼리 최적화, 효율적인 캐싱 전략, 비동기 처리 등 여러 측면에서 접근해야 합니다. 각 프로젝트의 요구사항과 부하 특성에 맞게 적절한 전략을 선택하고 적용하는 것이 중요합니다.