피처 스토어와 ML 서빙 아키텍처: 실시간 특성 관리와 모델 서빙

AI 기술

피처 스토어ML 서빙Feast실시간 ML모델 배포

이 글은 누구를 위한 것인가

  • 학습·서빙 피처 불일치(training-serving skew) 문제를 겪는 팀
  • 여러 ML 모델이 같은 피처를 중복 계산하는 팀
  • 실시간 ML 추천/스코어링 서비스를 구축하는 엔지니어

들어가며

학습 때 계산한 피처와 서빙 때 계산한 피처가 다르면 모델이 이상하게 작동한다. 피처 스토어는 이 불일치를 해결하고, 피처를 재사용 가능한 자산으로 관리한다.

이 글은 bluefoxdev.kr의 ML 프로덕션 인프라 를 참고하여 작성했습니다.


1. 피처 스토어 아키텍처

[피처 스토어 구성요소]

오프라인 스토어 (배치):
  용도: 모델 학습, 히스토리 피처
  저장소: S3 + Parquet, BigQuery, Redshift
  갱신 주기: 시간/일 단위 배치
  지연시간: 분~시간 허용

온라인 스토어 (실시간):
  용도: 서빙 시 저레이턴시 피처 조회
  저장소: Redis, DynamoDB, Cassandra
  갱신 주기: 초~분 단위 스트리밍
  지연시간: 1-10ms 목표

[피처 계산 방식]
  배치 피처: 일별/주별 집계 (구매 횟수 30일 평균)
  스트리밍 피처: 실시간 집계 (최근 1시간 클릭)
  온디맨드 피처: 요청 시 계산 (요청-아이템 유사도)

[Training-Serving Skew 방지]
  피처 정의를 코드로 관리 (Feature View)
  학습/서빙 모두 동일 피처 로직 사용
  피처 버전 관리
  스냅샷으로 과거 피처 재현 가능

2. 피처 스토어 구현

import redis
import numpy as np
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any

@dataclass
class Feature:
    name: str
    dtype: str  # 'float', 'int', 'str', 'list'
    description: str
    ttl_seconds: int = 3600

class OnlineFeatureStore:
    """Redis 기반 온라인 피처 스토어"""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url, decode_responses=True)
    
    def _key(self, entity_id: str, feature_name: str) -> str:
        return f"fs:{feature_name}:{entity_id}"
    
    async def set_features(
        self,
        entity_id: str,
        features: dict[str, Any],
        ttl: int = 3600,
    ):
        """피처 저장"""
        pipeline = self.redis.pipeline()
        
        for name, value in features.items():
            key = self._key(entity_id, name)
            if isinstance(value, (list, dict)):
                import json
                pipeline.setex(key, ttl, json.dumps(value))
            else:
                pipeline.setex(key, ttl, str(value))
        
        pipeline.execute()
    
    async def get_features(
        self,
        entity_id: str,
        feature_names: list[str],
    ) -> dict[str, Any]:
        """피처 일괄 조회"""
        pipeline = self.redis.pipeline()
        
        for name in feature_names:
            pipeline.get(self._key(entity_id, name))
        
        values = pipeline.execute()
        
        result = {}
        for name, value in zip(feature_names, values):
            if value is None:
                result[name] = None
            else:
                try:
                    import json
                    result[name] = json.loads(value)
                except (json.JSONDecodeError, ValueError):
                    try:
                        result[name] = float(value)
                    except ValueError:
                        result[name] = value
        
        return result

class RealtimeFeatureComputer:
    """실시간 피처 계산"""
    
    def __init__(self, feature_store: OnlineFeatureStore):
        self.store = feature_store
    
    async def compute_user_features(self, user_id: str) -> dict:
        """사용자 피처 실시간 계산"""
        
        # 기존 저장된 집계 피처 조회
        stored = await self.store.get_features(user_id, [
            "purchase_count_30d",
            "avg_order_value_90d",
            "category_preferences",
            "last_active_timestamp",
        ])
        
        # 온디맨드 피처 계산
        last_active = stored.get("last_active_timestamp")
        days_since_active = 0
        if last_active:
            last_dt = datetime.fromisoformat(last_active)
            days_since_active = (datetime.utcnow() - last_dt).days
        
        return {
            **stored,
            "days_since_active": days_since_active,
            "is_high_value": (stored.get("avg_order_value_90d") or 0) > 100000,
        }

def detect_feature_drift(
    reference_stats: dict,
    current_stats: dict,
    threshold: float = 0.1,
) -> dict:
    """피처 드리프트 감지 (PSI 기반)"""
    
    drift_results = {}
    
    for feature_name in reference_stats:
        if feature_name not in current_stats:
            continue
        
        ref = reference_stats[feature_name]
        cur = current_stats[feature_name]
        
        # Population Stability Index
        ref_mean = ref.get("mean", 0)
        cur_mean = cur.get("mean", 0)
        
        if ref_mean == 0:
            psi = 0 if cur_mean == 0 else float("inf")
        else:
            psi = abs((cur_mean - ref_mean) / ref_mean)
        
        drift_results[feature_name] = {
            "psi": psi,
            "drift_detected": psi > threshold,
            "severity": "high" if psi > 0.25 else "medium" if psi > threshold else "ok",
        }
    
    return drift_results

async def serve_ml_prediction(
    user_id: str,
    item_id: str,
    model,
    feature_store: OnlineFeatureStore,
) -> float:
    """ML 서빙: 피처 조회 → 예측"""
    
    computer = RealtimeFeatureComputer(feature_store)
    
    user_features = await computer.compute_user_features(user_id)
    item_features = await feature_store.get_features(item_id, [
        "category", "price", "avg_rating", "review_count"
    ])
    
    # 피처 벡터 구성
    feature_vector = np.array([
        user_features.get("purchase_count_30d") or 0,
        user_features.get("avg_order_value_90d") or 0,
        user_features.get("days_since_active") or 0,
        item_features.get("price") or 0,
        item_features.get("avg_rating") or 0,
    ])
    
    return float(model.predict([feature_vector])[0])

마무리

피처 스토어의 핵심 가치는 피처 재사용이다. 추천, 사기 탐지, 이탈 예측이 모두 "사용자 30일 구매 횟수"를 쓴다면 한 번만 계산하고 공유하면 된다. Training-serving skew는 피처 정의를 코드로 버전 관리하고 학습·서빙 모두 동일 로직을 사용해야 해결된다.