관리 메뉴

개발자의 스터디 노트

Airflow 스케줄링 본문

파이썬/Airflow

Airflow 스케줄링

박개발씨 2023. 6. 11. 15:17

Apache Airflow 기반의 데이터 파이프라인 도서의 챕터3 예제를 에러 없이 동작시키기 위하여

로컬에 fastapi를 확용해서 간단한 api를 작성 하였습니다.

https://github.com/pws0601/airflow_study/tree/main/restapi/fastapi/events

 

GitHub - pws0601/airflow_study: airflow 스터디 저장소 입니다.

airflow 스터디 저장소 입니다. Contribute to pws0601/airflow_study development by creating an account on GitHub.

github.com

깃 허브의 코드를 올려두었습니다.

도커 컨테이너로 배포해도 좋고 로컬에 설치해도 좋으나 docker-compose를 이용하여 airflow환경을 구성한 만큼 네트워크를 맞춰줘야 제대로 테스트가 됩니다.

간단하게 실제 DAG가 동작하는 worker 컨테이너 안에 배포하면 별도의 환경구성 없이 테스트가 가능합니다.

install-airflow-worker-1 컨테이너 안에 /opt/airflow/fastapi 경로에 만들어 테스트 하였습니다.

 

DAG 전체 코드

from datetime import datetime
from pathlib import Path

import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="01_unscheduled", 
    start_date=datetime(2023, 6, 10), #DAG의 시작 날짜를 정의
    schedule_interval=None #스케쥴 되지 않는 DAG로 지정
)

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /opt/airflow/data/events && "
        "curl -o /opt/airflow/data/events.json http://localhost:8000/event/"
    ),
    dag=dag,
)


def _calculate_stats(input_path, output_path):
    """Calculates event statistics."""

    Path(output_path).parent.mkdir(exist_ok=True)

    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()

    stats.to_csv(output_path, index=False)


calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs={"input_path": "/opt/airflow/data/events.json", "output_path": "/opt/airflow/data/stats.csv"},
    dag=dag,
)

fetch_events >> calculate_stats

초기 DAG는 시작 날짜는 정의 되어 있으나 스케쥴을 적용하지 않았습니다.

 

 

 

DAG 스케줄링 

 

1. 스케줄 간격 정의하기

 - 2023-06-24일에 시작해서 매일 자정에 스케줄 실행 예제입니다.

dag = DAG(
    dag_id="01_unscheduled", 
    start_date=datetime(2023, 6, 10), #DAG의 시작 날짜를 정의
    schedule_interval="@daily" #매일 자정에 실행
)

 

2. 종료시간 정의하기

 - 2023-06-01일에 시작하여 2023-06-30일에 종료되며 매일 자정에 스케쥴 실행 예제 입니다.

dag = DAG(
    dag_id="01_unscheduled", 
    start_date=datetime(year=2023, month=6, day=1), #DAG의 시작 날짜를 정의
    end_data=datetime(year=2023, month=6, day=30), #DAG의 종료 날짜를 정의
    schedule_interval="@daily" #매일 자정에 실행
)

 

3. Cron 기반의 스케줄 간격 설정하기

 - Cron 형식으로 스케줄 간격을 설정할 수 있습니다.

dag = DAG(
    dag_id="01_unscheduled", 
    start_date=datetime(year=2023, month=6, day=1), #DAG의 시작 날짜를 정의
    end_data=datetime(year=2023, month=6, day=30), #DAG의 종료 날짜를 정의
    schedule_interval="0 * * * *" #매시간 정시에 실행
    #schedule_interval="0 0 * * *" #매일 자정에 실행
    #schedule_interval="0 0 * * 0" # 매주 일요일 자정에 실행
    #schedule_interval="0 0 1 * *" #매월 1일 자정
    #schedule_interval="45 23 * * SAT" #매주 토요일 23시 45분
    #schedule_interval="0 0 * * MON, WED, FRI" #매주 월, 화, 금요일 자정에 실행
    #schedule_interval="0 0 * * MON-FRI" #매주 월요일부터 금요일 자정에 실행
    #schedule_interval="0 0,12 * * *" #매일 자정 및 오후 12시에 실행
)

 

 

4. 빈도 기반의 스케줄 설정하기

 - Airflow는 상대적인 시간 간격으로 스케줄 간격을 정의할 수 있도록 지원합니다.

 dag = DAG(
    dag_id="01_unscheduled", 
    start_date=datetime(year=2023, month=6, day=1), #DAG의 시작 날짜를 정의
    end_date=dt.datetime(year=2023, month=6, day=30), #DAG의 끝 날짜를 정의
    #timedelta는 빈도 기반 스케줄을 사용할 수 있는 기능을 제공합니다.
    schedule_interval=dt.timedelta(days=3) 
)

 

 

* 자주 사용되는 스케줄 간격에 대한 Airflow 프리셋

프리셋 이름 의미
@once 1회만 실행하도록 스케줄
@hourly 매시간 변경 시 1회 실행
@daily 매월 자정에 1회 실행
@weekly 매주 일요일 자정에 1회 실행
@monthly 매월 1일 자정에 1회 실행
@yearly 매년 1월 1일 자정에 1회 실행

 

 

 

데이터 증분 처리 하기

 

1. Jinja 템플릿을 활용한 증분 데이터 가져오기


# Jinja 템플릿 양식으로 실행 시작 시간 및 다음 실행 시작 시간을 지정할 수 있습니다.
fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events.json "
        "http://events_api:5000/events?"
        #Jinja 템플릿으로 형식화된 execution_date 삽입
        "start_date={{execution_date.strftime('%Y-%m-%d')}}&"
        #next_execution_date로 다음 실행 간격의 날짜를 정의
        "end_date={{next_execution_date.strftime('%Y-%m-%d')}}"
    ),
    dag=dag,
)
# 축약어를 사용
fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events.json "
        "http://events_api:5000/events?"
        "start_date={{ds}}&" # {{execution_date.strftime('%Y-%m-%d')}} 의 축약
        "end_date={{next_ds}}" # {{next_execution_date.strftime('%Y-%m-%d')}} 의 축약
    ),
    dag=dag,
)

 

2. 데이터 파티셔닝

 - 생성되는 데이터파일 이름을 실행되는 날짜를 활용하여 파티셔닝을 할수 있습니다.

 - 파티셔닝을 하지 않는다면 실행될때마다 새로운 데이터가 덮어써지게 됩니다.

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events/{{ds}}.json "
        "http://events_api:5000/events?"
        "start_date={{ds}}&"
        "end_date={{next_ds}}"
    ),
    dag=dag,
)

 - 파티셔닝된 데이터를 반영한 이벤트 통계 작업

def _calculate_stats(**context):
    """Calculates event statistics."""
    input_path = context["templates_dict"]["input_path"]
    output_path = context["templates_dict"]["output_path"]

    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()

    Path(output_path).parent.mkdir(exist_ok=True)
    stats.to_csv(output_path, index=False)

# Jinja 템플릿을 통해 파티셔닝된 데이터를 인자로 넣어줍니다.
calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    templates_dict={
        "input_path": "/opt/airflow/data/events/{{ds}}.json",
        "output_path": "/opt/airflow/data/stats/{{ds}}.csv",
    },
    dag=dag,
)

 

 

백필

 

* 백필(backfilling)이란?

 - 과거 데이터 세트를 로드하거나 분석하기 위해 DAG의 과거 시점을 지정해 실행

 

1. 과거 시점의 작업 실행하기

 - 기본적으로 Airflow 의 DAG는 지정된 시작 날짜로 부터 과거 시점을 모두 실행하게 설정 되어 있습니다. 이 동작을 DAG의 catchup 매개변수에 의해 제어 됩니다. 과거 시점의 태스크 실행을 피하기 위해서는 catchup을 비활성(catchup=false)화 해야 합니다.

dag = DAG(
    dag_id="09_no_catchup",
    schedule_interval="@daily",
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
    catchup=False,
)

 

 

* 원자성

 - DAG는 각각의 태스크가 개별 태스크로 분리되어 서로 영향을 주지 않게 작성되어야 합니다.

def _calculate_stats(**context):
    """Calculates event statistics."""
    input_path = context["templates_dict"]["input_path"]
    output_path = context["templates_dict"]["output_path"]

    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()

    Path(output_path).parent.mkdir(exist_ok=True)
    stats.to_csv(output_path, index=False)

    _email_stats(stats, email="user@example.com")


def _email_stats(stats, email):
    """Send an email..."""
    print(f"Sending stats to {email}...")


calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    templates_dict={
        "input_path": "/data/events/{{ds}}.json",
        "output_path": "/data/stats/{{ds}}.csv",
    },
    dag=dag,
)

 - 메일을 전송하는 함수가 통계 함수 안에 있어 _email_stats 함수가 실패하더라도 출력 파일이 저장되어 있기때문에 성공한것처럼 보여 원자성이 깨지게 됩니다.

 

def _calculate_stats(**context):
    """Calculates event statistics."""
    input_path = context["templates_dict"]["input_path"]
    output_path = context["templates_dict"]["output_path"]

    events = pd.read_json(input_path)
    stats = events.groupby(["date", "user"]).size().reset_index()

    Path(output_path).parent.mkdir(exist_ok=True)
    stats.to_csv(output_path, index=False)


calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    templates_dict={
        "input_path": "/data/events/{{ds}}.json",
        "output_path": "/data/stats/{{ds}}.csv",
    },
    dag=dag,
)


def email_stats(stats, email):
    """Send an email..."""
    print(f"Sending stats to {email}...")


def _send_stats(email, **context):
    stats = pd.read_csv(context["templates_dict"]["stats_path"])
    email_stats(stats, email=email)


send_stats = PythonOperator(
    task_id="send_stats",
    python_callable=_send_stats,
    op_kwargs={"email": "user@example.com"},
    templates_dict={"stats_path": "/data/stats/{{ds}}.csv"},
    dag=dag,
)

 -  send_stats PythonOperator로 분리하여 별도로 구성하면 원자성을 유지할 수 있습니다.

 

 

*멱등성

 - 동일한 입력으로 동일한 태스크를 여러 번 호출해도 결과에 효력이 없어야 합니다. 즉, 입력 변경 없이 태스크를 다시 실행해도 전체 결과가 변경되지 않아야 합니다.

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events/{{ds}}.json "
        "http://events_api:5000/events?"
        "start_date={{ds}}&"
        "end_date={{next_ds}}"
    ),
    dag=dag,
)

 - 입력 날짜에 맞춰 템플릿 파일 이름을 설정하여 분할하면 동일한 입력에 동일한 데이터를 덮어쓰게 되므로 멱등성을 유지할 수 있습니다.