파이썬/Fast API
FastAPI : 병렬 처리로 API 성능 최적화
코샵
2024. 12. 4. 10:04
반응형
소개
FastAPI 서버에서 여러 API 요청을 동시에 처리할 때, 요청 간 간섭이 발생하지 않도록 하는 것이 중요합니다. 이번 글에서는 FastAPI에서 병렬 처리를 구현하는 다양한 방법과 각각의 장단점을 알아보겠습니다.
asyncio를 활용한 기본 병렬 처리
asyncio는 파이썬의 기본 비동기 프로그래밍 라이브러리입니다. FastAPI는 내부적으로 해당 라이브러리를 사용하여 비동기 처리를 구현합니다.
from fastapi import FastAPI
import asyncio
from typing import List
app = FastAPI()
async def process_data(data: dict) -> dict:
"""시간이 걸리는 작업을 시뮬레이션하는 함수"""
await asyncio.sleep(1) # 비동기 처리가 필요한 작업 시뮬레이션
return {"result": data["value"] * 2}
@app.post("/process-multiple")
async def process_multiple_items(items: List[dict]):
# 여러 작업을 동시에 처리
tasks = [process_data(item) for item in items]
results = await asyncio.gather(*tasks)
return results
# 실제 사용 예시:
# POST /process-multiple
# Body: [{"value": 1}, {"value": 2}, {"value": 3}]
장점 | 단점 |
별도의 라이브러리 설치가 필요 없음 | CPU 집약적인 작업에는 적합하지 않음 |
I/O 바운드 작업에 매우 효율적 | 하나의 작업이 블로킹되면 전체 성능에 영향을 줄 수 있음 |
메모리 사용량이 적음 | 디버깅이 상대적으로 어려움 |
구현이 상대적으로 단순 |
백그라운드 작업 처리
BackgroundTasks를 사용하면 HTTP 응답을 먼저 보내고 작업을 백그라운드에서 처리할 수 있습니다.
from fastapi import BackgroundTasks
from typing import Optional
class DataProcessor:
def __init__(self):
self.results = {}
self._lock = asyncio.Lock()
async def process_in_background(self, task_id: str, data: dict):
"""백그라운드에서 데이터 처리"""
await asyncio.sleep(2) # 긴 작업 시뮬레이션
async with self._lock:
self.results[task_id] = {"result": data["value"] * 2}
processor = DataProcessor()
@app.post("/submit-task")
async def submit_task(
data: dict,
background_tasks: BackgroundTasks
):
task_id = str(uuid.uuid4())
background_tasks.add_task(
processor.process_in_background,
task_id,
data
)
return {"task_id": task_id}
@app.get("/get-result/{task_id}")
async def get_result(task_id: str):
result = processor.results.get(task_id)
if result is None:
return {"status": "processing"}
return result
장점 | 단점 |
긴 작업을 비동기적으로 처리 가능 | 작업 상태 추적이 복잡할 수 있음 |
사용자에게 빠른 응답 제공 | 서버 재시작 시 진행 중인 작업 손실 |
FastAPI에 내장된 기능으로 추가 설정 불필요 | 분산 처리에는 적합하지 않음 |
메모리 효율적 |
Celery를 활용한 분산 작업 처리
Celery는 분산 작업 큐 시스템으로, 무거운 작업을 별도의 워커 프로세스에 처리할 수 있게 해 줍니다.
from celery import Celery
from fastapi import FastAPI
from redis import AsyncRedis
# Celery 설정
celery_app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
# Redis 연결
redis = AsyncRedis(host='localhost', port=6379, db=2)
@celery_app.task
def process_heavy_task(data: dict):
# 무거운 작업 처리
time.sleep(5)
return {"result": data["value"] * 2}
@app.post("/submit-heavy-task")
async def submit_heavy_task(data: dict):
# Celery 작업 제출
task = process_heavy_task.delay(data)
# Redis에 작업 상태 저장
await redis.set(f"task:{task.id}", "processing")
return {"task_id": task.id}
@app.get("/check-task/{task_id}")
async def check_task(task_id: str):
# Celery 작업 상태 확인
task = celery_app.AsyncResult(task_id)
if task.ready():
result = task.get()
return {"status": "completed", "result": result}
else:
return {"status": "processing"}
장점 | 단점 |
대규모 분산 처리 가능 | 추가적인 인프라 설정 필요 (Redis/RabbitMQ) |
작업 재시도 및 실패 처리 기능 내장 | 설정과 관리가 복잡 |
작업 모니터링 및 스케줄링 기능 | 시스템 리소스 사용량이 상대적으로 높음 |
워커 스케일링이 용이 |
세마포어를 사용한 동시성 제어
세마포어는 동시에 실행될 수 있는 작업의 수를 제한하여 시스템 리소스를 보호합니다.
from fastapi import FastAPI
import asyncio
from asyncio import Semaphore
app = FastAPI()
# 동시 처리 제한을 위한 세마포어
MAX_CONCURRENT_TASKS = 5
semaphore = Semaphore(MAX_CONCURRENT_TASKS)
async def resource_intensive_task(item: dict):
"""리소스를 많이 사용하는 작업"""
async with semaphore:
await asyncio.sleep(2) # 무거운 작업 시뮬레이션
return {"processed": item["value"]}
@app.post("/process-with-limit")
async def process_with_limit(items: List[dict]):
tasks = [resource_intensive_task(item) for item in items]
results = await asyncio.gather(*tasks)
return results
작업 풀을 사용한 병렬 처리
CPU 집약적인 작업을 별도의 프로세스 풀에서 처리하여 메인 이벤트 루프의 블로킹을 방지합니다.
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from functools import partial
def cpu_bound_task(data: dict):
"""CPU 집약적인 작업"""
# 실제 CPU 집약적인 작업
return {"result": data["value"] ** 2}
class TaskProcessor:
def __init__(self):
self.executor = ProcessPoolExecutor(
max_workers=multiprocessing.cpu_count()
)
async def process_in_pool(self, data: List[dict]):
"""프로세스 풀에서 작업 실행"""
loop = asyncio.get_event_loop()
tasks = []
for item in data:
task = loop.run_in_executor(
self.executor,
cpu_bound_task,
item
)
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
processor = TaskProcessor()
@app.post("/process-cpu-bound")
async def process_cpu_bound(items: List[dict]):
results = await processor.process_in_pool(items)
return results
장점 | 단점 |
CPU 집약적인 작업에 최적화 | 프로세스 간 데이터 전송 오버헤드 |
메인 이벤트 루프 블로킹 방지 | 메모리 사용량이 증가 |
시스템의 모든 CPU코어 활용 가능 | 프로세스 생성/제거에 따른 오버헤드 |
I/O작업과 CPU작업의 효율적인 분리 | 구현이 상대적으로 복잡 |
상태 관리와 동기화
from fastapi import FastAPI, HTTPException
import asyncio
from typing import Dict
from dataclasses import dataclass
from datetime import datetime
@dataclass
class TaskState:
status: str
started_at: datetime
result: Optional[dict] = None
error: Optional[str] = None
class TaskManager:
def __init__(self):
self.tasks: Dict[str, TaskState] = {}
self._lock = asyncio.Lock()
async def create_task(self, task_id: str) -> None:
async with self._lock:
self.tasks[task_id] = TaskState(
status="processing",
started_at=datetime.now()
)
async def update_task(
self,
task_id: str,
status: str,
result: Optional[dict] = None,
error: Optional[str] = None
) -> None:
async with self._lock:
if task_id not in self.tasks:
raise HTTPException(status_code=404, detail="Task not found")
task = self.tasks[task_id]
task.status = status
if result is not None:
task.result = result
if error is not None:
task.error = error
async def get_task_state(self, task_id: str) -> TaskState:
async with self._lock:
if task_id not in self.tasks:
raise HTTPException(status_code=404, detail="Task not found")
return self.tasks[task_id]
task_manager = TaskManager()
@app.post("/start-task")
async def start_task(data: dict):
task_id = str(uuid.uuid4())
await task_manager.create_task(task_id)
# 백그라운드 작업 시작
background_tasks.add_task(
process_task,
task_id,
data
)
return {"task_id": task_id}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
return await task_manager.get_task_state(task_id)
에러 처리와 복구
from fastapi import HTTPException
from typing import Optional
import logging
logger = logging.getLogger(__name__)
async def safe_process_task(task_id: str, data: dict):
"""안전한 작업 처리"""
try:
result = await process_task(data)
await task_manager.update_task(
task_id,
status="completed",
result=result
)
except Exception as e:
logger.error(f"Task {task_id} failed: {str(e)}")
await task_manager.update_task(
task_id,
status="failed",
error=str(e)
)
# 재시도 로직
if is_retryable_error(e):
await retry_task(task_id, data)
async def retry_task(task_id: str, data: dict, max_retries: int = 3):
"""작업 재시도"""
for i in range(max_retries):
try:
await asyncio.sleep(2 ** i) # 지수 백오프
result = await process_task(data)
await task_manager.update_task(
task_id,
status="completed",
result=result
)
return
except Exception as e:
logger.error(f"Retry {i+1} failed for task {task_id}: {str(e)}")
# 모든 재시도 실패
await task_manager.update_task(
task_id,
status="failed",
error="Max retries exceeded"
)
마치며
FastAPI에서 병렬 처리를 구현할 때는 작업의 특성에 따라 적절한 방법을 선택하는 것이 중요합니다. asyncio를 기본으로 하되, CPU 집약적인 작업은 프로세스 풀을, 장시간 실행되는 작업은 Celery를 사용하는 등 상황에 맞는 전략을 선택해야 합니다.