ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • #5 LangGraph 심화: 성능 최적화 기법 [1]
    LangChain & LangGraph 2025. 3. 18. 20:00

     

    이전 포스트에서는 LangGraph의 스레드 및 병렬 처리 패턴에 대해 살펴보았습니다. 이번 포스트에서는 LangGraph 애플리케이션의 성능을 더욱 극대화하기 위한 다양한 최적화 기법에 대해 알아보겠습니다. 이번 글은 두 편으로 나누어 진행하며, 이 첫 번째 파트에서는 캐싱 전략과 리소스 관리에 중점을 두겠습니다.

    성능 최적화의 중요성

    LLM 애플리케이션의 성능은 사용자 경험과 운영 비용에 직접적인 영향을 미칩니다:

    1. 응답 시간 개선: 빠른 응답은 사용자 만족도를 높입니다.
    2. 리소스 효율성: 최적화된 애플리케이션은 동일한 하드웨어로 더 많은 작업을 처리할 수 있습니다.
    3. 비용 절감: API 호출 최적화는 LLM API 비용을 줄입니다.
    4. 확장성 향상: 효율적인 애플리케이션은 더 많은 사용자와 데이터를 처리할 수 있습니다.

    캐싱 전략

    반복적인 계산이나 API 호출을 줄이는 캐싱 전략은 LangGraph 애플리케이션의 성능 향상에 큰 도움이 됩니다.

    1. 기본 디스크 캐싱

    반복되는 비용이 많이 드는 계산 결과를 캐싱하는 방법입니다:

    import functools
    from diskcache import Cache
    
    # 디스크 캐시 초기화
    cache = Cache('./cache')
    
    def cached_node(func):
        """결과를 캐싱하는 데코레이터"""
        @functools.wraps(func)
        def wrapper(state):
            # 캐시 키 생성 (관련 상태 값만 포함)
            cache_key = f"{func.__name__}:{hash(frozenset(state.items()))}"
            
            # 캐시에서 결과 조회
            cached_result = cache.get(cache_key)
            if cached_result is not None:
                return cached_result
            
            # 결과 계산 및 캐싱
            result = func(state)
            cache.set(cache_key, result, expire=3600)  # 1시간 만료
            return result
        
        return wrapper
    
    # 캐싱된 노드 적용
    @cached_node
    def expensive_computation(state: Dict) -> Dict:
        """비용이 많이 드는 계산 수행"""
        # ... 복잡한 계산 또는 API 호출 ...
        return result
    

    이 패턴의 주요 장점:

    • 동일 입력에 대한 반복 계산 방지
    • 시간이 많이 걸리는 API 호출 감소
    • 디스크에 저장되므로 애플리케이션 재시작 후에도 캐시 유지

    2. 계층적 캐싱 전략

    여러 수준의 캐시를 사용하여 성능을 최적화할 수 있습니다:

    import redis
    from functools import lru_cache
    import json
    
    # Redis 연결
    redis_client = redis.Redis(host='localhost', port=6379, db=0)
    
    class MultiLevelCache:
        def __init__(self, memory_size=1000, redis_ttl=3600):
            self.redis_ttl = redis_ttl
            self.memory_cache = {}
            
            # 메모리 캐시 데코레이터
            self.mem_cache = lru_cache(maxsize=memory_size)
            
        def get(self, key):
            # 1. 메모리 캐시 확인
            if key in self.memory_cache:
                return self.memory_cache[key]
            
            # 2. Redis 캐시 확인
            redis_result = redis_client.get(key)
            if redis_result:
                # 메모리 캐시에도 저장
                result = json.loads(redis_result)
                self.memory_cache[key] = result
                return result
            
            return None
        
        def set(self, key, value, ttl=None):
            # Redis에 저장
            redis_client.set(key, json.dumps(value), ex=ttl or self.redis_ttl)
            
            # 메모리 캐시에도 저장
            self.memory_cache[key] = value
    
    # 계층적 캐시 사용 예시
    cache = MultiLevelCache()
    
    def cached_llm_call(prompt, model="gpt-4o"):
        """LLM 호출 결과를 계층적으로 캐싱"""
        cache_key = f"llm:{model}:{hash(prompt)}"
        
        # 캐시 확인
        cached_result = cache.get(cache_key)
        if cached_result:
            return cached_result
        
        # 캐시 미스: 실제 API 호출
        result = call_llm_api(prompt, model)
        
        # 결과 캐싱
        cache.set(cache_key, result)
        
        return result
    

    이 패턴은 다음과 같은 이점을 제공합니다:

    • 매우 빠른 액세스를 위한 메모리 캐시 (L1)
    • 더 큰 용량과 지속성을 위한 Redis 캐시 (L2)
    • 메모리와 디스크 간의 최적의 균형 제공

    3. 캐시 무효화 전략

    데이터가 변경될 때 캐시를 효율적으로 무효화하는 방법입니다:

    class SmartCache:
        def __init__(self, base_ttl=3600):
            self.cache = {}
            self.base_ttl = base_ttl
            self.version_keys = {}  # 키별 버전 추적
        
        def get(self, key, dependencies=None):
            """종속성을 고려한 캐시 조회"""
            if key not in self.cache:
                return None
            
            # 캐시 항목과 메타데이터 가져오기
            cached_item = self.cache[key]
            expiry_time = cached_item["expiry"]
            
            # 만료 확인
            if time.time() > expiry_time:
                del self.cache[key]
                return None
            
            # 종속성 버전 확인
            if dependencies:
                for dep in dependencies:
                    current_version = self.version_keys.get(dep, 0)
                    cached_version = cached_item["dep_versions"].get(dep, -1)
                    
                    # 종속성이 변경됨
                    if current_version > cached_version:
                        del self.cache[key]
                        return None
            
            return cached_item["value"]
        
        def set(self, key, value, ttl=None, dependencies=None):
            """종속성을 추적하며 캐시에 저장"""
            # 종속성 버전 기록
            dep_versions = {}
            if dependencies:
                for dep in dependencies:
                    dep_versions[dep] = self.version_keys.get(dep, 0)
            
            # 캐시 설정
            self.cache[key] = {
                "value": value,
                "expiry": time.time() + (ttl or self.base_ttl),
                "dep_versions": dep_versions
            }
        
        def invalidate_dependency(self, dependency):
            """종속성 기반 무효화"""
            # 종속성 버전 증가
            current_version = self.version_keys.get(dependency, 0)
            self.version_keys[dependency] = current_version + 1
    
    # 스마트 캐시 사용 예시
    smart_cache = SmartCache()
    
    def cache_aware_node(state: Dict) -> Dict:
        """종속성 인식 캐싱 노드"""
        query = state["query"]
        user_id = state["user_id"]
        
        # 사용자 데이터에 종속된 캐시 키
        cache_key = f"result:{hash(query)}"
        dependencies = [f"user:{user_id}", "global_data"]
        
        # 캐시 확인
        cached_result = smart_cache.get(cache_key, dependencies)
        if cached_result:
            return cached_result
        
        # 결과 계산
        result = compute_result(query, user_id)
        
        # 결과 캐싱
        smart_cache.set(cache_key, result, dependencies=dependencies)
        
        return result
    
    # 사용자 데이터 업데이트 시 캐시 무효화
    def update_user_data(user_id, new_data):
        # 데이터 업데이트
        save_user_data(user_id, new_data)
        
        # 관련 캐시 무효화
        smart_cache.invalidate_dependency(f"user:{user_id}")
    

    이 패턴은 다음과 같은 상황에서 유용합니다:

    • 데이터가 자주 변경되는 경우
    • 여러 캐시 항목이 공통 데이터에 의존하는 경우
    • 세분화된 캐시 무효화가 필요한 경우

    지연 로딩 및 리소스 관리

    필요한 순간에 리소스를 로드하고 효율적으로 관리하는 기법입니다.

    1. 모델 지연 로딩

    필요할 때만 무거운 모델을 로드하는 패턴입니다:

    from functools import lru_cache
    
    class ResourceManagementState(TypedDict):
        query: str
        model_type: str
        result: Dict
    
    @lru_cache(maxsize=4)  # 최대 4개 모델 캐싱
    def get_model(model_type: str):
        """모델 지연 로딩"""
        if model_type == "gpt4":
            return load_gpt4_model()
        elif model_type == "claude":
            return load_claude_model()
        elif model_type == "gemini":
            return load_gemini_model()
        else:
            return load_default_model()
    
    def model_processing_node(state: ResourceManagementState) -> Dict:
        """필요한 모델만 로드하여 사용"""
        model_type = state["model_type"]
        query = state["query"]
        
        # 필요한 모델만 로드 (캐싱됨)
        model = get_model(model_type)
        
        # 모델 사용
        result = model.process(query)
        
        return {"result": result}
    

    이 패턴의 장점:

    • 필요한 경우에만 무거운 리소스를 로드
    • lru_cache를 통한 자동 메모리 관리
    • 자주 사용하는 모델은 메모리에 유지

    2. 리소스 풀링

    비용이 많이 드는 리소스를 효율적으로 재사용하는 풀링 패턴입니다:

    import threading
    
    class ResourcePool:
        def __init__(self, max_size=5, create_func=None):
            self.pool = []
            self.max_size = max_size
            self.create_func = create_func
            self.in_use = set()
            self.lock = threading.Lock()
        
        def get_resource(self):
            with self.lock:
                # 사용 가능한 리소스 확인
                available = [r for r in self.pool if r not in self.in_use]
                
                if available:
                    # 사용 가능한 리소스 반환
                    resource = available[0]
                    self.in_use.add(resource)
                    return resource
                
                # 새 리소스 생성 (풀 크기 제한 내에서)
                if len(self.pool) < self.max_size:
                    resource = self.create_func()
                    self.pool.append(resource)
                    self.in_use.add(resource)
                    return resource
                
                # 풀이 가득 찬 경우 대기
                return None
        
        def release_resource(self, resource):
            with self.lock:
                if resource in self.in_use:
                    self.in_use.remove(resource)
    
    # 데이터베이스 연결 풀 예시
    db_pool = ResourcePool(
        max_size=10,
        create_func=lambda: create_db_connection()
    )
    
    def database_query_node(state: Dict) -> Dict:
        """풀링된 데이터베이스 연결 사용"""
        query = state["query"]
        
        # 연결 획득
        connection = db_pool.get_resource()
        
        try:
            # 쿼리 실행
            result = execute_query(connection, query)
            return {"result": result}
        finally:
            # 연결 반환
            db_pool.release_resource(connection)
    

    이 패턴은 다음과 같은 리소스에 적합합니다:

    • 데이터베이스 연결
    • 외부 API 클라이언트
    • 파일 핸들러
    • 네트워크 소켓

    3. 자동 리소스 정리

    사용하지 않는 리소스를 자동으로 정리하는 패턴입니다:

    import time
    import threading
    import weakref
    
    class ResourceManager:
        def __init__(self, idle_timeout=300):  # 5분 타임아웃
            self.resources = {}  # {id: (resource, last_used_time)}
            self.idle_timeout = idle_timeout
            self.lock = threading.Lock()
            
            # 백그라운드 정리 스레드 시작
            self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
            self.cleanup_thread.start()
        
        def get_resource(self, resource_id, create_func):
            """리소스 획득 (없으면 생성)"""
            with self.lock:
                if resource_id in self.resources:
                    resource, _ = self.resources[resource_id]
                    # 마지막 사용 시간 업데이트
                    self.resources[resource_id] = (resource, time.time())
                    return resource
                
                # 새 리소스 생성
                resource = create_func()
                self.resources[resource_id] = (resource, time.time())
                
                # 약한 참조로 리소스 래핑하여 finalize 콜백 등록
                weakref.finalize(resource, self._resource_finalizer, resource_id)
                
                return resource
        
        def _resource_finalizer(self, resource_id):
            """리소스가 가비지 컬렉션될 때 호출됨"""
            with self.lock:
                if resource_id in self.resources:
                    resource, _ = self.resources[resource_id]
                    
                    # 리소스 정리 (예: 연결 닫기)
                    if hasattr(resource, 'close'):
                        try:
                            resource.close()
                        except:
                            pass
                    
                    del self.resources[resource_id]
        
        def _cleanup_loop(self):
            """오래된, 사용하지 않는 리소스 정리"""
            while True:
                time.sleep(60)  # 1분마다 확인
                
                with self.lock:
                    current_time = time.time()
                    to_remove = []
                    
                    # 오래된 리소스 찾기
                    for resource_id, (resource, last_used) in self.resources.items():
                        if current_time - last_used > self.idle_timeout:
                            to_remove.append((resource_id, resource))
                    
                    # 오래된 리소스 정리
                    for resource_id, resource in to_remove:
                        if hasattr(resource, 'close'):
                            try:
                                resource.close()
                            except:
                                pass
                        
                        del self.resources[resource_id]
    
    # 리소스 관리자 사용 예시
    resource_manager = ResourceManager()
    
    def managed_resource_node(state: Dict) -> Dict:
        """자동 정리되는 리소스 사용"""
        resource_id = f"client:{state['client_type']}"
        
        # 리소스 획득 (없으면 생성)
        client = resource_manager.get_resource(
            resource_id,
            lambda: create_client(state['client_type'])
        )
        
        # 클라이언트 사용
        result = client.execute(state["command"])
        
        return {"result": result}
    

    이 패턴의 특징:

    • 사용하지 않는 유휴 리소스 자동 정리
    • 약한 참조를 통한 가비지 컬렉션 지원
    • 백그라운드 정리 프로세스로 메모리 누수 방지

    실제 적용 사례: 지식 베이스 시스템

    이러한 캐싱 및 리소스 관리 패턴을 적용한 실제 LangGraph 애플리케이션의 예를 살펴보겠습니다:

    from langgraph.graph import StateGraph
    from typing import TypedDict, List, Dict
    import time
    
    class KnowledgeBaseState(TypedDict):
        query: str
        documents: List[Dict]
        context: List[str]
        embeddings: Dict
        answer: str
    
    # 리소스 관리자
    class EmbeddingManager:
        def __init__(self):
            self.model = None
            self.last_used = 0
        
        def get_model(self):
            current_time = time.time()
            
            # 모델이 없거나 5분 이상 사용하지 않았으면 (재)로드
            if not self.model or (current_time - self.last_used > 300):
                self.model = load_embedding_model()
            
            self.last_used = current_time
            return self.model
    
    # 글로벌 리소스 관리자
    embedding_manager = EmbeddingManager()
    
    # 캐시 설정
    document_cache = Cache('./document_cache')
    embedding_cache = Cache('./embedding_cache')
    search_cache = Cache('./search_cache')
    
    def retrieve_documents(state: KnowledgeBaseState) -> Dict:
        """문서 검색 (캐싱 적용)"""
        query = state["query"]
        
        # 캐시 확인
        cache_key = f"search:{hash(query)}"
        cached_result = search_cache.get(cache_key)
        if cached_result:
            return {"documents": cached_result}
        
        # 새 검색 수행
        documents = search_documents(query)
        
        # 결과 캐싱
        search_cache.set(cache_key, documents, expire=3600)
        
        return {"documents": documents}
    
    def compute_embeddings(state: KnowledgeBaseState) -> Dict:
        """임베딩 계산 (캐싱 및 리소스 관리 적용)"""
        documents = state["documents"]
        query = state["query"]
        
        # 캐시 초기화
        embeddings = {"query": None, "documents": []}
        
        # 쿼리 임베딩 계산 (캐싱)
        query_cache_key = f"query_embedding:{hash(query)}"
        cached_query_embedding = embedding_cache.get(query_cache_key)
        
        if cached_query_embedding:
            embeddings["query"] = cached_query_embedding
        else:
            # 임베딩 모델 가져오기 (지연 로딩)
            model = embedding_manager.get_model()
            
            # 임베딩 계산
            query_embedding = model.encode(query)
            
            # 결과 캐싱
            embedding_cache.set(query_cache_key, query_embedding)
            embeddings["query"] = query_embedding
        
        # 문서 임베딩 계산 (캐싱)
        for doc in documents:
            doc_id = doc["id"]
            doc_content = doc["content"]
            
            doc_cache_key = f"doc_embedding:{doc_id}"
            cached_doc_embedding = embedding_cache.get(doc_cache_key)
            
            if cached_doc_embedding:
                embeddings["documents"].append(cached_doc_embedding)
            else:
                # 임베딩 모델 가져오기 (이미 로드되어 있을 수 있음)
                model = embedding_manager.get_model()
                
                # 임베딩 계산
                doc_embedding = model.encode(doc_content)
                
                # 결과 캐싱
                embedding_cache.set(doc_cache_key, doc_embedding)
                embeddings["documents"].append(doc_embedding)
        
        return {"embeddings": embeddings}
    
    def select_context(state: KnowledgeBaseState) -> Dict:
        """관련 컨텍스트 선택"""
        documents = state["documents"]
        embeddings = state["embeddings"]
        
        query_embedding = embeddings["query"]
        doc_embeddings = embeddings["documents"]
        
        # 유사도 계산 및 상위 문서 선택
        similarities = compute_similarities(query_embedding, doc_embeddings)
        ranked_docs = [(documents[i], similarities[i]) for i in range(len(documents))]
        ranked_docs.sort(key=lambda x: x[1], reverse=True)
        
        # 상위 3개 문서만 선택
        selected_docs = ranked_docs[:3]
        
        # 컨텍스트로 변환
        context = [doc["content"] for doc, _ in selected_docs]
        
        return {"context": context}
    
    def generate_answer(state: KnowledgeBaseState) -> Dict:
        """답변 생성"""
        query = state["query"]
        context = state["context"]
        
        # 프롬프트 구성
        prompt = f"""
        Answer the question based on the following context:
        
        Context:
        {' '.join(context)}
        
        Question: {query}
        
        Answer:
        """
        
        # LLM 호출 (여기서도 캐싱 적용 가능)
        answer = call_llm(prompt)
        
        return {"answer": answer}
    
    # 그래프 구성
    kb_graph = StateGraph(KnowledgeBaseState)
    kb_graph.add_node("retrieve", retrieve_documents)
    kb_graph.add_node("embed", compute_embeddings)
    kb_graph.add_node("select", select_context)
    kb_graph.add_node("generate", generate_answer)
    
    # 엣지 설정
    kb_graph.add_edge("retrieve", "embed")
    kb_graph.add_edge("embed", "select")
    kb_graph.add_edge("select", "generate")
    

    이 지식 베이스 시스템은 다양한 성능 최적화 기법을 적용했습니다:

    1. 다중 캐싱:
      • 문서 검색 결과 캐싱
      • 임베딩 계산 결과 캐싱
      • (옵션) LLM 호출 결과 캐싱
    2. 리소스 관리:
      • 임베딩 모델 지연 로딩
      • 유휴 시간에 따른 자동 모델 관리
    3. 계산 최적화:
      • 필요한 임베딩만 계산
      • 유사도 기반 관련 문서 필터링

    이러한 최적화를 통해 같은 쿼리나 유사한 쿼리에 대한 응답 시간을 크게 줄이고, 메모리 사용량과 API 비용을 최소화할 수 있습니다.

    결론

    이번 포스트에서는 LangGraph 애플리케이션의 성능을 최적화하기 위한 캐싱 전략과 리소스 관리 기법에 대해 살펴보았습니다. 이러한 패턴을 적용하면 응답 시간 단축, 리소스 효율성 향상, API 비용 절감 등 다양한 이점을 얻을 수 있습니다.

    다음 포스트에서는 더 많은 성능 최적화 기법인 배치 처리, 결과 스트리밍, 비용 관리, 그리고 성능 모니터링 및 프로파일링에 대해 알아보겠습니다.


     

Designed by Tistory.