MLOps Zoomcamp 2025: Week 3 - Prefect를 활용한 워크플로우 오케스트레이션
카테고리
프로그래밍/소프트웨어 개발
서브카테고리
인공지능
대상자
- 데이터 과학자, ML 엔지니어, DevOps 엔지니어
- 난이도: 중급 이상 (Python 및 ML 파이프라인 경험 필요)
핵심 요약
- Prefect의 핵심 특징:
- Python-first 접근: 기존 Python 코드를 최소한의 수정으로 파이프라인화 가능
- Dynamic DAGs: 런타임에 DAG 생성 가능
- 분산 처리: S3, GCS 등 클라우드 스토리지와 호환
- 주요 구성 요소:
- Prefect API (Orion): 스케줄링, 상태 추적, 실행 내역 관리
- Prefect UI: 시각화 대시보드, 성능 지표, 로그 제공
- Agents: 작업자로 흐름 실행 및 결과 보고
- ML 파이프라인 예제:
- @task
및 @flow
데코레이터 사용
- CronSchedule
으로 매일 자동 실행 가능
섹션별 세부 요약
1. 워크플로우 오케스트레이션 개요
- ML 파이프라인 주요 단계:
- 데이터 수집, 특징 공학, 모델 학습, 배포, 재학습, 모니터링
- 오케스트레이션 도구가 복잡한 의존성, 오류 처리, 실행 추적을 관리
- Prefect의 장점:
- Python 코드 기반, 실시간 모니터링, 클라우드 확장성
2. Prefect 아키텍처
- 주요 구성 요소:
- Prefect API (Orion): 흐름 스케줄링, 상태 관리, 실행 내역 저장
- Prefect UI: 흐름/작업 시각화, 성능 지표, 관리 제어
- Storage: 로컬/클라우드 스토리지, Git, Docker 이미지 지원
- Agents: 스케줄된 흐름 실행 및 결과 보고
3. Task 정의 및 실행
- Task 특징:
- @task 데코레이터: 이름, 재시도, 캐시, 결과 저장 설정 가능
- 상태: Pending
, Running
, Completed
, Failed
, Retrying
, Cancelled
- 코드 예시:
```python
@task(retries=3, retry_delay_seconds=30)
def extract_data(url: str) -> pd.DataFrame:
return pd.read_csv(url)
```
4. Flow 정의 및 실행
- Flow 기능:
- @flow 데코레이터: 이름, 설명, 버전, 태스크 러너 설정
- Task Runner:
- SequentialTaskRunner
: 순차 실행 (디버깅 용이)
- ConcurrentTaskRunner
: 병렬 실행 (독립 작업)
- DaskTaskRunner
: 분산 처리 (대규모 데이터)
- 코드 예시:
```python
@flow(task_runner=ConcurrentTaskRunner(), retries=2)
def process_data(date: str = None):
data = extract_data(f"data/data-{date}.csv")
processed = transform_data(data)
load_data(processed)
```
5. Deployment 및 스케줄링
- Deployment 구성 요소:
- 이름: 배포 식별자
- 스케줄: CronSchedule
, IntervalSchedule
, RRuleSchedule
지원
- 인프라: 로컬 프로세스, 컨테이너, 클라우드 기반 실행
- 코드 예시:
```python
deployment = Deployment.build_from_flow(
flow=process_data,
name="daily-data-processing",
schedule=CronSchedule("0 0 *"), # 매일 자정 실행
infrastructure=Process()
)
```
6. 결과 저장 및 모델 평가
- 결과 저장:
- persist_result=True
로 결과 저장
- JSONSerializer
, S3Bucket
등 다양한 직렬화/저장 방식 지원
- 모델 평가 예시:
```python
@task(persist_result=True)
def evaluate_model(model, df):
rmse = mean_squared_error(y_val, y_pred, squared=False)
return {"rmse": rmse}
```
7. ML 파이프라인 예제 (택시 요금 예측)
- 단계:
- 데이터 다운로드:
get_data
함수로 훈련/검증 데이터 로드 - 특징 전처리:
prepare_features
로 결측치 처리 및 범주형 특징 변환 - 모델 학습:
LinearRegression
사용 - 모델 평가: RMSE 계산 및 로깅
- 코드 예시:
```python
@task(retries=3)
def get_data(date):
train_data, val_data = pd.read_parquet(train_url), pd.read_parquet(val_url)
return train_data, val_data
```
결론
- 실무 적용 팁:
- 복잡한 의존성을 가진 ML 파이프라인은 Prefect
의 Dynamic DAGs 기능 활용
- 자동화된 배포를 위해 CronSchedule
과 Deployment
사용
- 모델 평가 메트릭은 mean_squared_error
등 정확한 지표로 기록
- 클라우드 기반 저장소(S3, GCS)를 사용해 결과를 확장 가능하게 관리