파이썬/Fast API
FastAPI 애플리케이션 성능 최적화와 캐싱
코샵
2024. 11. 28. 10:25
반응형
소개
FastAPI는 이미 높은 성능을 제공하지만, 적절한 최적화와 캐싱 전략을 통해 더 나은 성능을 얻을 수 있습니다. 이번 글에서는 FastAPI 애플리케이션의 성능을 극대화하는 다양한 전략과 캐싱 구현 방법을 알아보겠습니다.
데이터베이스 최적화
SQLAlchemy 세션 관리
# database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextmanager
class DatabaseManager:
def __init__(self, url: str):
self.engine = create_engine(
url,
pool_size=20, # 동시 연결 수
max_overflow=10, # 추가 허용 연결 수
pool_timeout=30, # 연결 대기 시간
pool_recycle=1800 # 연결 재사용 시간
)
self.SessionLocal = sessionmaker(
bind=self.engine,
autocommit=False,
autoflush=False
)
@contextmanager
def get_db(self) -> Session:
"""세션 컨텍스트 관리"""
db = self.SessionLocal()
try:
yield db
finally:
db.close()
# 사용 예시
db_manager = DatabaseManager("postgresql://user:password@localhost/db")
async def get_items():
with db_manager.get_db() as db:
items = db.query(Item).all()
return items
쿼리 최적화
# crud.py
from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload
async def get_user_with_orders(db: Session, user_id: int):
"""
N+1 문제를 해결하는 최적화된 쿼리
"""
# 비효율적인 방식
# user = db.query(User).get(user_id)
# orders = user.orders # 추가 쿼리 발생
# 최적화된 방식
user = (
db.query(User)
.options(selectinload(User.orders))
.filter(User.id == user_id)
.first()
)
return user
async def bulk_create_items(db: Session, items: List[dict]):
"""
대량 데이터 삽입 최적화
"""
db.bulk_insert_mappings(Item, items)
db.commit()
Redis 캐싱 구현
Redis 설정
# cache.py
from fastapi import FastAPI
from redis import asyncio as aioredis
import json
from typing import Any, Optional
class RedisCache:
def __init__(self, redis_url: str):
self.redis = aioredis.from_url(redis_url)
async def get(self, key: str) -> Optional[Any]:
"""
캐시에서 데이터 조회
"""
data = await self.redis.get(key)
if data:
return json.loads(data)
return None
async def set(
self,
key: str,
value: Any,
expire: int = 3600 # 1시간
):
"""
데이터를 캐시에 저장
"""
await self.redis.set(
key,
json.dumps(value),
ex=expire
)
async def delete(self, key: str):
"""
캐시 데이터 삭제
"""
await self.redis.delete(key)
async def clear(self, pattern: str = "*"):
"""
패턴에 맞는 모든 캐시 삭제
"""
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)
# FastAPI 애플리케이션에 Redis 캐시 통합
cache = RedisCache("redis://localhost")
app = FastAPI()
@app.on_event("startup")
async def startup_event():
# 애플리케이션 시작 시 Redis 연결
app.state.cache = cache
@app.on_event("shutdown")
async def shutdown_event():
# 애플리케이션 종료 시 Redis 연결 닫기
await app.state.cache.redis.close()
캐시 데코레이터 구현
# decorators.py
from functools import wraps
from fastapi import Request
import hashlib
import pickle
def cache_response(expire: int = 3600):
"""
API 응답을 캐시하는 데코레이터
사용 예시:
@app.get("/items/{item_id}")
@cache_response(expire=300) # 5분간 캐시
async def read_item(item_id: int):
return {"item_id": item_id}
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 요청에서 FastAPI 앱 가져오기
request = next((arg for arg in args if isinstance(arg, Request)), None)
if not request:
return await func(*args, **kwargs)
# 캐시 키 생성
cache_key = _generate_cache_key(func, args, kwargs)
# 캐시된 데이터 확인
cached_data = await request.app.state.cache.get(cache_key)
if cached_data:
return cached_data
# 새로운 데이터 생성 및 캐시
response = await func(*args, **kwargs)
await request.app.state.cache.set(cache_key, response, expire)
return response
return wrapper
return decorator
def _generate_cache_key(func, args, kwargs) -> str:
"""
고유한 캐시 키 생성
"""
# 함수 이름과 인자들을 결합하여 해시 생성
key_parts = [func.__name__]
key_parts.extend([str(arg) for arg in args if not isinstance(arg, Request)])
key_parts.extend([f"{k}:{v}" for k, v in sorted(kwargs.items())])
key_string = "|".join(key_parts)
return hashlib.md5(key_string.encode()).hexdigest()
API 응답 최적화
응답 압축
# main.py
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
JSON 직렬화 최적화
# serializers.py
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
import orjson
class CustomJSONResponse(Response):
"""
최적화된 JSON 직렬화
"""
media_type = "application/json"
def render(self, content: Any) -> bytes:
return orjson.dumps(
content,
option=orjson.OPT_SERIALIZE_NUMPY |
orjson.OPT_SERIALIZE_DATETIME
)
# 사용 예시
@app.get("/items/", response_class=CustomJSONResponse)
async def read_items():
return {"items": items}
비동기 처리 최적화
비동기 작업 처리
# tasks.py
import asyncio
from typing import List
import httpx
async def fetch_external_data(urls: List[str]):
"""
여러 외부 API를 동시에 호출
"""
async with httpx.AsyncClient() as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [r.json() for r in responses]
@app.get("/aggregate/")
async def aggregate_data():
"""
여러 소스의 데이터를 병렬로 조회
"""
urls = [
"http://api1.example.com/data",
"http://api2.example.com/data",
"http://api3.example.com/data"
]
data = await fetch_external_data(urls)
return {"results": data}
메모리 캐싱
인메모리 캐시 구현
# memory_cache.py
from typing import Dict, Any
import time
from threading import Lock
class MemoryCache:
def __init__(self):
self._cache: Dict[str, Any] = {}
self._expire_times: Dict[str, float] = {}
self._lock = Lock()
def get(self, key: str) -> Any:
"""
캐시된 값 조회
"""
with self._lock:
if key not in self._cache:
return None
# 만료 시간 확인
if time.time() > self._expire_times[key]:
del self._cache[key]
del self._expire_times[key]
return None
return self._cache[key]
def set(self, key: str, value: Any, expire: int = 300):
"""
값을 캐시에 저장
"""
with self._lock:
self._cache[key] = value
self._expire_times[key] = time.time() + expire
def clear(self):
"""
캐시 초기화
"""
with self._lock:
self._cache.clear()
self._expire_times.clear()
# 사용 예시
memory_cache = MemoryCache()
@app.get("/cached-data/{item_id}")
async def get_cached_data(item_id: int):
cache_key = f"item_{item_id}"
# 캐시 확인
cached_data = memory_cache.get(cache_key)
if cached_data:
return cached_data
# 새로운 데이터 생성
data = await expensive_operation(item_id)
memory_cache.set(cache_key, data, expire=300)
return data
성능 모니터링
성능 메트릭 수집
# monitoring.py
from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram
import time
# 메트릭 정의
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status']
)
REQUEST_LATENCY = Histogram(
'http_request_duration_seconds',
'HTTP request latency',
['method', 'endpoint']
)
app = FastAPI()
@app.middleware("http")
async def monitor_requests(request: Request, call_next):
"""
요청 처리 시간 측정
"""
start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_LATENCY.labels(
method=request.method,
endpoint=request.url.path
).observe(duration)
return response
마치며
FastAPI 애플리케이션의 성능 최적화는 데이터베이스 쿼리 최적화, 효율적인 캐싱 전략, 비동기 처리 등 여러 측면에서 접근해야 합니다. 각 프로젝트의 요구사항과 부하 특성에 맞게 적절한 전략을 선택하고 적용하는 것이 중요합니다.