AI Store에서 AI코딩으로 만들어진 앱을 만나보세요!
지금 바로 방문하기

MLOps Zoomcamp 2025: Week 3 - Prefect를 활용한 워크플로우 오케스트레이션

카테고리

프로그래밍/소프트웨어 개발

서브카테고리

인공지능

대상자

  • 데이터 과학자, ML 엔지니어, DevOps 엔지니어
  • 난이도: 중급 이상 (Python 및 ML 파이프라인 경험 필요)

핵심 요약

  • Prefect의 핵심 특징:

- Python-first 접근: 기존 Python 코드를 최소한의 수정으로 파이프라인화 가능

- Dynamic DAGs: 런타임에 DAG 생성 가능

- 분산 처리: S3, GCS 등 클라우드 스토리지와 호환

  • 주요 구성 요소:

- Prefect API (Orion): 스케줄링, 상태 추적, 실행 내역 관리

- Prefect UI: 시각화 대시보드, 성능 지표, 로그 제공

- Agents: 작업자로 흐름 실행 및 결과 보고

  • ML 파이프라인 예제:

- @task@flow 데코레이터 사용

- CronSchedule으로 매일 자동 실행 가능

섹션별 세부 요약

1. 워크플로우 오케스트레이션 개요

  • ML 파이프라인 주요 단계:

- 데이터 수집, 특징 공학, 모델 학습, 배포, 재학습, 모니터링

- 오케스트레이션 도구가 복잡한 의존성, 오류 처리, 실행 추적을 관리

  • Prefect의 장점:

- Python 코드 기반, 실시간 모니터링, 클라우드 확장성

2. Prefect 아키텍처

  • 주요 구성 요소:

- Prefect API (Orion): 흐름 스케줄링, 상태 관리, 실행 내역 저장

- Prefect UI: 흐름/작업 시각화, 성능 지표, 관리 제어

- Storage: 로컬/클라우드 스토리지, Git, Docker 이미지 지원

- Agents: 스케줄된 흐름 실행 및 결과 보고

3. Task 정의 및 실행

  • Task 특징:

- @task 데코레이터: 이름, 재시도, 캐시, 결과 저장 설정 가능

- 상태: Pending, Running, Completed, Failed, Retrying, Cancelled

  • 코드 예시:

```python

@task(retries=3, retry_delay_seconds=30)

def extract_data(url: str) -> pd.DataFrame:

return pd.read_csv(url)

```

4. Flow 정의 및 실행

  • Flow 기능:

- @flow 데코레이터: 이름, 설명, 버전, 태스크 러너 설정

- Task Runner:

- SequentialTaskRunner: 순차 실행 (디버깅 용이)

- ConcurrentTaskRunner: 병렬 실행 (독립 작업)

- DaskTaskRunner: 분산 처리 (대규모 데이터)

  • 코드 예시:

```python

@flow(task_runner=ConcurrentTaskRunner(), retries=2)

def process_data(date: str = None):

data = extract_data(f"data/data-{date}.csv")

processed = transform_data(data)

load_data(processed)

```

5. Deployment 및 스케줄링

  • Deployment 구성 요소:

- 이름: 배포 식별자

- 스케줄: CronSchedule, IntervalSchedule, RRuleSchedule 지원

- 인프라: 로컬 프로세스, 컨테이너, 클라우드 기반 실행

  • 코드 예시:

```python

deployment = Deployment.build_from_flow(

flow=process_data,

name="daily-data-processing",

schedule=CronSchedule("0 0 *"), # 매일 자정 실행

infrastructure=Process()

)

```

6. 결과 저장 및 모델 평가

  • 결과 저장:

- persist_result=True로 결과 저장

- JSONSerializer, S3Bucket 등 다양한 직렬화/저장 방식 지원

  • 모델 평가 예시:

```python

@task(persist_result=True)

def evaluate_model(model, df):

rmse = mean_squared_error(y_val, y_pred, squared=False)

return {"rmse": rmse}

```

7. ML 파이프라인 예제 (택시 요금 예측)

  • 단계:
  1. 데이터 다운로드: get_data 함수로 훈련/검증 데이터 로드
  2. 특징 전처리: prepare_features로 결측치 처리 및 범주형 특징 변환
  3. 모델 학습: LinearRegression 사용
  4. 모델 평가: RMSE 계산 및 로깅
  • 코드 예시:

```python

@task(retries=3)

def get_data(date):

train_data, val_data = pd.read_parquet(train_url), pd.read_parquet(val_url)

return train_data, val_data

```

결론

  • 실무 적용 팁:

- 복잡한 의존성을 가진 ML 파이프라인은 PrefectDynamic DAGs 기능 활용

- 자동화된 배포를 위해 CronScheduleDeployment 사용

- 모델 평가 메트릭mean_squared_error 등 정확한 지표로 기록

- 클라우드 기반 저장소(S3, GCS)를 사용해 결과를 확장 가능하게 관리