이 글은 누구를 위한 것인가
- 10분 이상 실행되는 AI 에이전트를 운영하는 개발자
- 에이전트 중간 실패 시 처음부터 재실행하는 비효율을 겪고 있는 팀
- LangGraph, CrewAI, 자체 구현 에이전트의 신뢰성을 높이려는 엔지니어
들어가며
AI 에이전트가 30분짜리 리서치 작업을 진행하다가 28분 째에 API 오류로 실패했다. 처음부터 다시 시작해야 한다. 이미 처리한 웹 페이지 50개, 생성된 요약 30개가 모두 사라진다. API 비용도 다시 나간다.
이것은 AI 에이전트가 프로덕션에서 신뢰성 있게 동작하기 위해 반드시 해결해야 할 문제다. 소프트웨어 개발에서는 오래전부터 체크포인트와 재시도 전략이 있었다. 같은 원칙을 AI 에이전트에 적용해야 한다.
이 글은 bluefoxdev.kr의 AI 에이전트 신뢰성 설계 를 참고하고, 체크포인트 구현 관점에서 확장하여 작성했습니다.
1. 에이전트 상태 설계
1.1 저장해야 할 것과 안 해도 되는 것
# 저장 필요
class AgentCheckpoint:
task_id: str
step_index: int # 현재 단계
completed_steps: list[dict] # 완료된 단계 결과
intermediate_results: dict # 중간 산출물
tool_call_history: list # 사용한 도구 이력
context_window: list # 현재 메시지 히스토리
created_at: datetime
last_updated: datetime
# 저장 불필요 (재계산 가능)
# - LLM 응답 메타데이터 (토큰 수 등)
# - 임시 변수
# - 로그 메시지
1.2 상태 저장소 선택
| 저장소 | 장점 | 단점 | 적합한 경우 |
|---|---|---|---|
| Redis | 빠름, TTL 지원 | 휘발성 | 수 시간 이내 작업 |
| PostgreSQL | 영구 저장, 조회 용이 | 설정 복잡 | 장기 작업, 감사 필요 |
| 파일 시스템 | 간단, 무료 | 확장성 낮음 | 로컬 개발, 단일 인스턴스 |
| S3/GCS | 대용량, 내구성 | 지연 있음 | 대형 아티팩트 포함 |
2. 체크포인트 구현 패턴
2.1 기본 체크포인트 클래스
import json
import redis
from datetime import datetime
from typing import Any
class AgentCheckpointManager:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.ttl = 24 * 3600 # 24시간 유지
def save(self, task_id: str, state: dict) -> None:
key = f"agent:checkpoint:{task_id}"
state['last_updated'] = datetime.utcnow().isoformat()
self.redis.setex(
key,
self.ttl,
json.dumps(state)
)
def load(self, task_id: str) -> dict | None:
key = f"agent:checkpoint:{task_id}"
data = self.redis.get(key)
return json.loads(data) if data else None
def delete(self, task_id: str) -> None:
self.redis.delete(f"agent:checkpoint:{task_id}")
def exists(self, task_id: str) -> bool:
return bool(self.redis.exists(f"agent:checkpoint:{task_id}"))
2.2 체크포인트 지원 에이전트
class CheckpointedAgent:
def __init__(self, task_id: str, checkpoint_manager: AgentCheckpointManager):
self.task_id = task_id
self.checkpoint = checkpoint_manager
self.state = self._load_or_init()
def _load_or_init(self) -> dict:
existing = self.checkpoint.load(self.task_id)
if existing:
print(f"체크포인트 발견: {existing['step_index']}단계부터 재개")
return existing
return {
'step_index': 0,
'completed_steps': [],
'intermediate_results': {},
'context': []
}
async def run_step(self, step: AgentStep) -> Any:
step_key = f"step_{step.index}"
# 이미 완료된 단계는 건너뜀
if step_key in self.state['intermediate_results']:
print(f"단계 {step.index} 이미 완료, 결과 재사용")
return self.state['intermediate_results'][step_key]
try:
result = await step.execute(self.state['context'])
# 결과 저장 및 체크포인트
self.state['intermediate_results'][step_key] = result
self.state['step_index'] = step.index + 1
self.state['completed_steps'].append({
'index': step.index,
'name': step.name,
'completed_at': datetime.utcnow().isoformat()
})
self.checkpoint.save(self.task_id, self.state)
return result
except Exception as e:
print(f"단계 {step.index} 실패: {e}")
raise # 재시도 로직에서 처리
async def run(self, steps: list[AgentStep]) -> dict:
for step in steps[self.state['step_index']:]:
await self.run_step(step)
# 완료 시 체크포인트 삭제
self.checkpoint.delete(self.task_id)
return self.state['intermediate_results']
3. LangGraph에서의 체크포인트
3.1 내장 체크포인터 사용
from langgraph.graph import StateGraph
from langgraph.checkpoint.redis import RedisSaver
import redis
# Redis 체크포인터 설정
checkpoint_saver = RedisSaver(
redis.Redis(host='localhost', port=6379)
)
# 그래프 정의
builder = StateGraph(AgentState)
builder.add_node("research", research_node)
builder.add_node("analyze", analyze_node)
builder.add_node("write", write_node)
builder.set_entry_point("research")
builder.add_edge("research", "analyze")
builder.add_edge("analyze", "write")
# 체크포인터와 함께 컴파일
graph = builder.compile(checkpointer=checkpoint_saver)
# 실행 (thread_id로 동일 세션 재개 가능)
config = {"configurable": {"thread_id": "task-12345"}}
result = await graph.ainvoke(initial_state, config=config)
3.2 중간 재개
# 실패 후 재시작 - 같은 thread_id 사용
async def resume_or_start(task_id: str, initial_state: dict):
config = {"configurable": {"thread_id": task_id}}
# 기존 체크포인트 확인
existing_state = await graph.aget_state(config)
if existing_state.values:
print(f"기존 작업 재개: {task_id}")
# None을 입력하면 마지막 체크포인트에서 재개
result = await graph.ainvoke(None, config=config)
else:
print(f"새 작업 시작: {task_id}")
result = await graph.ainvoke(initial_state, config=config)
return result
4. 멱등성 보장
에이전트가 도구를 중복 호출해도 동일한 결과가 나와야 한다.
class IdempotentToolCaller:
def __init__(self, cache: dict):
self.cache = cache
async def call_tool(self, tool_name: str, args: dict) -> Any:
# 캐시 키 생성
cache_key = f"{tool_name}:{json.dumps(args, sort_keys=True)}"
if cache_key in self.cache:
print(f"캐시 히트: {tool_name}")
return self.cache[cache_key]
result = await self._actual_tool_call(tool_name, args)
self.cache[cache_key] = result
return result
5. 재시도 전략
from tenacity import retry, stop_after_attempt, wait_exponential
class RetryableAgent:
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=60),
reraise=True
)
async def execute_with_retry(self, step: AgentStep):
try:
return await self.run_step(step)
except RateLimitError:
# 레이트 리밋은 더 길게 대기
await asyncio.sleep(60)
raise
except NonRetryableError:
# 재시도 불가 오류는 즉시 중단
raise StopAsyncIteration()
마무리: 핵심 원칙
- 단계별 저장: 각 단계 완료 즉시 체크포인트 저장
- 멱등성: 같은 입력은 항상 같은 출력 (중복 실행 안전)
- 선택적 재시도: 모든 오류가 재시도 대상이 아님 (레이트 리밋 vs 잘못된 입력)
- TTL 관리: 오래된 체크포인트 자동 정리로 저장소 낭비 방지
장기 실행 에이전트의 신뢰성은 처음부터 설계에 포함되어야 한다. 나중에 추가하면 배가 더 어렵다.