AI 에이전트 상태 관리: 장기 실행 작업의 체크포인트와 재시도 전략

AI 에이전트

AI 에이전트체크포인트장기 실행LangGraph에이전트 상태 관리

이 글은 누구를 위한 것인가

  • 10분 이상 실행되는 AI 에이전트를 운영하는 개발자
  • 에이전트 중간 실패 시 처음부터 재실행하는 비효율을 겪고 있는 팀
  • LangGraph, CrewAI, 자체 구현 에이전트의 신뢰성을 높이려는 엔지니어

들어가며

AI 에이전트가 30분짜리 리서치 작업을 진행하다가 28분 째에 API 오류로 실패했다. 처음부터 다시 시작해야 한다. 이미 처리한 웹 페이지 50개, 생성된 요약 30개가 모두 사라진다. API 비용도 다시 나간다.

이것은 AI 에이전트가 프로덕션에서 신뢰성 있게 동작하기 위해 반드시 해결해야 할 문제다. 소프트웨어 개발에서는 오래전부터 체크포인트와 재시도 전략이 있었다. 같은 원칙을 AI 에이전트에 적용해야 한다.

이 글은 bluefoxdev.kr의 AI 에이전트 신뢰성 설계 를 참고하고, 체크포인트 구현 관점에서 확장하여 작성했습니다.


1. 에이전트 상태 설계

1.1 저장해야 할 것과 안 해도 되는 것

# 저장 필요
class AgentCheckpoint:
    task_id: str
    step_index: int             # 현재 단계
    completed_steps: list[dict] # 완료된 단계 결과
    intermediate_results: dict  # 중간 산출물
    tool_call_history: list     # 사용한 도구 이력
    context_window: list        # 현재 메시지 히스토리
    created_at: datetime
    last_updated: datetime

# 저장 불필요 (재계산 가능)
# - LLM 응답 메타데이터 (토큰 수 등)
# - 임시 변수
# - 로그 메시지

1.2 상태 저장소 선택

저장소장점단점적합한 경우
Redis빠름, TTL 지원휘발성수 시간 이내 작업
PostgreSQL영구 저장, 조회 용이설정 복잡장기 작업, 감사 필요
파일 시스템간단, 무료확장성 낮음로컬 개발, 단일 인스턴스
S3/GCS대용량, 내구성지연 있음대형 아티팩트 포함

2. 체크포인트 구현 패턴

2.1 기본 체크포인트 클래스

import json
import redis
from datetime import datetime
from typing import Any

class AgentCheckpointManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.ttl = 24 * 3600  # 24시간 유지
    
    def save(self, task_id: str, state: dict) -> None:
        key = f"agent:checkpoint:{task_id}"
        state['last_updated'] = datetime.utcnow().isoformat()
        self.redis.setex(
            key,
            self.ttl,
            json.dumps(state)
        )
    
    def load(self, task_id: str) -> dict | None:
        key = f"agent:checkpoint:{task_id}"
        data = self.redis.get(key)
        return json.loads(data) if data else None
    
    def delete(self, task_id: str) -> None:
        self.redis.delete(f"agent:checkpoint:{task_id}")
    
    def exists(self, task_id: str) -> bool:
        return bool(self.redis.exists(f"agent:checkpoint:{task_id}"))

2.2 체크포인트 지원 에이전트

class CheckpointedAgent:
    def __init__(self, task_id: str, checkpoint_manager: AgentCheckpointManager):
        self.task_id = task_id
        self.checkpoint = checkpoint_manager
        self.state = self._load_or_init()
    
    def _load_or_init(self) -> dict:
        existing = self.checkpoint.load(self.task_id)
        if existing:
            print(f"체크포인트 발견: {existing['step_index']}단계부터 재개")
            return existing
        return {
            'step_index': 0,
            'completed_steps': [],
            'intermediate_results': {},
            'context': []
        }
    
    async def run_step(self, step: AgentStep) -> Any:
        step_key = f"step_{step.index}"
        
        # 이미 완료된 단계는 건너뜀
        if step_key in self.state['intermediate_results']:
            print(f"단계 {step.index} 이미 완료, 결과 재사용")
            return self.state['intermediate_results'][step_key]
        
        try:
            result = await step.execute(self.state['context'])
            
            # 결과 저장 및 체크포인트
            self.state['intermediate_results'][step_key] = result
            self.state['step_index'] = step.index + 1
            self.state['completed_steps'].append({
                'index': step.index,
                'name': step.name,
                'completed_at': datetime.utcnow().isoformat()
            })
            
            self.checkpoint.save(self.task_id, self.state)
            return result
            
        except Exception as e:
            print(f"단계 {step.index} 실패: {e}")
            raise  # 재시도 로직에서 처리
    
    async def run(self, steps: list[AgentStep]) -> dict:
        for step in steps[self.state['step_index']:]:
            await self.run_step(step)
        
        # 완료 시 체크포인트 삭제
        self.checkpoint.delete(self.task_id)
        return self.state['intermediate_results']

3. LangGraph에서의 체크포인트

3.1 내장 체크포인터 사용

from langgraph.graph import StateGraph
from langgraph.checkpoint.redis import RedisSaver
import redis

# Redis 체크포인터 설정
checkpoint_saver = RedisSaver(
    redis.Redis(host='localhost', port=6379)
)

# 그래프 정의
builder = StateGraph(AgentState)
builder.add_node("research", research_node)
builder.add_node("analyze", analyze_node)
builder.add_node("write", write_node)

builder.set_entry_point("research")
builder.add_edge("research", "analyze")
builder.add_edge("analyze", "write")

# 체크포인터와 함께 컴파일
graph = builder.compile(checkpointer=checkpoint_saver)

# 실행 (thread_id로 동일 세션 재개 가능)
config = {"configurable": {"thread_id": "task-12345"}}
result = await graph.ainvoke(initial_state, config=config)

3.2 중간 재개

# 실패 후 재시작 - 같은 thread_id 사용
async def resume_or_start(task_id: str, initial_state: dict):
    config = {"configurable": {"thread_id": task_id}}
    
    # 기존 체크포인트 확인
    existing_state = await graph.aget_state(config)
    
    if existing_state.values:
        print(f"기존 작업 재개: {task_id}")
        # None을 입력하면 마지막 체크포인트에서 재개
        result = await graph.ainvoke(None, config=config)
    else:
        print(f"새 작업 시작: {task_id}")
        result = await graph.ainvoke(initial_state, config=config)
    
    return result

4. 멱등성 보장

에이전트가 도구를 중복 호출해도 동일한 결과가 나와야 한다.

class IdempotentToolCaller:
    def __init__(self, cache: dict):
        self.cache = cache
    
    async def call_tool(self, tool_name: str, args: dict) -> Any:
        # 캐시 키 생성
        cache_key = f"{tool_name}:{json.dumps(args, sort_keys=True)}"
        
        if cache_key in self.cache:
            print(f"캐시 히트: {tool_name}")
            return self.cache[cache_key]
        
        result = await self._actual_tool_call(tool_name, args)
        self.cache[cache_key] = result
        return result

5. 재시도 전략

from tenacity import retry, stop_after_attempt, wait_exponential

class RetryableAgent:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=60),
        reraise=True
    )
    async def execute_with_retry(self, step: AgentStep):
        try:
            return await self.run_step(step)
        except RateLimitError:
            # 레이트 리밋은 더 길게 대기
            await asyncio.sleep(60)
            raise
        except NonRetryableError:
            # 재시도 불가 오류는 즉시 중단
            raise StopAsyncIteration()

마무리: 핵심 원칙

  1. 단계별 저장: 각 단계 완료 즉시 체크포인트 저장
  2. 멱등성: 같은 입력은 항상 같은 출력 (중복 실행 안전)
  3. 선택적 재시도: 모든 오류가 재시도 대상이 아님 (레이트 리밋 vs 잘못된 입력)
  4. TTL 관리: 오래된 체크포인트 자동 정리로 저장소 낭비 방지

장기 실행 에이전트의 신뢰성은 처음부터 설계에 포함되어야 한다. 나중에 추가하면 배가 더 어렵다.