-
#5 LangGraph 심화: 성능 최적화 기법 [2]LangChain & LangGraph 2025. 3. 18. 20:00
이전 포스트에서는 LangGraph 애플리케이션의 성능을 최적화하기 위한 캐싱 전략과 리소스 관리 기법에 대해 알아보았습니다. 이번 포스트에서는 배치 처리, 결과 스트리밍, 비용 관리, 그리고 성능 모니터링 및 프로파일링과 같은 추가 최적화 기법에 대해 살펴보겠습니다.
배치 처리 최적화
여러 항목을 함께 처리하여 오버헤드를 줄이는 배치 처리 패턴은 LangGraph 애플리케이션의 처리량을 크게 향상시킬 수 있습니다.
1. 기본 배치 처리
여러 항목을 배치로 함께 처리하는 기본적인 방법입니다:
class BatchProcessingState(TypedDict): items: List batch_size: int results: List def batch_processing_node(state: BatchProcessingState) -> Dict: """항목을 배치로 처리""" items = state["items"] batch_size = state.get("batch_size", 32) results = [] # 배치 단위로 처리 for i in range(0, len(items), batch_size): batch = items[i:i+batch_size] # 배치 처리 (예: 한 번의 API 호출로 여러 항목 처리) batch_results = process_batch(batch) results.extend(batch_results) return {"results": results}
이 패턴의 장점:
- API 호출 횟수 감소
- 전체 처리량 향상
- 네트워크 지연 시간 최소화
2. 동적 배치 크기 조정
시스템 부하와 항목 특성에 따라 배치 크기를 동적으로 조정하는 패턴입니다:
def adaptive_batch_processing(state: Dict) -> Dict: """시스템 부하에 따라 배치 크기 동적 조정""" items = state["items"] results = [] # 항목 복잡성 추정 complexity = estimate_complexity(items) # 시스템 부하 확인 system_load = get_system_load() # 동적 배치 크기 계산 if system_load > 0.8 or complexity > 0.7: batch_size = 10 # 작은 배치 elif system_load > 0.5 or complexity > 0.4: batch_size = 20 # 중간 배치 else: batch_size = 50 # 큰 배치 # 배치 처리 for i in range(0, len(items), batch_size): batch = items[i:i+batch_size] batch_results = process_batch(batch) results.extend(batch_results) return {"results": results}
이 방식의 이점:
- 시스템 리소스에 따른 최적의 처리량
- 복잡한 항목을 소량으로 처리하여 타임아웃 방지
- 시스템 과부하 방지
3. 우선순위 기반 배치 처리
항목의 우선순위에 따라 배치를 구성하고 처리하는 패턴입니다:
def priority_based_batching(state: Dict) -> Dict: """우선순위에 따른 배치 처리""" items = state["items"] results = {} # 우선순위별로 항목 분류 high_priority = [] medium_priority = [] low_priority = [] for item in items: if item["priority"] == "high": high_priority.append(item) elif item["priority"] == "medium": medium_priority.append(item) else: low_priority.append(item) # 우선순위별 배치 크기 설정 high_batch_size = 10 # 작은 배치로 빠르게 처리 medium_batch_size = 20 low_batch_size = 50 # 큰 배치로 효율적으로 처리 # 우선순위별 배치 처리 high_results = process_batches(high_priority, high_batch_size) medium_results = process_batches(medium_priority, medium_batch_size) low_results = process_batches(low_priority, low_batch_size) # 결과 병합 results["high"] = high_results results["medium"] = medium_results results["low"] = low_results return {"results": results} def process_batches(items, batch_size): """배치로 항목 처리""" results = [] for i in range(0, len(items), batch_size): batch = items[i:i+batch_size] batch_results = process_batch(batch) results.extend(batch_results) return results
이 패턴은 중요한 항목을 먼저 처리하면서도 전체 처리량을 최적화합니다.
결과 스트리밍
부분 결과를 즉시 제공하는 스트리밍 패턴은 사용자 경험을 크게 향상시킬 수 있습니다.
1. 기본 결과 스트리밍
부분 결과를 순차적으로 반환하는 기본 스트리밍 패턴입니다:
from typing import AsyncGenerator class StreamingState(TypedDict): query: str partial_results: List complete: bool async def streaming_node(state: StreamingState) -> AsyncGenerator: """결과를 스트리밍하는 노드""" query = state["query"] # 스트리밍 처리 시작 async for partial_result in streaming_process(query): # 부분 결과를 즉시 반환 yield { "partial_results": state.get("partial_results", []) + [partial_result], "complete": False } # 모든 결과 처리 완료 yield {"complete": True} # 스트리밍 노드 설정 streaming_graph.add_node("stream_processor", streaming_node)
2. 토큰 단위 LLM 응답 스트리밍
LLM 응답을 토큰 단위로 스트리밍하여 사용자 경험을 개선합니다:
async def stream_llm_response(state: Dict) -> AsyncGenerator: """LLM 응답을 토큰 단위로 스트리밍""" prompt = state["prompt"] # 스트리밍 LLM 호출 설정 response_stream = call_llm_streaming(prompt) accumulated_text = "" # 토큰별 처리 async for token in response_stream: accumulated_text += token # 중간 결과 반환 yield { "partial_response": accumulated_text, "complete": False } # 완료 상태 반환 yield { "partial_response": accumulated_text, "complete": True, "final_response": accumulated_text }
3. 병렬 처리 결과 스트리밍
병렬 처리된 결과를 스트리밍하는 패턴입니다:
import asyncio async def parallel_streaming_processor(state: Dict) -> AsyncGenerator: """병렬 처리 결과 스트리밍""" items = state["items"] batch_size = state.get("batch_size", 10) # 결과 누적 all_results = [] # 배치 단위로 병렬 처리 및 스트리밍 for i in range(0, len(items), batch_size): batch = items[i:i+batch_size] # 병렬 처리 작업 생성 tasks = [process_item(item) for item in batch] # 병렬 실행 및 결과 스트리밍 for completed_task in asyncio.as_completed(tasks): result = await completed_task all_results.append(result) # 부분 결과 즉시 반환 yield { "processed_count": len(all_results), "total_count": len(items), "latest_results": all_results[-batch_size:], "complete": False } # 모든 처리 완료 yield { "processed_count": len(all_results), "total_count": len(items), "all_results": all_results, "complete": True }
이 패턴은 사용자가 전체 처리 진행 상황을 실시간으로 확인할 수 있게 합니다.
자원 사용량 및 비용 관리
리소스 사용량과 API 비용을 효율적으로 관리하는 패턴입니다.
1. API 속도 제한 관리
API 호출 속도를 제한하여 할당량 초과를 방지하는 패턴입니다:
import time import asyncio from aiolimiter import AsyncLimiter # API 호출 속도 제한 (초당 5회) rate_limiter = AsyncLimiter(5, 1) class RateLimitedState(TypedDict): queries: List[str] results: List[Dict] async def rate_limited_node(state: RateLimitedState) -> Dict: """속도 제한을 적용한 API 호출""" queries = state["queries"] results = [] for query in queries: # 속도 제한 적용 async with rate_limiter: # API 호출 result = await api_call(query) results.append(result) return {"results": results}
2. 비용 예산 관리
API 비용을 제한하는 예산 관리 패턴입니다:
class BudgetManager: def __init__(self, daily_budget=10.0): self.daily_budget = daily_budget self.spent_today = 0.0 self.last_reset = datetime.now().date() def reset_if_new_day(self): """날짜가 변경되면 예산 초기화""" today = datetime.now().date() if today > self.last_reset: self.spent_today = 0.0 self.last_reset = today def check_budget(self, estimated_cost): """예산 확인""" self.reset_if_new_day() return self.spent_today + estimated_cost <= self.daily_budget def record_spend(self, cost): """지출 기록""" self.reset_if_new_day() self.spent_today += cost # API 예산 관리자 budget_manager = BudgetManager(daily_budget=50.0) # 일일 $50 예산 def budget_aware_llm_node(state: Dict) -> Dict: """예산을 고려한 LLM 호출""" prompt = state["prompt"] model = state.get("model", "gpt-4o") # 비용 추정 estimated_cost = estimate_llm_cost(prompt, model) # 예산 확인 if not budget_manager.check_budget(estimated_cost): # 예산 초과 시 대체 전략 if state.get("allow_fallback", True): # 저비용 모델로 대체 model = "gpt-3.5-turbo" estimated_cost = estimate_llm_cost(prompt, model) else: # 실행 거부 return {"error": "Budget exceeded", "result": None} # LLM 호출 result = call_llm(prompt, model) # 실제 비용 기록 actual_cost = calculate_actual_cost(result, model) budget_manager.record_spend(actual_cost) return {"result": result, "cost": actual_cost}
3. 모델 스왑 및 캐스케이드
작업 복잡성에 따라 다른 모델을 사용하는 패턴입니다:
from enum import Enum class ModelType(Enum): LIGHTWEIGHT = "lightweight" MEDIUM = "medium" HEAVY = "heavy" class ModelCascadeState(TypedDict): query: str complexity: float selected_model: str result: Dict def evaluate_complexity(state: ModelCascadeState) -> Dict: """쿼리 복잡성 평가""" query = state["query"] # 복잡성 점수 계산 (0.0 ~ 1.0) complexity = calculate_complexity(query) return {"complexity": complexity} def select_model(state: ModelCascadeState) -> Dict: """복잡성에 따른 모델 선택""" complexity = state["complexity"] if complexity < 0.3: model = ModelType.LIGHTWEIGHT.value elif complexity < 0.7: model = ModelType.MEDIUM.value else: model = ModelType.HEAVY.value return {"selected_model": model} def process_with_model(state: ModelCascadeState) -> Dict: """선택된 모델로 처리""" model_type = state["selected_model"] query = state["query"] if model_type == ModelType.LIGHTWEIGHT.value: result = process_with_lightweight_model(query) elif model_type == ModelType.MEDIUM.value: result = process_with_medium_model(query) else: result = process_with_heavy_model(query) return {"result": result} # 그래프 구성 cascade_graph = StateGraph(ModelCascadeState) cascade_graph.add_node("evaluate", evaluate_complexity) cascade_graph.add_node("select", select_model) cascade_graph.add_node("process", process_with_model) # 엣지 설정 cascade_graph.add_edge("evaluate", "select") cascade_graph.add_edge("select", "process")
성능 모니터링 및 프로파일링
LangGraph 애플리케이션의 성능을 모니터링하고 프로파일링하여 병목 현상을 식별하고 최적화할 수 있습니다.
1. 노드 실행 시간 측정
각 노드의 실행 시간을 측정하여 병목 현상을 식별하는 패턴입니다:
import time from functools import wraps def time_profile(func): """노드 실행 시간 측정 데코레이터""" @wraps(func) def wrapper(state): start_time = time.time() result = func(state) end_time = time.time() # 실행 시간 기록 execution_time = end_time - start_time print(f"Node {func.__name__} executed in {execution_time:.4f} seconds") # 결과에 실행 시간 추가 (선택 사항) if isinstance(result, dict): timing_key = f"{func.__name__}_execution_time" result[timing_key] = execution_time return result return wrapper # 프로파일링 적용 @time_profile def expensive_node(state: Dict) -> Dict: # ... 노드 로직 ... return result
2. 메모리 사용량 모니터링
메모리 사용량을 모니터링하여 누수를 방지하는 패턴입니다:
import tracemalloc import gc def monitor_memory_usage(func): """메모리 사용량 모니터링 데코레이터""" @wraps(func) def wrapper(state): # 메모리 추적 시작 tracemalloc.start() # 가비지 컬렉션 실행 gc.collect() # 시작 메모리 스냅샷 start_snapshot = tracemalloc.take_snapshot() # 함수 실행 result = func(state) # 종료 메모리 스냅샷 gc.collect() end_snapshot = tracemalloc.take_snapshot() # 메모리 사용량 변화 계산 memory_diff = end_snapshot.compare_to(start_snapshot, 'lineno') # 상위 5개 메모리 사용 위치 출력 print(f"Memory usage for {func.__name__}:") for stat in memory_diff[:5]: print(f"{stat}") # 메모리 추적 중지 tracemalloc.stop() return result return wrapper # 메모리 모니터링 적용 @monitor_memory_usage def memory_intensive_node(state: Dict) -> Dict: # ... 메모리 집약적 작업 ... return result
3. 성능 대시보드 통합
모니터링 도구와 통합하여 실시간 성능 대시보드를 구축하는 패턴입니다:
import prometheus_client from prometheus_client import Counter, Gauge, Histogram # 메트릭 정의 NODE_EXECUTIONS = Counter('node_executions_total', 'Total node executions', ['node_name']) NODE_EXECUTION_TIME = Histogram('node_execution_time_seconds', 'Node execution time', ['node_name']) MEMORY_USAGE = Gauge('memory_usage_bytes', 'Memory usage in bytes') ERROR_COUNT = Counter('error_count_total', 'Total errors', ['node_name', 'error_type']) def prometheus_instrumented(func): """Prometheus 측정 데코레이터""" @wraps(func) def wrapper(state): node_name = func.__name__ # 실행 횟수 증가 NODE_EXECUTIONS.labels(node_name=node_name).inc() # 현재 메모리 사용량 기록 MEMORY_USAGE.set(get_process_memory_usage()) # 실행 시간 측정 start_time = time.time() try: # 함수 실행 result = func(state) # 실행 시간 기록 execution_time = time.time() - start_time NODE_EXECUTION_TIME.labels(node_name=node_name).observe(execution_time) return result except Exception as e: # 오류 카운터 증가 ERROR_COUNT.labels(node_name=node_name, error_type=type(e).__name__).inc() raise return wrapper # HTTP 서버 시작 (메트릭 노출) def start_metrics_server(port=8000): """Prometheus 메트릭 서버 시작""" from prometheus_client import start_http_server start_http_server(port) print(f"Prometheus metrics available at http://localhost:{port}/metrics") # 애플리케이션 시작 시 메트릭 서버 시작 start_metrics_server() # 계측 적용 @prometheus_instrumented def monitored_node(state: Dict) -> Dict: # ... 노드 로직 ... return result
이 패턴을 사용하면 Prometheus와 Grafana 같은 도구로 실시간 성능 모니터링 대시보드를 구축할 수 있습니다.
결론
이번 포스트에서는 LangGraph 애플리케이션의 성능을 최적화하기 위한 다양한 기법을 살펴보았습니다
- 배치 처리 최적화, 결과 스트리밍, 자원 사용량 및 비용 관리, 성능 모니터링 및 프로파일링
이 최적화 기법들을 적절히 조합하여 적용하면 LangGraph 애플리케이션의 성능, 사용자 경험, 비용 효율성을 크게 향상시킬 수 있습니다. 항상 최적화 전후의 성능을 측정하고, 가장 효과적인 기법에 집중하는 것이 중요합니다.
'LangChain & LangGraph' 카테고리의 다른 글
#5 LangGraph 심화: 성능 최적화 기법 [1] (2) 2025.03.18 #4 LangGraph 심화: 스레드 및 병렬 처리 (0) 2025.03.18 #3 LangGraph 심화: 분기 및 조건부 실행과 오류 처리 전략 (0) 2025.03.18 #1 Lang Graph란 무엇인가요? (0) 2025.03.18 #2 LangGraph 심화: 상태 관리와 대화형 에이전트 구축 (0) 2025.03.18