LangChain & LangGraph

#5 LangGraph 심화: 성능 최적화 기법 [1]

ch4u 2025. 3. 18. 20:00

 

이전 포스트에서는 LangGraph의 스레드 및 병렬 처리 패턴에 대해 살펴보았습니다. 이번 포스트에서는 LangGraph 애플리케이션의 성능을 더욱 극대화하기 위한 다양한 최적화 기법에 대해 알아보겠습니다. 이번 글은 두 편으로 나누어 진행하며, 이 첫 번째 파트에서는 캐싱 전략과 리소스 관리에 중점을 두겠습니다.

성능 최적화의 중요성

LLM 애플리케이션의 성능은 사용자 경험과 운영 비용에 직접적인 영향을 미칩니다:

  1. 응답 시간 개선: 빠른 응답은 사용자 만족도를 높입니다.
  2. 리소스 효율성: 최적화된 애플리케이션은 동일한 하드웨어로 더 많은 작업을 처리할 수 있습니다.
  3. 비용 절감: API 호출 최적화는 LLM API 비용을 줄입니다.
  4. 확장성 향상: 효율적인 애플리케이션은 더 많은 사용자와 데이터를 처리할 수 있습니다.

캐싱 전략

반복적인 계산이나 API 호출을 줄이는 캐싱 전략은 LangGraph 애플리케이션의 성능 향상에 큰 도움이 됩니다.

1. 기본 디스크 캐싱

반복되는 비용이 많이 드는 계산 결과를 캐싱하는 방법입니다:

import functools
from diskcache import Cache

# 디스크 캐시 초기화
cache = Cache('./cache')

def cached_node(func):
    """결과를 캐싱하는 데코레이터"""
    @functools.wraps(func)
    def wrapper(state):
        # 캐시 키 생성 (관련 상태 값만 포함)
        cache_key = f"{func.__name__}:{hash(frozenset(state.items()))}"
        
        # 캐시에서 결과 조회
        cached_result = cache.get(cache_key)
        if cached_result is not None:
            return cached_result
        
        # 결과 계산 및 캐싱
        result = func(state)
        cache.set(cache_key, result, expire=3600)  # 1시간 만료
        return result
    
    return wrapper

# 캐싱된 노드 적용
@cached_node
def expensive_computation(state: Dict) -> Dict:
    """비용이 많이 드는 계산 수행"""
    # ... 복잡한 계산 또는 API 호출 ...
    return result

이 패턴의 주요 장점:

  • 동일 입력에 대한 반복 계산 방지
  • 시간이 많이 걸리는 API 호출 감소
  • 디스크에 저장되므로 애플리케이션 재시작 후에도 캐시 유지

2. 계층적 캐싱 전략

여러 수준의 캐시를 사용하여 성능을 최적화할 수 있습니다:

import redis
from functools import lru_cache
import json

# Redis 연결
redis_client = redis.Redis(host='localhost', port=6379, db=0)

class MultiLevelCache:
    def __init__(self, memory_size=1000, redis_ttl=3600):
        self.redis_ttl = redis_ttl
        self.memory_cache = {}
        
        # 메모리 캐시 데코레이터
        self.mem_cache = lru_cache(maxsize=memory_size)
        
    def get(self, key):
        # 1. 메모리 캐시 확인
        if key in self.memory_cache:
            return self.memory_cache[key]
        
        # 2. Redis 캐시 확인
        redis_result = redis_client.get(key)
        if redis_result:
            # 메모리 캐시에도 저장
            result = json.loads(redis_result)
            self.memory_cache[key] = result
            return result
        
        return None
    
    def set(self, key, value, ttl=None):
        # Redis에 저장
        redis_client.set(key, json.dumps(value), ex=ttl or self.redis_ttl)
        
        # 메모리 캐시에도 저장
        self.memory_cache[key] = value

# 계층적 캐시 사용 예시
cache = MultiLevelCache()

def cached_llm_call(prompt, model="gpt-4o"):
    """LLM 호출 결과를 계층적으로 캐싱"""
    cache_key = f"llm:{model}:{hash(prompt)}"
    
    # 캐시 확인
    cached_result = cache.get(cache_key)
    if cached_result:
        return cached_result
    
    # 캐시 미스: 실제 API 호출
    result = call_llm_api(prompt, model)
    
    # 결과 캐싱
    cache.set(cache_key, result)
    
    return result

이 패턴은 다음과 같은 이점을 제공합니다:

  • 매우 빠른 액세스를 위한 메모리 캐시 (L1)
  • 더 큰 용량과 지속성을 위한 Redis 캐시 (L2)
  • 메모리와 디스크 간의 최적의 균형 제공

3. 캐시 무효화 전략

데이터가 변경될 때 캐시를 효율적으로 무효화하는 방법입니다:

class SmartCache:
    def __init__(self, base_ttl=3600):
        self.cache = {}
        self.base_ttl = base_ttl
        self.version_keys = {}  # 키별 버전 추적
    
    def get(self, key, dependencies=None):
        """종속성을 고려한 캐시 조회"""
        if key not in self.cache:
            return None
        
        # 캐시 항목과 메타데이터 가져오기
        cached_item = self.cache[key]
        expiry_time = cached_item["expiry"]
        
        # 만료 확인
        if time.time() > expiry_time:
            del self.cache[key]
            return None
        
        # 종속성 버전 확인
        if dependencies:
            for dep in dependencies:
                current_version = self.version_keys.get(dep, 0)
                cached_version = cached_item["dep_versions"].get(dep, -1)
                
                # 종속성이 변경됨
                if current_version > cached_version:
                    del self.cache[key]
                    return None
        
        return cached_item["value"]
    
    def set(self, key, value, ttl=None, dependencies=None):
        """종속성을 추적하며 캐시에 저장"""
        # 종속성 버전 기록
        dep_versions = {}
        if dependencies:
            for dep in dependencies:
                dep_versions[dep] = self.version_keys.get(dep, 0)
        
        # 캐시 설정
        self.cache[key] = {
            "value": value,
            "expiry": time.time() + (ttl or self.base_ttl),
            "dep_versions": dep_versions
        }
    
    def invalidate_dependency(self, dependency):
        """종속성 기반 무효화"""
        # 종속성 버전 증가
        current_version = self.version_keys.get(dependency, 0)
        self.version_keys[dependency] = current_version + 1

# 스마트 캐시 사용 예시
smart_cache = SmartCache()

def cache_aware_node(state: Dict) -> Dict:
    """종속성 인식 캐싱 노드"""
    query = state["query"]
    user_id = state["user_id"]
    
    # 사용자 데이터에 종속된 캐시 키
    cache_key = f"result:{hash(query)}"
    dependencies = [f"user:{user_id}", "global_data"]
    
    # 캐시 확인
    cached_result = smart_cache.get(cache_key, dependencies)
    if cached_result:
        return cached_result
    
    # 결과 계산
    result = compute_result(query, user_id)
    
    # 결과 캐싱
    smart_cache.set(cache_key, result, dependencies=dependencies)
    
    return result

# 사용자 데이터 업데이트 시 캐시 무효화
def update_user_data(user_id, new_data):
    # 데이터 업데이트
    save_user_data(user_id, new_data)
    
    # 관련 캐시 무효화
    smart_cache.invalidate_dependency(f"user:{user_id}")

이 패턴은 다음과 같은 상황에서 유용합니다:

  • 데이터가 자주 변경되는 경우
  • 여러 캐시 항목이 공통 데이터에 의존하는 경우
  • 세분화된 캐시 무효화가 필요한 경우

지연 로딩 및 리소스 관리

필요한 순간에 리소스를 로드하고 효율적으로 관리하는 기법입니다.

1. 모델 지연 로딩

필요할 때만 무거운 모델을 로드하는 패턴입니다:

from functools import lru_cache

class ResourceManagementState(TypedDict):
    query: str
    model_type: str
    result: Dict

@lru_cache(maxsize=4)  # 최대 4개 모델 캐싱
def get_model(model_type: str):
    """모델 지연 로딩"""
    if model_type == "gpt4":
        return load_gpt4_model()
    elif model_type == "claude":
        return load_claude_model()
    elif model_type == "gemini":
        return load_gemini_model()
    else:
        return load_default_model()

def model_processing_node(state: ResourceManagementState) -> Dict:
    """필요한 모델만 로드하여 사용"""
    model_type = state["model_type"]
    query = state["query"]
    
    # 필요한 모델만 로드 (캐싱됨)
    model = get_model(model_type)
    
    # 모델 사용
    result = model.process(query)
    
    return {"result": result}

이 패턴의 장점:

  • 필요한 경우에만 무거운 리소스를 로드
  • lru_cache를 통한 자동 메모리 관리
  • 자주 사용하는 모델은 메모리에 유지

2. 리소스 풀링

비용이 많이 드는 리소스를 효율적으로 재사용하는 풀링 패턴입니다:

import threading

class ResourcePool:
    def __init__(self, max_size=5, create_func=None):
        self.pool = []
        self.max_size = max_size
        self.create_func = create_func
        self.in_use = set()
        self.lock = threading.Lock()
    
    def get_resource(self):
        with self.lock:
            # 사용 가능한 리소스 확인
            available = [r for r in self.pool if r not in self.in_use]
            
            if available:
                # 사용 가능한 리소스 반환
                resource = available[0]
                self.in_use.add(resource)
                return resource
            
            # 새 리소스 생성 (풀 크기 제한 내에서)
            if len(self.pool) < self.max_size:
                resource = self.create_func()
                self.pool.append(resource)
                self.in_use.add(resource)
                return resource
            
            # 풀이 가득 찬 경우 대기
            return None
    
    def release_resource(self, resource):
        with self.lock:
            if resource in self.in_use:
                self.in_use.remove(resource)

# 데이터베이스 연결 풀 예시
db_pool = ResourcePool(
    max_size=10,
    create_func=lambda: create_db_connection()
)

def database_query_node(state: Dict) -> Dict:
    """풀링된 데이터베이스 연결 사용"""
    query = state["query"]
    
    # 연결 획득
    connection = db_pool.get_resource()
    
    try:
        # 쿼리 실행
        result = execute_query(connection, query)
        return {"result": result}
    finally:
        # 연결 반환
        db_pool.release_resource(connection)

이 패턴은 다음과 같은 리소스에 적합합니다:

  • 데이터베이스 연결
  • 외부 API 클라이언트
  • 파일 핸들러
  • 네트워크 소켓

3. 자동 리소스 정리

사용하지 않는 리소스를 자동으로 정리하는 패턴입니다:

import time
import threading
import weakref

class ResourceManager:
    def __init__(self, idle_timeout=300):  # 5분 타임아웃
        self.resources = {}  # {id: (resource, last_used_time)}
        self.idle_timeout = idle_timeout
        self.lock = threading.Lock()
        
        # 백그라운드 정리 스레드 시작
        self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
        self.cleanup_thread.start()
    
    def get_resource(self, resource_id, create_func):
        """리소스 획득 (없으면 생성)"""
        with self.lock:
            if resource_id in self.resources:
                resource, _ = self.resources[resource_id]
                # 마지막 사용 시간 업데이트
                self.resources[resource_id] = (resource, time.time())
                return resource
            
            # 새 리소스 생성
            resource = create_func()
            self.resources[resource_id] = (resource, time.time())
            
            # 약한 참조로 리소스 래핑하여 finalize 콜백 등록
            weakref.finalize(resource, self._resource_finalizer, resource_id)
            
            return resource
    
    def _resource_finalizer(self, resource_id):
        """리소스가 가비지 컬렉션될 때 호출됨"""
        with self.lock:
            if resource_id in self.resources:
                resource, _ = self.resources[resource_id]
                
                # 리소스 정리 (예: 연결 닫기)
                if hasattr(resource, 'close'):
                    try:
                        resource.close()
                    except:
                        pass
                
                del self.resources[resource_id]
    
    def _cleanup_loop(self):
        """오래된, 사용하지 않는 리소스 정리"""
        while True:
            time.sleep(60)  # 1분마다 확인
            
            with self.lock:
                current_time = time.time()
                to_remove = []
                
                # 오래된 리소스 찾기
                for resource_id, (resource, last_used) in self.resources.items():
                    if current_time - last_used > self.idle_timeout:
                        to_remove.append((resource_id, resource))
                
                # 오래된 리소스 정리
                for resource_id, resource in to_remove:
                    if hasattr(resource, 'close'):
                        try:
                            resource.close()
                        except:
                            pass
                    
                    del self.resources[resource_id]

# 리소스 관리자 사용 예시
resource_manager = ResourceManager()

def managed_resource_node(state: Dict) -> Dict:
    """자동 정리되는 리소스 사용"""
    resource_id = f"client:{state['client_type']}"
    
    # 리소스 획득 (없으면 생성)
    client = resource_manager.get_resource(
        resource_id,
        lambda: create_client(state['client_type'])
    )
    
    # 클라이언트 사용
    result = client.execute(state["command"])
    
    return {"result": result}

이 패턴의 특징:

  • 사용하지 않는 유휴 리소스 자동 정리
  • 약한 참조를 통한 가비지 컬렉션 지원
  • 백그라운드 정리 프로세스로 메모리 누수 방지

실제 적용 사례: 지식 베이스 시스템

이러한 캐싱 및 리소스 관리 패턴을 적용한 실제 LangGraph 애플리케이션의 예를 살펴보겠습니다:

from langgraph.graph import StateGraph
from typing import TypedDict, List, Dict
import time

class KnowledgeBaseState(TypedDict):
    query: str
    documents: List[Dict]
    context: List[str]
    embeddings: Dict
    answer: str

# 리소스 관리자
class EmbeddingManager:
    def __init__(self):
        self.model = None
        self.last_used = 0
    
    def get_model(self):
        current_time = time.time()
        
        # 모델이 없거나 5분 이상 사용하지 않았으면 (재)로드
        if not self.model or (current_time - self.last_used > 300):
            self.model = load_embedding_model()
        
        self.last_used = current_time
        return self.model

# 글로벌 리소스 관리자
embedding_manager = EmbeddingManager()

# 캐시 설정
document_cache = Cache('./document_cache')
embedding_cache = Cache('./embedding_cache')
search_cache = Cache('./search_cache')

def retrieve_documents(state: KnowledgeBaseState) -> Dict:
    """문서 검색 (캐싱 적용)"""
    query = state["query"]
    
    # 캐시 확인
    cache_key = f"search:{hash(query)}"
    cached_result = search_cache.get(cache_key)
    if cached_result:
        return {"documents": cached_result}
    
    # 새 검색 수행
    documents = search_documents(query)
    
    # 결과 캐싱
    search_cache.set(cache_key, documents, expire=3600)
    
    return {"documents": documents}

def compute_embeddings(state: KnowledgeBaseState) -> Dict:
    """임베딩 계산 (캐싱 및 리소스 관리 적용)"""
    documents = state["documents"]
    query = state["query"]
    
    # 캐시 초기화
    embeddings = {"query": None, "documents": []}
    
    # 쿼리 임베딩 계산 (캐싱)
    query_cache_key = f"query_embedding:{hash(query)}"
    cached_query_embedding = embedding_cache.get(query_cache_key)
    
    if cached_query_embedding:
        embeddings["query"] = cached_query_embedding
    else:
        # 임베딩 모델 가져오기 (지연 로딩)
        model = embedding_manager.get_model()
        
        # 임베딩 계산
        query_embedding = model.encode(query)
        
        # 결과 캐싱
        embedding_cache.set(query_cache_key, query_embedding)
        embeddings["query"] = query_embedding
    
    # 문서 임베딩 계산 (캐싱)
    for doc in documents:
        doc_id = doc["id"]
        doc_content = doc["content"]
        
        doc_cache_key = f"doc_embedding:{doc_id}"
        cached_doc_embedding = embedding_cache.get(doc_cache_key)
        
        if cached_doc_embedding:
            embeddings["documents"].append(cached_doc_embedding)
        else:
            # 임베딩 모델 가져오기 (이미 로드되어 있을 수 있음)
            model = embedding_manager.get_model()
            
            # 임베딩 계산
            doc_embedding = model.encode(doc_content)
            
            # 결과 캐싱
            embedding_cache.set(doc_cache_key, doc_embedding)
            embeddings["documents"].append(doc_embedding)
    
    return {"embeddings": embeddings}

def select_context(state: KnowledgeBaseState) -> Dict:
    """관련 컨텍스트 선택"""
    documents = state["documents"]
    embeddings = state["embeddings"]
    
    query_embedding = embeddings["query"]
    doc_embeddings = embeddings["documents"]
    
    # 유사도 계산 및 상위 문서 선택
    similarities = compute_similarities(query_embedding, doc_embeddings)
    ranked_docs = [(documents[i], similarities[i]) for i in range(len(documents))]
    ranked_docs.sort(key=lambda x: x[1], reverse=True)
    
    # 상위 3개 문서만 선택
    selected_docs = ranked_docs[:3]
    
    # 컨텍스트로 변환
    context = [doc["content"] for doc, _ in selected_docs]
    
    return {"context": context}

def generate_answer(state: KnowledgeBaseState) -> Dict:
    """답변 생성"""
    query = state["query"]
    context = state["context"]
    
    # 프롬프트 구성
    prompt = f"""
    Answer the question based on the following context:
    
    Context:
    {' '.join(context)}
    
    Question: {query}
    
    Answer:
    """
    
    # LLM 호출 (여기서도 캐싱 적용 가능)
    answer = call_llm(prompt)
    
    return {"answer": answer}

# 그래프 구성
kb_graph = StateGraph(KnowledgeBaseState)
kb_graph.add_node("retrieve", retrieve_documents)
kb_graph.add_node("embed", compute_embeddings)
kb_graph.add_node("select", select_context)
kb_graph.add_node("generate", generate_answer)

# 엣지 설정
kb_graph.add_edge("retrieve", "embed")
kb_graph.add_edge("embed", "select")
kb_graph.add_edge("select", "generate")

이 지식 베이스 시스템은 다양한 성능 최적화 기법을 적용했습니다:

  1. 다중 캐싱:
    • 문서 검색 결과 캐싱
    • 임베딩 계산 결과 캐싱
    • (옵션) LLM 호출 결과 캐싱
  2. 리소스 관리:
    • 임베딩 모델 지연 로딩
    • 유휴 시간에 따른 자동 모델 관리
  3. 계산 최적화:
    • 필요한 임베딩만 계산
    • 유사도 기반 관련 문서 필터링

이러한 최적화를 통해 같은 쿼리나 유사한 쿼리에 대한 응답 시간을 크게 줄이고, 메모리 사용량과 API 비용을 최소화할 수 있습니다.

결론

이번 포스트에서는 LangGraph 애플리케이션의 성능을 최적화하기 위한 캐싱 전략과 리소스 관리 기법에 대해 살펴보았습니다. 이러한 패턴을 적용하면 응답 시간 단축, 리소스 효율성 향상, API 비용 절감 등 다양한 이점을 얻을 수 있습니다.

다음 포스트에서는 더 많은 성능 최적화 기법인 배치 처리, 결과 스트리밍, 비용 관리, 그리고 성능 모니터링 및 프로파일링에 대해 알아보겠습니다.