-
#5 LangGraph 심화: 성능 최적화 기법 [1]LangChain & LangGraph 2025. 3. 18. 20:00
이전 포스트에서는 LangGraph의 스레드 및 병렬 처리 패턴에 대해 살펴보았습니다. 이번 포스트에서는 LangGraph 애플리케이션의 성능을 더욱 극대화하기 위한 다양한 최적화 기법에 대해 알아보겠습니다. 이번 글은 두 편으로 나누어 진행하며, 이 첫 번째 파트에서는 캐싱 전략과 리소스 관리에 중점을 두겠습니다.
성능 최적화의 중요성
LLM 애플리케이션의 성능은 사용자 경험과 운영 비용에 직접적인 영향을 미칩니다:
- 응답 시간 개선: 빠른 응답은 사용자 만족도를 높입니다.
- 리소스 효율성: 최적화된 애플리케이션은 동일한 하드웨어로 더 많은 작업을 처리할 수 있습니다.
- 비용 절감: API 호출 최적화는 LLM API 비용을 줄입니다.
- 확장성 향상: 효율적인 애플리케이션은 더 많은 사용자와 데이터를 처리할 수 있습니다.
캐싱 전략
반복적인 계산이나 API 호출을 줄이는 캐싱 전략은 LangGraph 애플리케이션의 성능 향상에 큰 도움이 됩니다.
1. 기본 디스크 캐싱
반복되는 비용이 많이 드는 계산 결과를 캐싱하는 방법입니다:
import functools from diskcache import Cache # 디스크 캐시 초기화 cache = Cache('./cache') def cached_node(func): """결과를 캐싱하는 데코레이터""" @functools.wraps(func) def wrapper(state): # 캐시 키 생성 (관련 상태 값만 포함) cache_key = f"{func.__name__}:{hash(frozenset(state.items()))}" # 캐시에서 결과 조회 cached_result = cache.get(cache_key) if cached_result is not None: return cached_result # 결과 계산 및 캐싱 result = func(state) cache.set(cache_key, result, expire=3600) # 1시간 만료 return result return wrapper # 캐싱된 노드 적용 @cached_node def expensive_computation(state: Dict) -> Dict: """비용이 많이 드는 계산 수행""" # ... 복잡한 계산 또는 API 호출 ... return result
이 패턴의 주요 장점:
- 동일 입력에 대한 반복 계산 방지
- 시간이 많이 걸리는 API 호출 감소
- 디스크에 저장되므로 애플리케이션 재시작 후에도 캐시 유지
2. 계층적 캐싱 전략
여러 수준의 캐시를 사용하여 성능을 최적화할 수 있습니다:
import redis from functools import lru_cache import json # Redis 연결 redis_client = redis.Redis(host='localhost', port=6379, db=0) class MultiLevelCache: def __init__(self, memory_size=1000, redis_ttl=3600): self.redis_ttl = redis_ttl self.memory_cache = {} # 메모리 캐시 데코레이터 self.mem_cache = lru_cache(maxsize=memory_size) def get(self, key): # 1. 메모리 캐시 확인 if key in self.memory_cache: return self.memory_cache[key] # 2. Redis 캐시 확인 redis_result = redis_client.get(key) if redis_result: # 메모리 캐시에도 저장 result = json.loads(redis_result) self.memory_cache[key] = result return result return None def set(self, key, value, ttl=None): # Redis에 저장 redis_client.set(key, json.dumps(value), ex=ttl or self.redis_ttl) # 메모리 캐시에도 저장 self.memory_cache[key] = value # 계층적 캐시 사용 예시 cache = MultiLevelCache() def cached_llm_call(prompt, model="gpt-4o"): """LLM 호출 결과를 계층적으로 캐싱""" cache_key = f"llm:{model}:{hash(prompt)}" # 캐시 확인 cached_result = cache.get(cache_key) if cached_result: return cached_result # 캐시 미스: 실제 API 호출 result = call_llm_api(prompt, model) # 결과 캐싱 cache.set(cache_key, result) return result
이 패턴은 다음과 같은 이점을 제공합니다:
- 매우 빠른 액세스를 위한 메모리 캐시 (L1)
- 더 큰 용량과 지속성을 위한 Redis 캐시 (L2)
- 메모리와 디스크 간의 최적의 균형 제공
3. 캐시 무효화 전략
데이터가 변경될 때 캐시를 효율적으로 무효화하는 방법입니다:
class SmartCache: def __init__(self, base_ttl=3600): self.cache = {} self.base_ttl = base_ttl self.version_keys = {} # 키별 버전 추적 def get(self, key, dependencies=None): """종속성을 고려한 캐시 조회""" if key not in self.cache: return None # 캐시 항목과 메타데이터 가져오기 cached_item = self.cache[key] expiry_time = cached_item["expiry"] # 만료 확인 if time.time() > expiry_time: del self.cache[key] return None # 종속성 버전 확인 if dependencies: for dep in dependencies: current_version = self.version_keys.get(dep, 0) cached_version = cached_item["dep_versions"].get(dep, -1) # 종속성이 변경됨 if current_version > cached_version: del self.cache[key] return None return cached_item["value"] def set(self, key, value, ttl=None, dependencies=None): """종속성을 추적하며 캐시에 저장""" # 종속성 버전 기록 dep_versions = {} if dependencies: for dep in dependencies: dep_versions[dep] = self.version_keys.get(dep, 0) # 캐시 설정 self.cache[key] = { "value": value, "expiry": time.time() + (ttl or self.base_ttl), "dep_versions": dep_versions } def invalidate_dependency(self, dependency): """종속성 기반 무효화""" # 종속성 버전 증가 current_version = self.version_keys.get(dependency, 0) self.version_keys[dependency] = current_version + 1 # 스마트 캐시 사용 예시 smart_cache = SmartCache() def cache_aware_node(state: Dict) -> Dict: """종속성 인식 캐싱 노드""" query = state["query"] user_id = state["user_id"] # 사용자 데이터에 종속된 캐시 키 cache_key = f"result:{hash(query)}" dependencies = [f"user:{user_id}", "global_data"] # 캐시 확인 cached_result = smart_cache.get(cache_key, dependencies) if cached_result: return cached_result # 결과 계산 result = compute_result(query, user_id) # 결과 캐싱 smart_cache.set(cache_key, result, dependencies=dependencies) return result # 사용자 데이터 업데이트 시 캐시 무효화 def update_user_data(user_id, new_data): # 데이터 업데이트 save_user_data(user_id, new_data) # 관련 캐시 무효화 smart_cache.invalidate_dependency(f"user:{user_id}")
이 패턴은 다음과 같은 상황에서 유용합니다:
- 데이터가 자주 변경되는 경우
- 여러 캐시 항목이 공통 데이터에 의존하는 경우
- 세분화된 캐시 무효화가 필요한 경우
지연 로딩 및 리소스 관리
필요한 순간에 리소스를 로드하고 효율적으로 관리하는 기법입니다.
1. 모델 지연 로딩
필요할 때만 무거운 모델을 로드하는 패턴입니다:
from functools import lru_cache class ResourceManagementState(TypedDict): query: str model_type: str result: Dict @lru_cache(maxsize=4) # 최대 4개 모델 캐싱 def get_model(model_type: str): """모델 지연 로딩""" if model_type == "gpt4": return load_gpt4_model() elif model_type == "claude": return load_claude_model() elif model_type == "gemini": return load_gemini_model() else: return load_default_model() def model_processing_node(state: ResourceManagementState) -> Dict: """필요한 모델만 로드하여 사용""" model_type = state["model_type"] query = state["query"] # 필요한 모델만 로드 (캐싱됨) model = get_model(model_type) # 모델 사용 result = model.process(query) return {"result": result}
이 패턴의 장점:
- 필요한 경우에만 무거운 리소스를 로드
- lru_cache를 통한 자동 메모리 관리
- 자주 사용하는 모델은 메모리에 유지
2. 리소스 풀링
비용이 많이 드는 리소스를 효율적으로 재사용하는 풀링 패턴입니다:
import threading class ResourcePool: def __init__(self, max_size=5, create_func=None): self.pool = [] self.max_size = max_size self.create_func = create_func self.in_use = set() self.lock = threading.Lock() def get_resource(self): with self.lock: # 사용 가능한 리소스 확인 available = [r for r in self.pool if r not in self.in_use] if available: # 사용 가능한 리소스 반환 resource = available[0] self.in_use.add(resource) return resource # 새 리소스 생성 (풀 크기 제한 내에서) if len(self.pool) < self.max_size: resource = self.create_func() self.pool.append(resource) self.in_use.add(resource) return resource # 풀이 가득 찬 경우 대기 return None def release_resource(self, resource): with self.lock: if resource in self.in_use: self.in_use.remove(resource) # 데이터베이스 연결 풀 예시 db_pool = ResourcePool( max_size=10, create_func=lambda: create_db_connection() ) def database_query_node(state: Dict) -> Dict: """풀링된 데이터베이스 연결 사용""" query = state["query"] # 연결 획득 connection = db_pool.get_resource() try: # 쿼리 실행 result = execute_query(connection, query) return {"result": result} finally: # 연결 반환 db_pool.release_resource(connection)
이 패턴은 다음과 같은 리소스에 적합합니다:
- 데이터베이스 연결
- 외부 API 클라이언트
- 파일 핸들러
- 네트워크 소켓
3. 자동 리소스 정리
사용하지 않는 리소스를 자동으로 정리하는 패턴입니다:
import time import threading import weakref class ResourceManager: def __init__(self, idle_timeout=300): # 5분 타임아웃 self.resources = {} # {id: (resource, last_used_time)} self.idle_timeout = idle_timeout self.lock = threading.Lock() # 백그라운드 정리 스레드 시작 self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True) self.cleanup_thread.start() def get_resource(self, resource_id, create_func): """리소스 획득 (없으면 생성)""" with self.lock: if resource_id in self.resources: resource, _ = self.resources[resource_id] # 마지막 사용 시간 업데이트 self.resources[resource_id] = (resource, time.time()) return resource # 새 리소스 생성 resource = create_func() self.resources[resource_id] = (resource, time.time()) # 약한 참조로 리소스 래핑하여 finalize 콜백 등록 weakref.finalize(resource, self._resource_finalizer, resource_id) return resource def _resource_finalizer(self, resource_id): """리소스가 가비지 컬렉션될 때 호출됨""" with self.lock: if resource_id in self.resources: resource, _ = self.resources[resource_id] # 리소스 정리 (예: 연결 닫기) if hasattr(resource, 'close'): try: resource.close() except: pass del self.resources[resource_id] def _cleanup_loop(self): """오래된, 사용하지 않는 리소스 정리""" while True: time.sleep(60) # 1분마다 확인 with self.lock: current_time = time.time() to_remove = [] # 오래된 리소스 찾기 for resource_id, (resource, last_used) in self.resources.items(): if current_time - last_used > self.idle_timeout: to_remove.append((resource_id, resource)) # 오래된 리소스 정리 for resource_id, resource in to_remove: if hasattr(resource, 'close'): try: resource.close() except: pass del self.resources[resource_id] # 리소스 관리자 사용 예시 resource_manager = ResourceManager() def managed_resource_node(state: Dict) -> Dict: """자동 정리되는 리소스 사용""" resource_id = f"client:{state['client_type']}" # 리소스 획득 (없으면 생성) client = resource_manager.get_resource( resource_id, lambda: create_client(state['client_type']) ) # 클라이언트 사용 result = client.execute(state["command"]) return {"result": result}
이 패턴의 특징:
- 사용하지 않는 유휴 리소스 자동 정리
- 약한 참조를 통한 가비지 컬렉션 지원
- 백그라운드 정리 프로세스로 메모리 누수 방지
실제 적용 사례: 지식 베이스 시스템
이러한 캐싱 및 리소스 관리 패턴을 적용한 실제 LangGraph 애플리케이션의 예를 살펴보겠습니다:
from langgraph.graph import StateGraph from typing import TypedDict, List, Dict import time class KnowledgeBaseState(TypedDict): query: str documents: List[Dict] context: List[str] embeddings: Dict answer: str # 리소스 관리자 class EmbeddingManager: def __init__(self): self.model = None self.last_used = 0 def get_model(self): current_time = time.time() # 모델이 없거나 5분 이상 사용하지 않았으면 (재)로드 if not self.model or (current_time - self.last_used > 300): self.model = load_embedding_model() self.last_used = current_time return self.model # 글로벌 리소스 관리자 embedding_manager = EmbeddingManager() # 캐시 설정 document_cache = Cache('./document_cache') embedding_cache = Cache('./embedding_cache') search_cache = Cache('./search_cache') def retrieve_documents(state: KnowledgeBaseState) -> Dict: """문서 검색 (캐싱 적용)""" query = state["query"] # 캐시 확인 cache_key = f"search:{hash(query)}" cached_result = search_cache.get(cache_key) if cached_result: return {"documents": cached_result} # 새 검색 수행 documents = search_documents(query) # 결과 캐싱 search_cache.set(cache_key, documents, expire=3600) return {"documents": documents} def compute_embeddings(state: KnowledgeBaseState) -> Dict: """임베딩 계산 (캐싱 및 리소스 관리 적용)""" documents = state["documents"] query = state["query"] # 캐시 초기화 embeddings = {"query": None, "documents": []} # 쿼리 임베딩 계산 (캐싱) query_cache_key = f"query_embedding:{hash(query)}" cached_query_embedding = embedding_cache.get(query_cache_key) if cached_query_embedding: embeddings["query"] = cached_query_embedding else: # 임베딩 모델 가져오기 (지연 로딩) model = embedding_manager.get_model() # 임베딩 계산 query_embedding = model.encode(query) # 결과 캐싱 embedding_cache.set(query_cache_key, query_embedding) embeddings["query"] = query_embedding # 문서 임베딩 계산 (캐싱) for doc in documents: doc_id = doc["id"] doc_content = doc["content"] doc_cache_key = f"doc_embedding:{doc_id}" cached_doc_embedding = embedding_cache.get(doc_cache_key) if cached_doc_embedding: embeddings["documents"].append(cached_doc_embedding) else: # 임베딩 모델 가져오기 (이미 로드되어 있을 수 있음) model = embedding_manager.get_model() # 임베딩 계산 doc_embedding = model.encode(doc_content) # 결과 캐싱 embedding_cache.set(doc_cache_key, doc_embedding) embeddings["documents"].append(doc_embedding) return {"embeddings": embeddings} def select_context(state: KnowledgeBaseState) -> Dict: """관련 컨텍스트 선택""" documents = state["documents"] embeddings = state["embeddings"] query_embedding = embeddings["query"] doc_embeddings = embeddings["documents"] # 유사도 계산 및 상위 문서 선택 similarities = compute_similarities(query_embedding, doc_embeddings) ranked_docs = [(documents[i], similarities[i]) for i in range(len(documents))] ranked_docs.sort(key=lambda x: x[1], reverse=True) # 상위 3개 문서만 선택 selected_docs = ranked_docs[:3] # 컨텍스트로 변환 context = [doc["content"] for doc, _ in selected_docs] return {"context": context} def generate_answer(state: KnowledgeBaseState) -> Dict: """답변 생성""" query = state["query"] context = state["context"] # 프롬프트 구성 prompt = f""" Answer the question based on the following context: Context: {' '.join(context)} Question: {query} Answer: """ # LLM 호출 (여기서도 캐싱 적용 가능) answer = call_llm(prompt) return {"answer": answer} # 그래프 구성 kb_graph = StateGraph(KnowledgeBaseState) kb_graph.add_node("retrieve", retrieve_documents) kb_graph.add_node("embed", compute_embeddings) kb_graph.add_node("select", select_context) kb_graph.add_node("generate", generate_answer) # 엣지 설정 kb_graph.add_edge("retrieve", "embed") kb_graph.add_edge("embed", "select") kb_graph.add_edge("select", "generate")
이 지식 베이스 시스템은 다양한 성능 최적화 기법을 적용했습니다:
- 다중 캐싱:
- 문서 검색 결과 캐싱
- 임베딩 계산 결과 캐싱
- (옵션) LLM 호출 결과 캐싱
- 리소스 관리:
- 임베딩 모델 지연 로딩
- 유휴 시간에 따른 자동 모델 관리
- 계산 최적화:
- 필요한 임베딩만 계산
- 유사도 기반 관련 문서 필터링
이러한 최적화를 통해 같은 쿼리나 유사한 쿼리에 대한 응답 시간을 크게 줄이고, 메모리 사용량과 API 비용을 최소화할 수 있습니다.
결론
이번 포스트에서는 LangGraph 애플리케이션의 성능을 최적화하기 위한 캐싱 전략과 리소스 관리 기법에 대해 살펴보았습니다. 이러한 패턴을 적용하면 응답 시간 단축, 리소스 효율성 향상, API 비용 절감 등 다양한 이점을 얻을 수 있습니다.
다음 포스트에서는 더 많은 성능 최적화 기법인 배치 처리, 결과 스트리밍, 비용 관리, 그리고 성능 모니터링 및 프로파일링에 대해 알아보겠습니다.
'LangChain & LangGraph' 카테고리의 다른 글
#4 LangGraph 심화: 스레드 및 병렬 처리 (0) 2025.03.18 #3 LangGraph 심화: 분기 및 조건부 실행과 오류 처리 전략 (0) 2025.03.18 #5 LangGraph 심화: 성능 최적화 기법 [2] (0) 2025.03.18 #1 Lang Graph란 무엇인가요? (0) 2025.03.18 #2 LangGraph 심화: 상태 관리와 대화형 에이전트 구축 (0) 2025.03.18