-
#4 LangGraph 심화: 스레드 및 병렬 처리LangChain & LangGraph 2025. 3. 18. 20:00
이전 포스트에서는 LangGraph의 분기 및 조건부 실행 패턴과 오류 처리 전략에 대해 살펴보았습니다. 이번 포스트에서는 LangGraph의 성능을 극대화하기 위한 스레드 및 병렬 처리 패턴에 대해 알아보겠습니다.
병렬 처리의 중요성
LLM 애플리케이션은 API 호출, 데이터베이스 쿼리, 파일 I/O 등 다양한 I/O 바운드 작업을 포함합니다. 이러한 작업을 순차적으로 실행하면 다음과 같은 문제가 발생합니다:
- 지연 시간 증가: 각 작업이 순차적으로 실행되어 총 실행 시간이 길어집니다.
- 리소스 비효율성: CPU나 메모리가 유휴 상태로 낭비됩니다.
- 확장성 제한: 작업량이 증가할 때 성능이 선형적으로 저하됩니다.
병렬 처리를 통해 이러한 문제를 해결하고 LangGraph 애플리케이션의 성능을 크게 향상시킬 수 있습니다.
스레드 및 병렬 처리 패턴
1. 비동기 노드 구현
LangGraph는 비동기 노드를 지원하여 I/O 작업을 효율적으로 처리할 수 있습니다:
from langgraph.graph import StateGraph from typing import TypedDict, List, Dict, Any import asyncio class AsyncState(TypedDict): query: str search_results: List[Dict] database_results: Dict combined_results: Dict async def async_web_search(state: AsyncState) -> Dict: """비동기 웹 검색 함수""" query = state["query"] # 비동기 API 호출 results = await async_search_api(query) return {"search_results": results} async def async_database_query(state: AsyncState) -> Dict: """비동기 데이터베이스 쿼리 함수""" query = state["query"] # 비동기 데이터베이스 쿼리 results = await async_db_query(query) return {"database_results": results} async def combine_results_async(state: AsyncState) -> Dict: """결과 병합 함수""" # 검색 결과와 데이터베이스 결과 병합 combined_results = { "search": state["search_results"], "database": state["database_results"] } return {"combined_results": combined_results} # 비동기 그래프 구성 async_graph = StateGraph(AsyncState) async_graph.add_node("web_search", async_web_search) async_graph.add_node("db_query", async_database_query) async_graph.add_node("combine_results", combine_results_async) # 병렬 엣지 설정 async_graph.add_edge("root", "web_search") async_graph.add_edge("root", "db_query") async_graph.add_edge("web_search", "combine_results") async_graph.add_edge("db_query", "combine_results") # 비동기 실행 async def run_async_workflow(query: str): state = {"query": query} async_executor = async_graph.acompile() final_state = await async_executor.ainvoke(state) return final_state
이 패턴의 장점:
- 웹 검색과 데이터베이스 쿼리가 병렬로 실행되어 전체 실행 시간이 크게 단축됩니다.
- I/O 대기 시간 동안 다른 작업이 진행되어 리소스 활용도가 향상됩니다.
- 비동기 코드는 동기 코드와 유사한 구조를 유지하므로 이해하기 쉽습니다.
2. 병렬 브랜치 패턴
여러 독립적인 작업을 병렬로 실행하고 결과를 병합하는 패턴입니다:
from langgraph.graph import StateGraph, END class ParallelState(TypedDict): input_text: str summarization: str translation: str sentiment: str final_output: Dict # 그래프 생성 parallel_graph = StateGraph(ParallelState) # 노드 추가 parallel_graph.add_node("parse_input", parse_input_node) parallel_graph.add_node("summarize", summarize_node) parallel_graph.add_node("translate", translate_node) parallel_graph.add_node("analyze_sentiment", sentiment_analysis_node) parallel_graph.add_node("combine_results", combine_results_node) # 병렬 분기 설정 parallel_graph.add_edge("parse_input", "summarize") parallel_graph.add_edge("parse_input", "translate") parallel_graph.add_edge("parse_input", "analyze_sentiment") # 결과 병합 parallel_graph.add_edge("summarize", "combine_results") parallel_graph.add_edge("translate", "combine_results") parallel_graph.add_edge("analyze_sentiment", "combine_results") parallel_graph.add_edge("combine_results", END) # 상태 조인 함수 def join_results(states: List[Dict]) -> Dict: """여러 상태를 병합""" result = states[0].copy() for state in states[1:]: result.update(state) return result parallel_graph.set_join(join_results)
이 패턴의 핵심은 set_join 메서드로 설정한 상태 병합 함수입니다. 이 함수는 여러 병렬 브랜치에서 반환된 상태를 하나의 상태로 병합합니다.
상태 병합 전략
병렬 브랜치의 결과를 병합할 때 다양한 전략을 사용할 수 있습니다:
# 1. 단순 병합 (충돌 발생 시 마지막 값 사용) def simple_join(states): result = {} for state in states: result.update(state) return result # 2. 키별 병합 (특정 키만 각 상태에서 추출) def selective_join(states): result = states[0].copy() # 기본 상태 # 각 상태에서 특정 키만 추출 result["summarization"] = states[1].get("summarization") result["translation"] = states[2].get("translation") result["sentiment"] = states[3].get("sentiment") return result # 3. 충돌 해결 병합 (충돌 시 커스텀 로직 적용) def conflict_resolving_join(states): result = states[0].copy() for state in states[1:]: for key, value in state.items(): if key in result: # 특정 키는 리스트로 병합 if key in ["results", "outputs"]: if isinstance(result[key], list): result[key].extend(value if isinstance(value, list) else [value]) else: result[key] = [result[key], value] # 특정 키는 딕셔너리 병합 elif key in ["metadata", "config"] and isinstance(value, dict): result[key].update(value) # 그 외에는 마지막 값 사용 else: result[key] = value else: result[key] = value return result
3. 작업 분할 및 병합 패턴 (Map-Reduce)
대규모 작업을 작은 단위로 분할하여 병렬 처리한 후 결과를 병합하는 패턴입니다:
class MapReduceState(TypedDict): documents: List[str] chunks: List[List[str]] processed_chunks: List[Dict] final_result: Dict def split_documents(state: MapReduceState) -> Dict: """문서를 청크로 분할 (Map 준비)""" docs = state["documents"] # 각 문서를 더 작은 청크로 분할 chunks = [split_into_chunks(doc) for doc in docs] return {"chunks": chunks} async def process_chunks(state: MapReduceState) -> Dict: """각 청크를 병렬로 처리 (Map)""" chunks = state["chunks"] flat_chunks = [chunk for sublist in chunks for chunk in sublist] # 병렬 처리를 위한 코루틴 생성 coroutines = [process_chunk(chunk) for chunk in flat_chunks] # 모든 코루틴 병렬 실행 processed_chunks = await asyncio.gather(*coroutines) return {"processed_chunks": processed_chunks} def reduce_results(state: MapReduceState) -> Dict: """처리된 청크 결과 병합 (Reduce)""" processed_chunks = state["processed_chunks"] # 결과 병합 (예: 통계 계산, 요약 등) final_result = combine_chunk_results(processed_chunks) return {"final_result": final_result} # 그래프 구성 map_reduce_graph = StateGraph(MapReduceState) map_reduce_graph.add_node("split", split_documents) map_reduce_graph.add_node("process", process_chunks) map_reduce_graph.add_node("reduce", reduce_results) # 엣지 설정 map_reduce_graph.add_edge("split", "process") map_reduce_graph.add_edge("process", "reduce")
이 패턴은 대량의 문서 처리, 대규모 데이터셋 분석 등에 유용합니다. 주요 단계는 다음과 같습니다:
- 분할(Split): 큰 작업을 작은 청크로 나눕니다.
- 매핑(Map): 각 청크를 병렬로 처리합니다.
- 축소(Reduce): 처리된 결과를 병합합니다.
4. 스레드 풀 실행기
CPU 바운드 작업을 효율적으로 처리하기 위한 스레드 풀 패턴입니다:
from concurrent.futures import ThreadPoolExecutor import functools class ThreadPoolState(TypedDict): items: List processed_items: List max_workers: int def process_with_thread_pool(state: ThreadPoolState) -> Dict: """스레드 풀을 사용한 병렬 처리""" items = state["items"] max_workers = state.get("max_workers", 10) def process_item(item): # 각 항목 처리 로직 return compute_intensive_task(item) # 스레드 풀 생성 및 실행 with ThreadPoolExecutor(max_workers=max_workers) as executor: processed_items = list(executor.map(process_item, items)) return {"processed_items": processed_items} # 그래프에 노드 추가 thread_pool_graph = StateGraph(ThreadPoolState) thread_pool_graph.add_node("thread_pool_processor", process_with_thread_pool)
이 패턴은 CPU 바운드 작업에 적합하며, 스레드 풀 크기를 조정하여 시스템 리소스에 맞게 최적화할 수 있습니다.
5. 분산 작업 큐 통합
대규모 시스템을 위한 분산 작업 큐 통합 패턴입니다:
import celery from celery import group # Celery 작업 정의 @celery.task def process_task(item): return process_logic(item) class DistributedState(TypedDict): items: List task_ids: List results: List def submit_to_queue(state: DistributedState) -> Dict: """Celery 작업 큐에 작업 제출""" items = state["items"] # 병렬 작업 그룹 생성 및 실행 job = group(process_task.s(item) for item in items) result = job.apply_async() # 작업 ID 저장 return {"task_ids": result.id} def check_results(state: DistributedState) -> Dict: """작업 결과 확인""" task_id = state["task_ids"] # 결과 조회 result = celery.GroupResult.restore(task_id) if result.ready(): return {"results": result.get()} else: # 아직 완료되지 않음, 상태 유지 return {} def is_complete(state: DistributedState) -> bool: """작업 완료 여부 확인""" return "results" in state and state["results"] is not None # 그래프 구성 distributed_graph = StateGraph(DistributedState) distributed_graph.add_node("submit", submit_to_queue) distributed_graph.add_node("check", check_results) distributed_graph.add_node("process_results", process_final_results) # 엣지 설정 distributed_graph.add_edge("submit", "check") distributed_graph.add_conditional_edges( "check", is_complete, { True: "process_results", False: "check" # 완료될 때까지 반복 확인 } )
이 패턴은 대규모 작업을 여러 워커 노드에 분산하여 처리하므로 단일 서버의 한계를 넘어 확장할 수 있습니다.
실제 적용 사례
1. 대규모 문서 처리 시스템
대량의 문서를 처리하는 시스템에서 병렬 처리를 적용한 예시입니다:
class DocumentProcessingState(TypedDict): documents: List[Dict] processed_documents: List[Dict] analysis_results: Dict async def process_documents_parallel(state: DocumentProcessingState) -> Dict: """여러 문서를 병렬로 처리""" documents = state["documents"] # 병렬 처리 함수 async def process_single_document(doc): # 문서 텍스트 추출 text = extract_text(doc) # 병렬로 여러 분석 수행 tasks = [ analyze_sentiment(text), extract_entities(text), classify_document(text), summarize_text(text) ] sentiment, entities, category, summary = await asyncio.gather(*tasks) return { "id": doc["id"], "sentiment": sentiment, "entities": entities, "category": category, "summary": summary } # 모든 문서 병렬 처리 tasks = [process_single_document(doc) for doc in documents] processed_documents = await asyncio.gather(*tasks) return {"processed_documents": processed_documents} def aggregate_results(state: DocumentProcessingState) -> Dict: """처리된 문서 결과 집계""" documents = state["processed_documents"] # 결과 집계 sentiment_counts = {} entity_counts = {} category_counts = {} for doc in documents: # 감성 집계 sentiment = doc["sentiment"] sentiment_counts[sentiment] = sentiment_counts.get(sentiment, 0) + 1 # 엔티티 집계 for entity in doc["entities"]: entity_counts[entity] = entity_counts.get(entity, 0) + 1 # 카테고리 집계 category = doc["category"] category_counts[category] = category_counts.get(category, 0) + 1 return { "analysis_results": { "sentiment_distribution": sentiment_counts, "top_entities": sorted(entity_counts.items(), key=lambda x: x[1], reverse=True)[:10], "category_distribution": category_counts } } # 그래프 구성 document_graph = StateGraph(DocumentProcessingState) document_graph.add_node("process", process_documents_parallel) document_graph.add_node("aggregate", aggregate_results) document_graph.add_edge("process", "aggregate")
이 시스템은 두 가지 수준의 병렬화를 적용합니다:
- 여러 문서를 동시에 처리합니다.
- 각 문서에 대해 여러 분석 작업을 병렬로 수행합니다.
2. 대화형 에이전트 시스템
사용자 쿼리를 처리하는 대화형 에이전트에 병렬 처리를 적용한 예시입니다:
class ConversationalAgentState(TypedDict): user_query: str context: Dict search_results: List knowledge_base_results: List llm_response: str final_response: str async def retrieve_information(state: ConversationalAgentState) -> Dict: """정보 검색을 병렬로 수행""" query = state["user_query"] # 여러 소스에서 정보 검색 병렬화 search_task = search_web(query) knowledge_base_task = query_knowledge_base(query) # 병렬 실행 search_results, kb_results = await asyncio.gather( search_task, knowledge_base_task ) return { "search_results": search_results, "knowledge_base_results": kb_results } def generate_response(state: ConversationalAgentState) -> Dict: """검색 결과를 바탕으로 응답 생성""" query = state["user_query"] context = state["context"] search_results = state["search_results"] kb_results = state["knowledge_base_results"] # 컨텍스트 구성 prompt = f""" User Query: {query} Web Search Results: {format_search_results(search_results)} Knowledge Base Results: {format_kb_results(kb_results)} Previous Conversation Context: {format_context(context)} Generate a helpful response based on the above information: """ # LLM 호출 llm_response = call_llm(prompt) return {"llm_response": llm_response} def post_process_response(state: ConversationalAgentState) -> Dict: """응답 후처리""" response = state["llm_response"] # 포맷팅, 참조 추가 등 final_response = format_response(response) return {"final_response": final_response} # 그래프 구성 agent_graph = StateGraph(ConversationalAgentState) agent_graph.add_node("retrieve", retrieve_information) agent_graph.add_node("generate", generate_response) agent_graph.add_node("post_process", post_process_response) # 엣지 설정 agent_graph.add_edge("retrieve", "generate") agent_graph.add_edge("generate", "post_process")
이 에이전트는 웹 검색과 지식 베이스 쿼리를 병렬로 수행하여 응답 생성 시간을 단축합니다.
3. 실시간 데이터 처리 파이프라인
스트리밍 데이터를 실시간으로 처리하는 파이프라인에 병렬 처리를 적용한 예시입니다:
class StreamProcessingState(TypedDict): data_batch: List[Dict] filtered_data: List[Dict] enriched_data: List[Dict] transformed_data: List[Dict] final_output: List[Dict] async def filter_data(state: StreamProcessingState) -> Dict: """데이터 필터링""" data = state["data_batch"] # 병렬 필터링 async def filter_item(item): if meets_criteria(item): return item return None # 병렬 실행 filtered_items = await asyncio.gather(*[filter_item(item) for item in data]) # None 제거 filtered_data = [item for item in filtered_items if item is not None] return {"filtered_data": filtered_data} async def enrich_data(state: StreamProcessingState) -> Dict: """데이터 보강""" data = state["filtered_data"] # 병렬 보강 async def enrich_item(item): # 외부 API로 데이터 보강 additional_info = await fetch_additional_info(item["id"]) return {**item, "additional_info": additional_info} # 병렬 실행 enriched_data = await asyncio.gather(*[enrich_item(item) for item in data]) return {"enriched_data": enriched_data} def transform_data(state: StreamProcessingState) -> Dict: """데이터 변환""" data = state["enriched_data"] # 변환 로직 transformed_data = list(map(transform_item, data)) return {"transformed_data": transformed_data} def output_results(state: StreamProcessingState) -> Dict: """결과 출력""" data = state["transformed_data"] # 최종 처리 final_output = prepare_for_output(data) return {"final_output": final_output} # 그래프 구성 stream_graph = StateGraph(StreamProcessingState) stream_graph.add_node("filter", filter_data) stream_graph.add_node("enrich", enrich_data) stream_graph.add_node("transform", transform_data) stream_graph.add_node("output", output_results) # 엣지 설정 stream_graph.add_edge("filter", "enrich") stream_graph.add_edge("enrich", "transform") stream_graph.add_edge("transform", "output")
이 파이프라인은 데이터 배치 내의 각 항목을 병렬로 처리하여 실시간 처리 성능을 향상시킵니다.
병렬 처리의 주의사항
병렬 처리를 구현할 때 고려해야 할 몇 가지 주의사항이 있습니다:
1. 상태 관리 복잡성
병렬 처리는 상태 관리를 복잡하게 만들 수 있습니다. 다음 사항에 유의하세요:
- 불변성 유지: 상태를 직접 수정하지 말고 새 상태를 반환하세요.
- 상태 병합 전략: 병렬 브랜치의 결과를 병합하는 명확한 전략을 정의하세요.
- 경쟁 조건 방지: 여러 노드가 동일한 상태 키를 업데이트할 때 충돌이 발생할 수 있습니다.
2. 리소스 관리
병렬 처리는 리소스 사용량을 증가시킬 수 있습니다:
- 병렬 수준 제한: 시스템 리소스에 맞게 동시 실행 수를 제한하세요.
- 메모리 사용량 모니터링: 병렬 작업이 메모리를 과도하게 사용하지 않도록 하세요.
- 타임아웃 설정: 병렬 작업에 적절한 타임아웃을 설정하여 무한정 대기하지 않도록 하세요.
3. 오류 처리
병렬 실행 중 발생하는 오류를 적절히 처리해야 합니다:
- 부분 실패 처리: 일부 병렬 작업이 실패해도 전체 실행이 중단되지 않도록 하세요.
- 재시도 메커니즘: 실패한 작업에 대한 재시도 전략을 구현하세요.
- 우아한 성능 저하: 일부 기능이 실패해도 핵심 기능은 계속 작동하도록 설계하세요.
결론
LangGraph의 스레드 및 병렬 처리 패턴을 활용하면 LLM 애플리케이션의 성능을 크게 향상시킬 수 있습니다. 비동기 노드, 병렬 브랜치, Map-Reduce 패턴, 스레드 풀, 분산 작업 큐 등 다양한 방법을 통해 독립적인 작업을 병렬로 실행하고 I/O 대기 시간을 최소화할 수 있습니다.
병렬 처리는 특히 외부 API 호출, 데이터베이스 작업, 파일 처리 등 I/O 바운드 작업이 많은 LLM 애플리케이션에서 큰 성능 향상을 가져옵니다. 그러나 상태 관리, 리소스 사용, 오류 처리와 같은 복잡성도 증가하므로 신중하게 설계해야 합니다.
다음 포스트에서는 LangGraph 애플리케이션의 성능을 더욱 극대화하기 위한 다양한 최적화 기법에 대해 알아보겠습니다.
'LangChain & LangGraph' 카테고리의 다른 글
#5 LangGraph 심화: 성능 최적화 기법 [1] (2) 2025.03.18 #3 LangGraph 심화: 분기 및 조건부 실행과 오류 처리 전략 (0) 2025.03.18 #5 LangGraph 심화: 성능 최적화 기법 [2] (0) 2025.03.18 #1 Lang Graph란 무엇인가요? (0) 2025.03.18 #2 LangGraph 심화: 상태 관리와 대화형 에이전트 구축 (0) 2025.03.18