이 글은 누구를 위한 것인가
- 학습·서빙 피처 불일치(training-serving skew) 문제를 겪는 팀
- 여러 ML 모델이 같은 피처를 중복 계산하는 팀
- 실시간 ML 추천/스코어링 서비스를 구축하는 엔지니어
들어가며
학습 때 계산한 피처와 서빙 때 계산한 피처가 다르면 모델이 이상하게 작동한다. 피처 스토어는 이 불일치를 해결하고, 피처를 재사용 가능한 자산으로 관리한다.
이 글은 bluefoxdev.kr의 ML 프로덕션 인프라 를 참고하여 작성했습니다.
1. 피처 스토어 아키텍처
[피처 스토어 구성요소]
오프라인 스토어 (배치):
용도: 모델 학습, 히스토리 피처
저장소: S3 + Parquet, BigQuery, Redshift
갱신 주기: 시간/일 단위 배치
지연시간: 분~시간 허용
온라인 스토어 (실시간):
용도: 서빙 시 저레이턴시 피처 조회
저장소: Redis, DynamoDB, Cassandra
갱신 주기: 초~분 단위 스트리밍
지연시간: 1-10ms 목표
[피처 계산 방식]
배치 피처: 일별/주별 집계 (구매 횟수 30일 평균)
스트리밍 피처: 실시간 집계 (최근 1시간 클릭)
온디맨드 피처: 요청 시 계산 (요청-아이템 유사도)
[Training-Serving Skew 방지]
피처 정의를 코드로 관리 (Feature View)
학습/서빙 모두 동일 피처 로직 사용
피처 버전 관리
스냅샷으로 과거 피처 재현 가능
2. 피처 스토어 구현
import redis
import numpy as np
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
@dataclass
class Feature:
name: str
dtype: str # 'float', 'int', 'str', 'list'
description: str
ttl_seconds: int = 3600
class OnlineFeatureStore:
"""Redis 기반 온라인 피처 스토어"""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url, decode_responses=True)
def _key(self, entity_id: str, feature_name: str) -> str:
return f"fs:{feature_name}:{entity_id}"
async def set_features(
self,
entity_id: str,
features: dict[str, Any],
ttl: int = 3600,
):
"""피처 저장"""
pipeline = self.redis.pipeline()
for name, value in features.items():
key = self._key(entity_id, name)
if isinstance(value, (list, dict)):
import json
pipeline.setex(key, ttl, json.dumps(value))
else:
pipeline.setex(key, ttl, str(value))
pipeline.execute()
async def get_features(
self,
entity_id: str,
feature_names: list[str],
) -> dict[str, Any]:
"""피처 일괄 조회"""
pipeline = self.redis.pipeline()
for name in feature_names:
pipeline.get(self._key(entity_id, name))
values = pipeline.execute()
result = {}
for name, value in zip(feature_names, values):
if value is None:
result[name] = None
else:
try:
import json
result[name] = json.loads(value)
except (json.JSONDecodeError, ValueError):
try:
result[name] = float(value)
except ValueError:
result[name] = value
return result
class RealtimeFeatureComputer:
"""실시간 피처 계산"""
def __init__(self, feature_store: OnlineFeatureStore):
self.store = feature_store
async def compute_user_features(self, user_id: str) -> dict:
"""사용자 피처 실시간 계산"""
# 기존 저장된 집계 피처 조회
stored = await self.store.get_features(user_id, [
"purchase_count_30d",
"avg_order_value_90d",
"category_preferences",
"last_active_timestamp",
])
# 온디맨드 피처 계산
last_active = stored.get("last_active_timestamp")
days_since_active = 0
if last_active:
last_dt = datetime.fromisoformat(last_active)
days_since_active = (datetime.utcnow() - last_dt).days
return {
**stored,
"days_since_active": days_since_active,
"is_high_value": (stored.get("avg_order_value_90d") or 0) > 100000,
}
def detect_feature_drift(
reference_stats: dict,
current_stats: dict,
threshold: float = 0.1,
) -> dict:
"""피처 드리프트 감지 (PSI 기반)"""
drift_results = {}
for feature_name in reference_stats:
if feature_name not in current_stats:
continue
ref = reference_stats[feature_name]
cur = current_stats[feature_name]
# Population Stability Index
ref_mean = ref.get("mean", 0)
cur_mean = cur.get("mean", 0)
if ref_mean == 0:
psi = 0 if cur_mean == 0 else float("inf")
else:
psi = abs((cur_mean - ref_mean) / ref_mean)
drift_results[feature_name] = {
"psi": psi,
"drift_detected": psi > threshold,
"severity": "high" if psi > 0.25 else "medium" if psi > threshold else "ok",
}
return drift_results
async def serve_ml_prediction(
user_id: str,
item_id: str,
model,
feature_store: OnlineFeatureStore,
) -> float:
"""ML 서빙: 피처 조회 → 예측"""
computer = RealtimeFeatureComputer(feature_store)
user_features = await computer.compute_user_features(user_id)
item_features = await feature_store.get_features(item_id, [
"category", "price", "avg_rating", "review_count"
])
# 피처 벡터 구성
feature_vector = np.array([
user_features.get("purchase_count_30d") or 0,
user_features.get("avg_order_value_90d") or 0,
user_features.get("days_since_active") or 0,
item_features.get("price") or 0,
item_features.get("avg_rating") or 0,
])
return float(model.predict([feature_vector])[0])
마무리
피처 스토어의 핵심 가치는 피처 재사용이다. 추천, 사기 탐지, 이탈 예측이 모두 "사용자 30일 구매 횟수"를 쓴다면 한 번만 계산하고 공유하면 된다. Training-serving skew는 피처 정의를 코드로 버전 관리하고 학습·서빙 모두 동일 로직을 사용해야 해결된다.