#5 LangGraph 심화: 성능 최적화 기법 [1]

이전 포스트에서는 LangGraph의 스레드 및 병렬 처리 패턴에 대해 살펴보았습니다. 이번 포스트에서는 LangGraph 애플리케이션의 성능을 더욱 극대화하기 위한 다양한 최적화 기법에 대해 알아보겠습니다. 이번 글은 두 편으로 나누어 진행하며, 이 첫 번째 파트에서는 캐싱 전략과 리소스 관리에 중점을 두겠습니다.
성능 최적화의 중요성
LLM 애플리케이션의 성능은 사용자 경험과 운영 비용에 직접적인 영향을 미칩니다:
- 응답 시간 개선: 빠른 응답은 사용자 만족도를 높입니다.
- 리소스 효율성: 최적화된 애플리케이션은 동일한 하드웨어로 더 많은 작업을 처리할 수 있습니다.
- 비용 절감: API 호출 최적화는 LLM API 비용을 줄입니다.
- 확장성 향상: 효율적인 애플리케이션은 더 많은 사용자와 데이터를 처리할 수 있습니다.
캐싱 전략
반복적인 계산이나 API 호출을 줄이는 캐싱 전략은 LangGraph 애플리케이션의 성능 향상에 큰 도움이 됩니다.
1. 기본 디스크 캐싱
반복되는 비용이 많이 드는 계산 결과를 캐싱하는 방법입니다:
import functools
from diskcache import Cache
# 디스크 캐시 초기화
cache = Cache('./cache')
def cached_node(func):
"""결과를 캐싱하는 데코레이터"""
@functools.wraps(func)
def wrapper(state):
# 캐시 키 생성 (관련 상태 값만 포함)
cache_key = f"{func.__name__}:{hash(frozenset(state.items()))}"
# 캐시에서 결과 조회
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
# 결과 계산 및 캐싱
result = func(state)
cache.set(cache_key, result, expire=3600) # 1시간 만료
return result
return wrapper
# 캐싱된 노드 적용
@cached_node
def expensive_computation(state: Dict) -> Dict:
"""비용이 많이 드는 계산 수행"""
# ... 복잡한 계산 또는 API 호출 ...
return result
이 패턴의 주요 장점:
- 동일 입력에 대한 반복 계산 방지
- 시간이 많이 걸리는 API 호출 감소
- 디스크에 저장되므로 애플리케이션 재시작 후에도 캐시 유지
2. 계층적 캐싱 전략
여러 수준의 캐시를 사용하여 성능을 최적화할 수 있습니다:
import redis
from functools import lru_cache
import json
# Redis 연결
redis_client = redis.Redis(host='localhost', port=6379, db=0)
class MultiLevelCache:
def __init__(self, memory_size=1000, redis_ttl=3600):
self.redis_ttl = redis_ttl
self.memory_cache = {}
# 메모리 캐시 데코레이터
self.mem_cache = lru_cache(maxsize=memory_size)
def get(self, key):
# 1. 메모리 캐시 확인
if key in self.memory_cache:
return self.memory_cache[key]
# 2. Redis 캐시 확인
redis_result = redis_client.get(key)
if redis_result:
# 메모리 캐시에도 저장
result = json.loads(redis_result)
self.memory_cache[key] = result
return result
return None
def set(self, key, value, ttl=None):
# Redis에 저장
redis_client.set(key, json.dumps(value), ex=ttl or self.redis_ttl)
# 메모리 캐시에도 저장
self.memory_cache[key] = value
# 계층적 캐시 사용 예시
cache = MultiLevelCache()
def cached_llm_call(prompt, model="gpt-4o"):
"""LLM 호출 결과를 계층적으로 캐싱"""
cache_key = f"llm:{model}:{hash(prompt)}"
# 캐시 확인
cached_result = cache.get(cache_key)
if cached_result:
return cached_result
# 캐시 미스: 실제 API 호출
result = call_llm_api(prompt, model)
# 결과 캐싱
cache.set(cache_key, result)
return result
이 패턴은 다음과 같은 이점을 제공합니다:
- 매우 빠른 액세스를 위한 메모리 캐시 (L1)
- 더 큰 용량과 지속성을 위한 Redis 캐시 (L2)
- 메모리와 디스크 간의 최적의 균형 제공
3. 캐시 무효화 전략
데이터가 변경될 때 캐시를 효율적으로 무효화하는 방법입니다:
class SmartCache:
def __init__(self, base_ttl=3600):
self.cache = {}
self.base_ttl = base_ttl
self.version_keys = {} # 키별 버전 추적
def get(self, key, dependencies=None):
"""종속성을 고려한 캐시 조회"""
if key not in self.cache:
return None
# 캐시 항목과 메타데이터 가져오기
cached_item = self.cache[key]
expiry_time = cached_item["expiry"]
# 만료 확인
if time.time() > expiry_time:
del self.cache[key]
return None
# 종속성 버전 확인
if dependencies:
for dep in dependencies:
current_version = self.version_keys.get(dep, 0)
cached_version = cached_item["dep_versions"].get(dep, -1)
# 종속성이 변경됨
if current_version > cached_version:
del self.cache[key]
return None
return cached_item["value"]
def set(self, key, value, ttl=None, dependencies=None):
"""종속성을 추적하며 캐시에 저장"""
# 종속성 버전 기록
dep_versions = {}
if dependencies:
for dep in dependencies:
dep_versions[dep] = self.version_keys.get(dep, 0)
# 캐시 설정
self.cache[key] = {
"value": value,
"expiry": time.time() + (ttl or self.base_ttl),
"dep_versions": dep_versions
}
def invalidate_dependency(self, dependency):
"""종속성 기반 무효화"""
# 종속성 버전 증가
current_version = self.version_keys.get(dependency, 0)
self.version_keys[dependency] = current_version + 1
# 스마트 캐시 사용 예시
smart_cache = SmartCache()
def cache_aware_node(state: Dict) -> Dict:
"""종속성 인식 캐싱 노드"""
query = state["query"]
user_id = state["user_id"]
# 사용자 데이터에 종속된 캐시 키
cache_key = f"result:{hash(query)}"
dependencies = [f"user:{user_id}", "global_data"]
# 캐시 확인
cached_result = smart_cache.get(cache_key, dependencies)
if cached_result:
return cached_result
# 결과 계산
result = compute_result(query, user_id)
# 결과 캐싱
smart_cache.set(cache_key, result, dependencies=dependencies)
return result
# 사용자 데이터 업데이트 시 캐시 무효화
def update_user_data(user_id, new_data):
# 데이터 업데이트
save_user_data(user_id, new_data)
# 관련 캐시 무효화
smart_cache.invalidate_dependency(f"user:{user_id}")
이 패턴은 다음과 같은 상황에서 유용합니다:
- 데이터가 자주 변경되는 경우
- 여러 캐시 항목이 공통 데이터에 의존하는 경우
- 세분화된 캐시 무효화가 필요한 경우
지연 로딩 및 리소스 관리
필요한 순간에 리소스를 로드하고 효율적으로 관리하는 기법입니다.
1. 모델 지연 로딩
필요할 때만 무거운 모델을 로드하는 패턴입니다:
from functools import lru_cache
class ResourceManagementState(TypedDict):
query: str
model_type: str
result: Dict
@lru_cache(maxsize=4) # 최대 4개 모델 캐싱
def get_model(model_type: str):
"""모델 지연 로딩"""
if model_type == "gpt4":
return load_gpt4_model()
elif model_type == "claude":
return load_claude_model()
elif model_type == "gemini":
return load_gemini_model()
else:
return load_default_model()
def model_processing_node(state: ResourceManagementState) -> Dict:
"""필요한 모델만 로드하여 사용"""
model_type = state["model_type"]
query = state["query"]
# 필요한 모델만 로드 (캐싱됨)
model = get_model(model_type)
# 모델 사용
result = model.process(query)
return {"result": result}
이 패턴의 장점:
- 필요한 경우에만 무거운 리소스를 로드
- lru_cache를 통한 자동 메모리 관리
- 자주 사용하는 모델은 메모리에 유지
2. 리소스 풀링
비용이 많이 드는 리소스를 효율적으로 재사용하는 풀링 패턴입니다:
import threading
class ResourcePool:
def __init__(self, max_size=5, create_func=None):
self.pool = []
self.max_size = max_size
self.create_func = create_func
self.in_use = set()
self.lock = threading.Lock()
def get_resource(self):
with self.lock:
# 사용 가능한 리소스 확인
available = [r for r in self.pool if r not in self.in_use]
if available:
# 사용 가능한 리소스 반환
resource = available[0]
self.in_use.add(resource)
return resource
# 새 리소스 생성 (풀 크기 제한 내에서)
if len(self.pool) < self.max_size:
resource = self.create_func()
self.pool.append(resource)
self.in_use.add(resource)
return resource
# 풀이 가득 찬 경우 대기
return None
def release_resource(self, resource):
with self.lock:
if resource in self.in_use:
self.in_use.remove(resource)
# 데이터베이스 연결 풀 예시
db_pool = ResourcePool(
max_size=10,
create_func=lambda: create_db_connection()
)
def database_query_node(state: Dict) -> Dict:
"""풀링된 데이터베이스 연결 사용"""
query = state["query"]
# 연결 획득
connection = db_pool.get_resource()
try:
# 쿼리 실행
result = execute_query(connection, query)
return {"result": result}
finally:
# 연결 반환
db_pool.release_resource(connection)
이 패턴은 다음과 같은 리소스에 적합합니다:
- 데이터베이스 연결
- 외부 API 클라이언트
- 파일 핸들러
- 네트워크 소켓
3. 자동 리소스 정리
사용하지 않는 리소스를 자동으로 정리하는 패턴입니다:
import time
import threading
import weakref
class ResourceManager:
def __init__(self, idle_timeout=300): # 5분 타임아웃
self.resources = {} # {id: (resource, last_used_time)}
self.idle_timeout = idle_timeout
self.lock = threading.Lock()
# 백그라운드 정리 스레드 시작
self.cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
self.cleanup_thread.start()
def get_resource(self, resource_id, create_func):
"""리소스 획득 (없으면 생성)"""
with self.lock:
if resource_id in self.resources:
resource, _ = self.resources[resource_id]
# 마지막 사용 시간 업데이트
self.resources[resource_id] = (resource, time.time())
return resource
# 새 리소스 생성
resource = create_func()
self.resources[resource_id] = (resource, time.time())
# 약한 참조로 리소스 래핑하여 finalize 콜백 등록
weakref.finalize(resource, self._resource_finalizer, resource_id)
return resource
def _resource_finalizer(self, resource_id):
"""리소스가 가비지 컬렉션될 때 호출됨"""
with self.lock:
if resource_id in self.resources:
resource, _ = self.resources[resource_id]
# 리소스 정리 (예: 연결 닫기)
if hasattr(resource, 'close'):
try:
resource.close()
except:
pass
del self.resources[resource_id]
def _cleanup_loop(self):
"""오래된, 사용하지 않는 리소스 정리"""
while True:
time.sleep(60) # 1분마다 확인
with self.lock:
current_time = time.time()
to_remove = []
# 오래된 리소스 찾기
for resource_id, (resource, last_used) in self.resources.items():
if current_time - last_used > self.idle_timeout:
to_remove.append((resource_id, resource))
# 오래된 리소스 정리
for resource_id, resource in to_remove:
if hasattr(resource, 'close'):
try:
resource.close()
except:
pass
del self.resources[resource_id]
# 리소스 관리자 사용 예시
resource_manager = ResourceManager()
def managed_resource_node(state: Dict) -> Dict:
"""자동 정리되는 리소스 사용"""
resource_id = f"client:{state['client_type']}"
# 리소스 획득 (없으면 생성)
client = resource_manager.get_resource(
resource_id,
lambda: create_client(state['client_type'])
)
# 클라이언트 사용
result = client.execute(state["command"])
return {"result": result}
이 패턴의 특징:
- 사용하지 않는 유휴 리소스 자동 정리
- 약한 참조를 통한 가비지 컬렉션 지원
- 백그라운드 정리 프로세스로 메모리 누수 방지
실제 적용 사례: 지식 베이스 시스템
이러한 캐싱 및 리소스 관리 패턴을 적용한 실제 LangGraph 애플리케이션의 예를 살펴보겠습니다:
from langgraph.graph import StateGraph
from typing import TypedDict, List, Dict
import time
class KnowledgeBaseState(TypedDict):
query: str
documents: List[Dict]
context: List[str]
embeddings: Dict
answer: str
# 리소스 관리자
class EmbeddingManager:
def __init__(self):
self.model = None
self.last_used = 0
def get_model(self):
current_time = time.time()
# 모델이 없거나 5분 이상 사용하지 않았으면 (재)로드
if not self.model or (current_time - self.last_used > 300):
self.model = load_embedding_model()
self.last_used = current_time
return self.model
# 글로벌 리소스 관리자
embedding_manager = EmbeddingManager()
# 캐시 설정
document_cache = Cache('./document_cache')
embedding_cache = Cache('./embedding_cache')
search_cache = Cache('./search_cache')
def retrieve_documents(state: KnowledgeBaseState) -> Dict:
"""문서 검색 (캐싱 적용)"""
query = state["query"]
# 캐시 확인
cache_key = f"search:{hash(query)}"
cached_result = search_cache.get(cache_key)
if cached_result:
return {"documents": cached_result}
# 새 검색 수행
documents = search_documents(query)
# 결과 캐싱
search_cache.set(cache_key, documents, expire=3600)
return {"documents": documents}
def compute_embeddings(state: KnowledgeBaseState) -> Dict:
"""임베딩 계산 (캐싱 및 리소스 관리 적용)"""
documents = state["documents"]
query = state["query"]
# 캐시 초기화
embeddings = {"query": None, "documents": []}
# 쿼리 임베딩 계산 (캐싱)
query_cache_key = f"query_embedding:{hash(query)}"
cached_query_embedding = embedding_cache.get(query_cache_key)
if cached_query_embedding:
embeddings["query"] = cached_query_embedding
else:
# 임베딩 모델 가져오기 (지연 로딩)
model = embedding_manager.get_model()
# 임베딩 계산
query_embedding = model.encode(query)
# 결과 캐싱
embedding_cache.set(query_cache_key, query_embedding)
embeddings["query"] = query_embedding
# 문서 임베딩 계산 (캐싱)
for doc in documents:
doc_id = doc["id"]
doc_content = doc["content"]
doc_cache_key = f"doc_embedding:{doc_id}"
cached_doc_embedding = embedding_cache.get(doc_cache_key)
if cached_doc_embedding:
embeddings["documents"].append(cached_doc_embedding)
else:
# 임베딩 모델 가져오기 (이미 로드되어 있을 수 있음)
model = embedding_manager.get_model()
# 임베딩 계산
doc_embedding = model.encode(doc_content)
# 결과 캐싱
embedding_cache.set(doc_cache_key, doc_embedding)
embeddings["documents"].append(doc_embedding)
return {"embeddings": embeddings}
def select_context(state: KnowledgeBaseState) -> Dict:
"""관련 컨텍스트 선택"""
documents = state["documents"]
embeddings = state["embeddings"]
query_embedding = embeddings["query"]
doc_embeddings = embeddings["documents"]
# 유사도 계산 및 상위 문서 선택
similarities = compute_similarities(query_embedding, doc_embeddings)
ranked_docs = [(documents[i], similarities[i]) for i in range(len(documents))]
ranked_docs.sort(key=lambda x: x[1], reverse=True)
# 상위 3개 문서만 선택
selected_docs = ranked_docs[:3]
# 컨텍스트로 변환
context = [doc["content"] for doc, _ in selected_docs]
return {"context": context}
def generate_answer(state: KnowledgeBaseState) -> Dict:
"""답변 생성"""
query = state["query"]
context = state["context"]
# 프롬프트 구성
prompt = f"""
Answer the question based on the following context:
Context:
{' '.join(context)}
Question: {query}
Answer:
"""
# LLM 호출 (여기서도 캐싱 적용 가능)
answer = call_llm(prompt)
return {"answer": answer}
# 그래프 구성
kb_graph = StateGraph(KnowledgeBaseState)
kb_graph.add_node("retrieve", retrieve_documents)
kb_graph.add_node("embed", compute_embeddings)
kb_graph.add_node("select", select_context)
kb_graph.add_node("generate", generate_answer)
# 엣지 설정
kb_graph.add_edge("retrieve", "embed")
kb_graph.add_edge("embed", "select")
kb_graph.add_edge("select", "generate")
이 지식 베이스 시스템은 다양한 성능 최적화 기법을 적용했습니다:
- 다중 캐싱:
- 문서 검색 결과 캐싱
- 임베딩 계산 결과 캐싱
- (옵션) LLM 호출 결과 캐싱
- 리소스 관리:
- 임베딩 모델 지연 로딩
- 유휴 시간에 따른 자동 모델 관리
- 계산 최적화:
- 필요한 임베딩만 계산
- 유사도 기반 관련 문서 필터링
이러한 최적화를 통해 같은 쿼리나 유사한 쿼리에 대한 응답 시간을 크게 줄이고, 메모리 사용량과 API 비용을 최소화할 수 있습니다.
결론
이번 포스트에서는 LangGraph 애플리케이션의 성능을 최적화하기 위한 캐싱 전략과 리소스 관리 기법에 대해 살펴보았습니다. 이러한 패턴을 적용하면 응답 시간 단축, 리소스 효율성 향상, API 비용 절감 등 다양한 이점을 얻을 수 있습니다.
다음 포스트에서는 더 많은 성능 최적화 기법인 배치 처리, 결과 스트리밍, 비용 관리, 그리고 성능 모니터링 및 프로파일링에 대해 알아보겠습니다.