온라인 학습과 지속적 ML 업데이트: 실시간 모델 개선 시스템

AI 기술

온라인 학습지속적 ML컨셉 드리프트자동 재훈련MLflow

이 글은 누구를 위한 것인가

  • 데이터 분포가 변해서 모델 성능이 시간이 지나면서 떨어지는 팀
  • 새 학습 데이터가 쌓일 때 자동으로 재훈련하고 싶은 ML 엔지니어
  • 모델 업데이트를 안전하게 배포하는 파이프라인이 필요한 팀

들어가며

작년에 90% 정확도이던 사기 감지 모델이 올해 70%가 됐다. 사기 패턴이 바뀐 것이다. ML 모델은 배포 후 방치하면 성능이 저하된다. 자동 재훈련 파이프라인이 필요하다.

이 글은 bluefoxdev.kr의 MLOps 자동화 를 참고하여 작성했습니다.


1. 지속적 학습 전략

[재훈련 트리거 방식]

시간 기반:
  매일/매주 자동 재훈련
  단순하지만 불필요한 재훈련 가능
  
데이터 기반:
  새 데이터 N건 이상 쌓이면 재훈련
  더 효율적, 데이터 드리프트 미반영

성능 기반:
  정확도가 임계값 이하로 떨어지면 재훈련
  드리프트 탐지 후 자동 트리거
  가장 반응적

[컨셉 드리프트 감지]

데이터 드리프트 (입력 분포 변화):
  KS 테스트 (연속형 피처)
  카이제곱 검정 (범주형 피처)
  PSI (Population Stability Index)

개념 드리프트 (입력-출력 관계 변화):
  예측 정확도 모니터링
  예측 분포 모니터링
  실제 레이블과 비교

[재훈련 파이프라인]
  드리프트 감지
    → 데이터 수집 & 검증
    → 모델 재훈련
    → 평가 (기존 모델과 비교)
    → Shadow 모드 검증 (실트래픽으로 비교)
    → 점진적 배포 (10% → 50% → 100%)
    → 모니터링

2. 드리프트 감지 및 자동 재훈련

import numpy as np
from scipy import stats
from dataclasses import dataclass
from datetime import datetime
import anthropic

client = anthropic.Anthropic()

@dataclass
class DriftReport:
    feature_name: str
    drift_detected: bool
    psi_score: float
    ks_statistic: float
    p_value: float
    severity: str

def compute_psi(
    reference: np.ndarray,
    current: np.ndarray,
    n_bins: int = 10,
) -> float:
    """PSI (Population Stability Index) 계산"""
    
    # 기준 데이터로 분위수 구간 설정
    quantiles = np.percentile(reference, np.linspace(0, 100, n_bins + 1))
    quantiles[0] = -np.inf
    quantiles[-1] = np.inf
    
    ref_counts = np.histogram(reference, bins=quantiles)[0]
    cur_counts = np.histogram(current, bins=quantiles)[0]
    
    # 0 방지를 위한 스무딩
    ref_pct = (ref_counts + 0.001) / (len(reference) + 0.001 * n_bins)
    cur_pct = (cur_counts + 0.001) / (len(current) + 0.001 * n_bins)
    
    psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
    return float(psi)

def detect_drift(
    reference_data: dict[str, np.ndarray],
    current_data: dict[str, np.ndarray],
    psi_threshold: float = 0.2,
) -> list[DriftReport]:
    """피처별 드리프트 감지"""
    
    reports = []
    
    for feature_name in reference_data:
        if feature_name not in current_data:
            continue
        
        ref = reference_data[feature_name]
        cur = current_data[feature_name]
        
        psi = compute_psi(ref, cur)
        ks_stat, p_value = stats.ks_2samp(ref, cur)
        
        drift_detected = psi > psi_threshold or p_value < 0.05
        severity = "high" if psi > 0.25 else "medium" if drift_detected else "ok"
        
        reports.append(DriftReport(
            feature_name=feature_name,
            drift_detected=drift_detected,
            psi_score=psi,
            ks_statistic=ks_stat,
            p_value=p_value,
            severity=severity,
        ))
    
    return reports

class AutoRetrainingPipeline:
    """자동 재훈련 파이프라인"""
    
    def __init__(self, mlflow_tracking_uri: str):
        import mlflow
        mlflow.set_tracking_uri(mlflow_tracking_uri)
        self.mlflow = mlflow
    
    async def evaluate_retrain_need(
        self,
        current_accuracy: float,
        drift_reports: list[DriftReport],
        accuracy_threshold: float = 0.85,
    ) -> dict:
        """재훈련 필요성 평가"""
        
        high_drift_features = [r.feature_name for r in drift_reports if r.severity == "high"]
        
        needs_retrain = (
            current_accuracy < accuracy_threshold or
            len(high_drift_features) > 0
        )
        
        # LLM으로 재훈련 권고 분석
        drift_summary = "\n".join(
            f"- {r.feature_name}: PSI={r.psi_score:.3f}, 드리프트={r.drift_detected}"
            for r in drift_reports
        )
        
        response = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"""ML 모델 상태를 분석하세요:

현재 정확도: {current_accuracy:.1%}
임계값: {accuracy_threshold:.1%}

피처 드리프트:
{drift_summary}

재훈련이 필요한지와 우선순위를 판단해주세요. JSON으로:
{{"needs_retrain": true/false, "urgency": "immediate/scheduled/optional", "reason": "이유"}}"""
            }]
        )
        
        import json
        analysis = json.loads(response.content[0].text)
        
        return {
            "needs_retrain": needs_retrain or analysis["needs_retrain"],
            "urgency": analysis["urgency"],
            "reason": analysis["reason"],
            "high_drift_features": high_drift_features,
        }
    
    async def run_retraining(
        self,
        train_fn,
        eval_fn,
        new_data_path: str,
    ) -> str:
        """모델 재훈련 실행 및 MLflow 등록"""
        
        with self.mlflow.start_run():
            self.mlflow.log_param("retrain_trigger", "auto")
            self.mlflow.log_param("data_path", new_data_path)
            self.mlflow.log_param("retrain_timestamp", datetime.utcnow().isoformat())
            
            model = await train_fn(new_data_path)
            metrics = await eval_fn(model)
            
            for k, v in metrics.items():
                self.mlflow.log_metric(k, v)
            
            model_uri = self.mlflow.sklearn.log_model(model, "model").model_uri
            
        return model_uri

마무리

모델은 배포하면 끝이 아니라 시작이다. PSI > 0.2이면 주의, PSI > 0.25이면 즉시 재훈련이 필요하다. 재훈련 비용을 낮추려면 학습 데이터를 증분 저장하고, 전체 재훈련보다 소량 새 데이터로 파인튜닝하는 방식을 택하라. Shadow 모드로 실트래픽에서 신구 모델을 비교하면 배포 리스크를 크게 줄인다.