ABOUT ME

ML / DL Statistics & Math

Today
Yesterday
Total
  • #5 LangGraph 심화: 성능 최적화 기법 [2]
    LangChain & LangGraph 2025. 3. 18. 20:00

    이전 포스트에서는 LangGraph 애플리케이션의 성능을 최적화하기 위한 캐싱 전략과 리소스 관리 기법에 대해 알아보았습니다. 이번 포스트에서는 배치 처리, 결과 스트리밍, 비용 관리, 그리고 성능 모니터링 및 프로파일링과 같은 추가 최적화 기법에 대해 살펴보겠습니다.

    배치 처리 최적화

    여러 항목을 함께 처리하여 오버헤드를 줄이는 배치 처리 패턴은 LangGraph 애플리케이션의 처리량을 크게 향상시킬 수 있습니다.

    1. 기본 배치 처리

    여러 항목을 배치로 함께 처리하는 기본적인 방법입니다:

    class BatchProcessingState(TypedDict):
        items: List
        batch_size: int
        results: List
    
    def batch_processing_node(state: BatchProcessingState) -> Dict:
        """항목을 배치로 처리"""
        items = state["items"]
        batch_size = state.get("batch_size", 32)
        results = []
        
        # 배치 단위로 처리
        for i in range(0, len(items), batch_size):
            batch = items[i:i+batch_size]
            # 배치 처리 (예: 한 번의 API 호출로 여러 항목 처리)
            batch_results = process_batch(batch)
            results.extend(batch_results)
        
        return {"results": results}
    

    이 패턴의 장점:

    • API 호출 횟수 감소
    • 전체 처리량 향상
    • 네트워크 지연 시간 최소화

    2. 동적 배치 크기 조정

    시스템 부하와 항목 특성에 따라 배치 크기를 동적으로 조정하는 패턴입니다:

    def adaptive_batch_processing(state: Dict) -> Dict:
        """시스템 부하에 따라 배치 크기 동적 조정"""
        items = state["items"]
        results = []
        
        # 항목 복잡성 추정
        complexity = estimate_complexity(items)
        
        # 시스템 부하 확인
        system_load = get_system_load()
        
        # 동적 배치 크기 계산
        if system_load > 0.8 or complexity > 0.7:
            batch_size = 10  # 작은 배치
        elif system_load > 0.5 or complexity > 0.4:
            batch_size = 20  # 중간 배치
        else:
            batch_size = 50  # 큰 배치
        
        # 배치 처리
        for i in range(0, len(items), batch_size):
            batch = items[i:i+batch_size]
            batch_results = process_batch(batch)
            results.extend(batch_results)
        
        return {"results": results}
    

    이 방식의 이점:

    • 시스템 리소스에 따른 최적의 처리량
    • 복잡한 항목을 소량으로 처리하여 타임아웃 방지
    • 시스템 과부하 방지

    3. 우선순위 기반 배치 처리

    항목의 우선순위에 따라 배치를 구성하고 처리하는 패턴입니다:

    def priority_based_batching(state: Dict) -> Dict:
        """우선순위에 따른 배치 처리"""
        items = state["items"]
        results = {}
        
        # 우선순위별로 항목 분류
        high_priority = []
        medium_priority = []
        low_priority = []
        
        for item in items:
            if item["priority"] == "high":
                high_priority.append(item)
            elif item["priority"] == "medium":
                medium_priority.append(item)
            else:
                low_priority.append(item)
        
        # 우선순위별 배치 크기 설정
        high_batch_size = 10  # 작은 배치로 빠르게 처리
        medium_batch_size = 20
        low_batch_size = 50  # 큰 배치로 효율적으로 처리
        
        # 우선순위별 배치 처리
        high_results = process_batches(high_priority, high_batch_size)
        medium_results = process_batches(medium_priority, medium_batch_size)
        low_results = process_batches(low_priority, low_batch_size)
        
        # 결과 병합
        results["high"] = high_results
        results["medium"] = medium_results
        results["low"] = low_results
        
        return {"results": results}
    
    def process_batches(items, batch_size):
        """배치로 항목 처리"""
        results = []
        
        for i in range(0, len(items), batch_size):
            batch = items[i:i+batch_size]
            batch_results = process_batch(batch)
            results.extend(batch_results)
        
        return results
    

    이 패턴은 중요한 항목을 먼저 처리하면서도 전체 처리량을 최적화합니다.

    결과 스트리밍

    부분 결과를 즉시 제공하는 스트리밍 패턴은 사용자 경험을 크게 향상시킬 수 있습니다.

    1. 기본 결과 스트리밍

    부분 결과를 순차적으로 반환하는 기본 스트리밍 패턴입니다:

    from typing import AsyncGenerator
    
    class StreamingState(TypedDict):
        query: str
        partial_results: List
        complete: bool
    
    async def streaming_node(state: StreamingState) -> AsyncGenerator:
        """결과를 스트리밍하는 노드"""
        query = state["query"]
        
        # 스트리밍 처리 시작
        async for partial_result in streaming_process(query):
            # 부분 결과를 즉시 반환
            yield {
                "partial_results": state.get("partial_results", []) + [partial_result],
                "complete": False
            }
        
        # 모든 결과 처리 완료
        yield {"complete": True}
    
    # 스트리밍 노드 설정
    streaming_graph.add_node("stream_processor", streaming_node)
    

    2. 토큰 단위 LLM 응답 스트리밍

    LLM 응답을 토큰 단위로 스트리밍하여 사용자 경험을 개선합니다:

    async def stream_llm_response(state: Dict) -> AsyncGenerator:
        """LLM 응답을 토큰 단위로 스트리밍"""
        prompt = state["prompt"]
        
        # 스트리밍 LLM 호출 설정
        response_stream = call_llm_streaming(prompt)
        
        accumulated_text = ""
        
        # 토큰별 처리
        async for token in response_stream:
            accumulated_text += token
            
            # 중간 결과 반환
            yield {
                "partial_response": accumulated_text,
                "complete": False
            }
        
        # 완료 상태 반환
        yield {
            "partial_response": accumulated_text,
            "complete": True,
            "final_response": accumulated_text
        }
    

    3. 병렬 처리 결과 스트리밍

    병렬 처리된 결과를 스트리밍하는 패턴입니다:

    import asyncio
    
    async def parallel_streaming_processor(state: Dict) -> AsyncGenerator:
        """병렬 처리 결과 스트리밍"""
        items = state["items"]
        batch_size = state.get("batch_size", 10)
        
        # 결과 누적
        all_results = []
        
        # 배치 단위로 병렬 처리 및 스트리밍
        for i in range(0, len(items), batch_size):
            batch = items[i:i+batch_size]
            
            # 병렬 처리 작업 생성
            tasks = [process_item(item) for item in batch]
            
            # 병렬 실행 및 결과 스트리밍
            for completed_task in asyncio.as_completed(tasks):
                result = await completed_task
                all_results.append(result)
                
                # 부분 결과 즉시 반환
                yield {
                    "processed_count": len(all_results),
                    "total_count": len(items),
                    "latest_results": all_results[-batch_size:],
                    "complete": False
                }
        
        # 모든 처리 완료
        yield {
            "processed_count": len(all_results),
            "total_count": len(items),
            "all_results": all_results,
            "complete": True
        }
    

    이 패턴은 사용자가 전체 처리 진행 상황을 실시간으로 확인할 수 있게 합니다.

    자원 사용량 및 비용 관리

    리소스 사용량과 API 비용을 효율적으로 관리하는 패턴입니다.

    1. API 속도 제한 관리

    API 호출 속도를 제한하여 할당량 초과를 방지하는 패턴입니다:

    import time
    import asyncio
    from aiolimiter import AsyncLimiter
    
    # API 호출 속도 제한 (초당 5회)
    rate_limiter = AsyncLimiter(5, 1)
    
    class RateLimitedState(TypedDict):
        queries: List[str]
        results: List[Dict]
    
    async def rate_limited_node(state: RateLimitedState) -> Dict:
        """속도 제한을 적용한 API 호출"""
        queries = state["queries"]
        results = []
        
        for query in queries:
            # 속도 제한 적용
            async with rate_limiter:
                # API 호출
                result = await api_call(query)
                results.append(result)
        
        return {"results": results}
    

    2. 비용 예산 관리

    API 비용을 제한하는 예산 관리 패턴입니다:

    class BudgetManager:
        def __init__(self, daily_budget=10.0):
            self.daily_budget = daily_budget
            self.spent_today = 0.0
            self.last_reset = datetime.now().date()
        
        def reset_if_new_day(self):
            """날짜가 변경되면 예산 초기화"""
            today = datetime.now().date()
            if today > self.last_reset:
                self.spent_today = 0.0
                self.last_reset = today
        
        def check_budget(self, estimated_cost):
            """예산 확인"""
            self.reset_if_new_day()
            return self.spent_today + estimated_cost <= self.daily_budget
        
        def record_spend(self, cost):
            """지출 기록"""
            self.reset_if_new_day()
            self.spent_today += cost
    
    # API 예산 관리자
    budget_manager = BudgetManager(daily_budget=50.0)  # 일일 $50 예산
    
    def budget_aware_llm_node(state: Dict) -> Dict:
        """예산을 고려한 LLM 호출"""
        prompt = state["prompt"]
        model = state.get("model", "gpt-4o")
        
        # 비용 추정
        estimated_cost = estimate_llm_cost(prompt, model)
        
        # 예산 확인
        if not budget_manager.check_budget(estimated_cost):
            # 예산 초과 시 대체 전략
            if state.get("allow_fallback", True):
                # 저비용 모델로 대체
                model = "gpt-3.5-turbo"
                estimated_cost = estimate_llm_cost(prompt, model)
            else:
                # 실행 거부
                return {"error": "Budget exceeded", "result": None}
        
        # LLM 호출
        result = call_llm(prompt, model)
        
        # 실제 비용 기록
        actual_cost = calculate_actual_cost(result, model)
        budget_manager.record_spend(actual_cost)
        
        return {"result": result, "cost": actual_cost}
    

    3. 모델 스왑 및 캐스케이드

    작업 복잡성에 따라 다른 모델을 사용하는 패턴입니다:

    from enum import Enum
    
    class ModelType(Enum):
        LIGHTWEIGHT = "lightweight"
        MEDIUM = "medium"
        HEAVY = "heavy"
    
    class ModelCascadeState(TypedDict):
        query: str
        complexity: float
        selected_model: str
        result: Dict
    
    def evaluate_complexity(state: ModelCascadeState) -> Dict:
        """쿼리 복잡성 평가"""
        query = state["query"]
        # 복잡성 점수 계산 (0.0 ~ 1.0)
        complexity = calculate_complexity(query)
        return {"complexity": complexity}
    
    def select_model(state: ModelCascadeState) -> Dict:
        """복잡성에 따른 모델 선택"""
        complexity = state["complexity"]
        
        if complexity < 0.3:
            model = ModelType.LIGHTWEIGHT.value
        elif complexity < 0.7:
            model = ModelType.MEDIUM.value
        else:
            model = ModelType.HEAVY.value
        
        return {"selected_model": model}
    
    def process_with_model(state: ModelCascadeState) -> Dict:
        """선택된 모델로 처리"""
        model_type = state["selected_model"]
        query = state["query"]
        
        if model_type == ModelType.LIGHTWEIGHT.value:
            result = process_with_lightweight_model(query)
        elif model_type == ModelType.MEDIUM.value:
            result = process_with_medium_model(query)
        else:
            result = process_with_heavy_model(query)
        
        return {"result": result}
    
    # 그래프 구성
    cascade_graph = StateGraph(ModelCascadeState)
    cascade_graph.add_node("evaluate", evaluate_complexity)
    cascade_graph.add_node("select", select_model)
    cascade_graph.add_node("process", process_with_model)
    
    # 엣지 설정
    cascade_graph.add_edge("evaluate", "select")
    cascade_graph.add_edge("select", "process")
    

    성능 모니터링 및 프로파일링

    LangGraph 애플리케이션의 성능을 모니터링하고 프로파일링하여 병목 현상을 식별하고 최적화할 수 있습니다.

    1. 노드 실행 시간 측정

    각 노드의 실행 시간을 측정하여 병목 현상을 식별하는 패턴입니다:

    import time
    from functools import wraps
    
    def time_profile(func):
        """노드 실행 시간 측정 데코레이터"""
        @wraps(func)
        def wrapper(state):
            start_time = time.time()
            result = func(state)
            end_time = time.time()
            
            # 실행 시간 기록
            execution_time = end_time - start_time
            print(f"Node {func.__name__} executed in {execution_time:.4f} seconds")
            
            # 결과에 실행 시간 추가 (선택 사항)
            if isinstance(result, dict):
                timing_key = f"{func.__name__}_execution_time"
                result[timing_key] = execution_time
            
            return result
        
        return wrapper
    
    # 프로파일링 적용
    @time_profile
    def expensive_node(state: Dict) -> Dict:
        # ... 노드 로직 ...
        return result
    

    2. 메모리 사용량 모니터링

    메모리 사용량을 모니터링하여 누수를 방지하는 패턴입니다:

    import tracemalloc
    import gc
    
    def monitor_memory_usage(func):
        """메모리 사용량 모니터링 데코레이터"""
        @wraps(func)
        def wrapper(state):
            # 메모리 추적 시작
            tracemalloc.start()
            
            # 가비지 컬렉션 실행
            gc.collect()
            
            # 시작 메모리 스냅샷
            start_snapshot = tracemalloc.take_snapshot()
            
            # 함수 실행
            result = func(state)
            
            # 종료 메모리 스냅샷
            gc.collect()
            end_snapshot = tracemalloc.take_snapshot()
            
            # 메모리 사용량 변화 계산
            memory_diff = end_snapshot.compare_to(start_snapshot, 'lineno')
            
            # 상위 5개 메모리 사용 위치 출력
            print(f"Memory usage for {func.__name__}:")
            for stat in memory_diff[:5]:
                print(f"{stat}")
            
            # 메모리 추적 중지
            tracemalloc.stop()
            
            return result
        
        return wrapper
    
    # 메모리 모니터링 적용
    @monitor_memory_usage
    def memory_intensive_node(state: Dict) -> Dict:
        # ... 메모리 집약적 작업 ...
        return result
    

    3. 성능 대시보드 통합

    모니터링 도구와 통합하여 실시간 성능 대시보드를 구축하는 패턴입니다:

    import prometheus_client
    from prometheus_client import Counter, Gauge, Histogram
    
    # 메트릭 정의
    NODE_EXECUTIONS = Counter('node_executions_total', 'Total node executions', ['node_name'])
    NODE_EXECUTION_TIME = Histogram('node_execution_time_seconds', 'Node execution time', ['node_name'])
    MEMORY_USAGE = Gauge('memory_usage_bytes', 'Memory usage in bytes')
    ERROR_COUNT = Counter('error_count_total', 'Total errors', ['node_name', 'error_type'])
    
    def prometheus_instrumented(func):
        """Prometheus 측정 데코레이터"""
        @wraps(func)
        def wrapper(state):
            node_name = func.__name__
            
            # 실행 횟수 증가
            NODE_EXECUTIONS.labels(node_name=node_name).inc()
            
            # 현재 메모리 사용량 기록
            MEMORY_USAGE.set(get_process_memory_usage())
            
            # 실행 시간 측정
            start_time = time.time()
            
            try:
                # 함수 실행
                result = func(state)
                
                # 실행 시간 기록
                execution_time = time.time() - start_time
                NODE_EXECUTION_TIME.labels(node_name=node_name).observe(execution_time)
                
                return result
            except Exception as e:
                # 오류 카운터 증가
                ERROR_COUNT.labels(node_name=node_name, error_type=type(e).__name__).inc()
                raise
        
        return wrapper
    
    # HTTP 서버 시작 (메트릭 노출)
    def start_metrics_server(port=8000):
        """Prometheus 메트릭 서버 시작"""
        from prometheus_client import start_http_server
        start_http_server(port)
        print(f"Prometheus metrics available at http://localhost:{port}/metrics")
    
    # 애플리케이션 시작 시 메트릭 서버 시작
    start_metrics_server()
    
    # 계측 적용
    @prometheus_instrumented
    def monitored_node(state: Dict) -> Dict:
        # ... 노드 로직 ...
        return result
    

    이 패턴을 사용하면 Prometheus와 Grafana 같은 도구로 실시간 성능 모니터링 대시보드를 구축할 수 있습니다.

    결론

    이번 포스트에서는 LangGraph 애플리케이션의 성능을 최적화하기 위한 다양한 기법을 살펴보았습니다

    - 배치 처리 최적화, 결과 스트리밍, 자원 사용량 및 비용 관리, 성능 모니터링 및 프로파일링

    이 최적화 기법들을 적절히 조합하여 적용하면 LangGraph 애플리케이션의 성능, 사용자 경험, 비용 효율성을 크게 향상시킬 수 있습니다. 항상 최적화 전후의 성능을 측정하고, 가장 효과적인 기법에 집중하는 것이 중요합니다.

     


     

Designed by Tistory.