관리 메뉴

개발자의 스터디 노트

태스크간 의존성 정의 본문

파이썬/Airflow

태스크간 의존성 정의

박개발씨 2023. 6. 12. 20:58

1. 의존성 정의

 - with 구문을 사용하여 좀더 깔끔한 DAG를 작성할 수 있습니다.

import airflow

from airflow import DAG
from airflow.operators.dummy import DummyOperator


with DAG(
    dag_id="01_start",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:
    start = DummyOperator(task_id="start")

    fetch_sales = DummyOperator(task_id="fetch_sales")
    clean_sales = DummyOperator(task_id="clean_sales")

    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_weather = DummyOperator(task_id="clean_weather")

    join_datasets = DummyOperator(task_id="join_datasets")
    train_model = DummyOperator(task_id="train_model")
    deploy_model = DummyOperator(task_id="deploy_model")

    start >> [fetch_sales, fetch_weather] #Fan-in 의존성
    fetch_sales >> clean_sales # 선형 의존성
    fetch_weather >> clean_weather # 선형 의존성
    [clean_sales, clean_weather] >> join_datasets # Fan-out 의존성
    join_datasets >> train_model >> deploy_model # 선형 의존성

    # DummyOperator 더미 오퍼레이터 : 아무것도 수행하지 않는 오퍼레이터, 
    # 아무것도 실행하지는 않으나 성공적으로 실행된것으로 간주하여 다음 단계로 진행됩니다.
    # 복잡한 작업 흐름을 구현하는데 사용할 수 있습니다.

 - 일반적인 선형 의존성 정의와 1:대인 Fan-in 의존성, 다:1 인 Fan-out 의존성 정의를 확인할 수 있습니다.

 - DummyOperator를 활용하면 복잡한 흐름을 구현할 수 있습니다.

 - 의존성 그래프

 

 

2. 브랜치 하기

 - 태스크 내에서 브랜치 하기

import airflow

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

ERP_CHANGE_DATE = airflow.utils.dates.days_ago(1)

# 수집 태스크 내에서 조건별로 브랜치하기
def _fetch_sales(**context):
    if context["execution_date"] < ERP_CHANGE_DATE:
        _fetch_sales_old(**context)
    else:
        _fetch_sales_new(**context)


def _fetch_sales_old(**context):
    print("Fetching sales data (OLD)...")


def _fetch_sales_new(**context):
    print("Fetching sales data (NEW)...")

# 정제 태스크 내에서 조건별로 브랜치하기
def _clean_sales(**context):
    if context["execution_date"] < airflow.utils.dates.days_ago(1):
        _clean_sales_old(**context)
    else:
        _clean_sales_new(**context)


def _clean_sales_old(**context):
    print("Preprocessing sales data (OLD)...")


def _clean_sales_new(**context):
    print("Preprocessing sales data (NEW)...")


with DAG(
    dag_id="02_branch_function",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:
    start = DummyOperator(task_id="start")

    fetch_sales = PythonOperator(task_id="fetch_sales", python_callable=_fetch_sales)
    clean_sales = PythonOperator(task_id="clean_sales", python_callable=_clean_sales)

    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_weather = DummyOperator(task_id="clean_weather")

    join_datasets = DummyOperator(task_id="join_datasets")
    train_model = DummyOperator(task_id="train_model")
    deploy_model = DummyOperator(task_id="deploy_model")

    start >> [fetch_sales, fetch_weather]
    fetch_sales >> clean_sales
    fetch_weather >> clean_weather
    [clean_sales, clean_weather] >> join_datasets
    join_datasets >> train_model >> deploy_model

 - 브랜치가 태스크 내부에 숨겨져 있어 작업이 명시적이지 않습니다.

 

 

3. DAG 내부에서 브랜치하기

import airflow

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator

ERP_CHANGE_DATE = airflow.utils.dates.days_ago(1)

# 태스크 이름을 리턴하여 명시적으로 브랜치를 나눌수 있게 만든 함수 입니다.
def _pick_erp_system(**context):
    if context["execution_date"] < ERP_CHANGE_DATE:
        return "fetch_sales_old"
    else:
        return "fetch_sales_new"


def _fetch_sales_old(**context):
    print("Fetching sales data (OLD)...")


def _fetch_sales_new(**context):
    print("Fetching sales data (NEW)...")


def _clean_sales_old(**context):
    print("Preprocessing sales data (OLD)...")


def _clean_sales_new(**context):
    print("Preprocessing sales data (NEW)...")


with DAG(
    dag_id="03_branch_dag",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:
    start = DummyOperator(task_id="start")

    # BranchPythonOperator 을 이용하여 명시적으로 브랜치를 나눌수 있습니다.
    pick_erp_system = BranchPythonOperator(
        task_id="pick_erp_system", python_callable=_pick_erp_system
    )

    fetch_sales_old = PythonOperator(
        task_id="fetch_sales_old", python_callable=_fetch_sales_old
    )
    clean_sales_old = PythonOperator(
        task_id="clean_sales_old", python_callable=_clean_sales_old
    )

    fetch_sales_new = PythonOperator(
        task_id="fetch_sales_new", python_callable=_fetch_sales_new
    )
    clean_sales_new = PythonOperator(
        task_id="clean_sales_new", python_callable=_clean_sales_new
    )

    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_weather = DummyOperator(task_id="clean_weather")

    # Using the wrong trigger rule ("all_success") results in tasks being skipped downstream.
    # join_datasets = DummyOperator(task_id="join_datasets")
    # 업스트림의 태스크중 하나를 건너뛰더라도 계속 트리거가 진행되도록 트리거 규칙 변경
    # trigger_rule="none_failed"
    join_datasets = DummyOperator(
        task_id="join_datasets", 
        trigger_rule="none_failed"
    )
    train_model = DummyOperator(task_id="train_model")
    deploy_model = DummyOperator(task_id="deploy_model")

    start >> [pick_erp_system, fetch_weather]
    pick_erp_system >> [fetch_sales_old, fetch_sales_new]
    fetch_sales_old >> clean_sales_old
    fetch_sales_new >> clean_sales_new
    fetch_weather >> clean_weather
    [clean_sales_old, clean_sales_new, clean_weather] >> join_datasets
    join_datasets >> train_model >> deploy_model

 

- BranchPythonOperator 을 통해 명시적으로 브랜치를 나눌 수 있습니다.

 - trigger_rule="none_failed" 규칙을 변경하여 업스트림의 태스크중 하나를 건너뛰더라도 계속 트리거 진행되도록 할 수 있습니다.

 - 트리거 규칙을 변경하지 않는다면 브랜치된 업스트림의 태스크들이 모두 성공이 되지 않아 join_datasets 태스크와 모든 다운 스트림 태스크가 실행되지 않습니다.

 

 

 - 명확성을 위해 더미 조인 태스크 추가

    join_erp = DummyOperator(task_id="join_erp_branch", trigger_rule="none_failed")

    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_weather = DummyOperator(task_id="clean_weather")

    join_datasets = DummyOperator(task_id="join_datasets")
    train_model = DummyOperator(task_id="train_model")
    deploy_model = DummyOperator(task_id="deploy_model")

    start >> [pick_erp_system, fetch_weather]
    pick_erp_system >> [fetch_sales_old, fetch_sales_new]
    fetch_sales_old >> clean_sales_old
    fetch_sales_new >> clean_sales_new
    [clean_sales_old, clean_sales_new] >> join_erp
    fetch_weather >> clean_weather
    [join_erp, clean_weather] >> join_datasets
    join_datasets >> train_model >> deploy_model

 

 

5. 조건부 태스크 만들기

import airflow
import pendulum

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator

ERP_CHANGE_DATE = airflow.utils.dates.days_ago(1)


def _pick_erp_system(**context):
    if context["execution_date"] < ERP_CHANGE_DATE:
        return "fetch_sales_old"
    else:
        return "fetch_sales_new"



def _latest_only(**context):    
    # 실행 윈도우에서 경계를 확인합니다.
    left_window = context["dag"].following_schedule(context["execution_date"])
    right_window = context["dag"].following_schedule(left_window)

    # 현재 시간이 윈도우 안에 있는지 확인합니다.
    now = pendulum.now("UTC")
    if not left_window < now <= right_window:
        raise AirflowSkipException()


with DAG(
    dag_id="06_condition_dag",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:
    start = DummyOperator(task_id="start")

    pick_erp = BranchPythonOperator(
        task_id="pick_erp_system", python_callable=_pick_erp_system
    )

    fetch_sales_old = DummyOperator(task_id="fetch_sales_old")
    clean_sales_old = DummyOperator(task_id="clean_sales_old")

    fetch_sales_new = DummyOperator(task_id="fetch_sales_new")
    clean_sales_new = DummyOperator(task_id="clean_sales_new")

    join_erp = DummyOperator(task_id="join_erp_branch", trigger_rule="none_failed")

    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_weather = DummyOperator(task_id="clean_weather")

    join_datasets = DummyOperator(task_id="join_datasets")
    train_model = DummyOperator(task_id="train_model")

    latest_only = PythonOperator(task_id="latest_only", python_callable=_latest_only)

    deploy_model = DummyOperator(task_id="deploy_model")

    start >> [pick_erp, fetch_weather]
    pick_erp >> [fetch_sales_old, fetch_sales_new]
    fetch_sales_old >> clean_sales_old
    fetch_sales_new >> clean_sales_new
    [clean_sales_old, clean_sales_new] >> join_erp
    fetch_weather >> clean_weather
    [join_erp, clean_weather] >> join_datasets
    join_datasets >> train_model >> deploy_model
    latest_only >> deploy_model

 - _latest_only라는 조건 함수를 만들어 가장 최근에 실행한 DAG를 제외하고 모두 실행되지 않게 합니다.

 - _latest_only 함수에 의해 하나의 작업만 배포가 되고 나머지는 skip한걸 확인할 수 있습니다.

 

 - 내장 오퍼레이터 사용하기

#latest_only = PythonOperator(task_id="latest_only", python_callable=_latest_only)

# LatestOnlyOperator 오퍼레이터를 통해 같은 효과를 확인할 수 있습니다.
# LatestOnlyOperator 를 사용하면 조건부 배포를 구현하기 간단할 수 있지만
# 복잡한 경우에는 PythonOperator 기반으로 구현하는 것이 효율적입니다.
latest_only = LatestOnlyOperator(task_id="latest_only", dag=dag)

 

 

6. 트리거 규칙 정리

트리거 규칙 동작 사용사례
all_success
(default)
모든 상위 태스크가 성공적으로 완료되면 트리거 됩니다. 일반적인 워크플로에 대한 기본 트리거 규칙입니다.
all_failed 모든 상위 태스크가 실패했거나 상위 태스크의 오류로 인해 실패했을 경우 트리거 됩니다. 태스크 그룹에서 하나 이상 실패가 예상되는 상황에서 오류 처리 코드를 트리거 합니다.
all_done 결과 상태에 관계없이 모든 무보가 실행을 완료하면 트리거 됩니다. 모든 태스크가 완료되었을 때 실행할 청소 코드를 실행합니다.(예: 시스템 종료 또는 클러스터 중지).
one_failed 하나 이상의 상위 태스크가 실패하자마자 트리거되며 다른 상위 태스크의 실행 완료를 기다리지 않습니다. 알림 또는 롤백과 같은 일부 오류 처리 코드를 빠르게 트리거합니다.
one_success 한 부모가 성공하자마자 트리거되며 다른 상위 태스크의 실행 완료를 기다리지 않습니다. 하나의 결과를 사용할 수 있게 되는 즉시 다운스트림 연산/알림을 빠르게 트리거합니다.
none_failed 실패한 상위 태스크가 없지만, 태스크가 성공 또는 건너뛴 경우 트리거됩니다. DAG상 조건부 브랜치의 결합에 사용합니다.

 

 

7. 태스크 간 데이터 공유

 - XCom을 사용하여 데이터 공유하기

import uuid

import airflow

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def _train_model(**context):
    model_id = str(uuid.uuid4())
    # xcom_push 메서드를 사용하여 데이터 공유하기
    context["task_instance"].xcom_push(key="model_id", value=model_id)


def _deploy_model(**context):
    # xcom_pull 메서드를 사용하여 XCom을 확인하기
    model_id = context["task_instance"].xcom_pull(
        task_ids="train_model", key="model_id"
    )
    print(f"Deploying model {model_id}")


with DAG(
    dag_id="10_xcoms",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:
    start = DummyOperator(task_id="start")

    fetch_sales = DummyOperator(task_id="fetch_sales")
    clean_sales = DummyOperator(task_id="clean_sales")

    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_weather = DummyOperator(task_id="clean_weather")

    join_datasets = DummyOperator(task_id="join_datasets")

    train_model = PythonOperator(task_id="train_model", python_callable=_train_model)

    deploy_model = PythonOperator(task_id="deploy_model", python_callable=_deploy_model)

    start >> [fetch_sales, fetch_weather]
    fetch_sales >> clean_sales
    fetch_weather >> clean_weather
    [clean_sales, clean_weather] >> join_datasets
    join_datasets >> train_model >> deploy_model

Admin > Xcom에서 확인할 수 있습니다.

# 템플릿을 통해 XCom 값을 참조 할 수 있습니다.
def _deploy_model(templates_dict, **context):
    model_id = templates_dict["model_id"]
    print(f"Deploying model {model_id}")
    
    
 deploy_model = PythonOperator(
        task_id="deploy_model",
        python_callable=_deploy_model,
        templates_dict={
            "model_id": "{{task_instance.xcom_pull(task_ids='train_model', key='model_id')}}"
        },
    )

 

* XCom 사용 시 고려사항.

  1. 사이즈 제한: XCom은 메타데이터 데이터베이스에 데이터를 저장합니다. 일반적으로 이러한 데이터베이스는 대량의 데이터를 저장하도록 설계되지 않았기 때문에, 너무 큰 데이터를 XCom으로 전달하려고 하면 성능 문제가 발생할 수 있습니다. 따라서 XCom은 작은 양의 데이터를 전달하는데 사용해야 합니다.
  2. 직렬화: XCom을 통해 전달되는 값은 직렬화되어야 합니다. 즉, 메모리에 저장된 객체를 바이트 스트림으로 변환할 수 있어야 합니다. 따라서 XCom에는 직렬화 가능한 데이터 타입만 사용해야 합니다.
  3. 암호화와 보안: XCom은 메타데이터 데이터베이스에 데이터를 저장하기 때문에, 민감한 정보는 암호화되거나 다른 방법으로 보호되어야 합니다.
  4. 캡슐화: 한 작업에서 생성된 데이터는 그 작업에서만 알아야 할 내용을 포함하고 있을 수 있습니다. 이런 데이터를 다른 작업에게 공개하면, 작업 간의 독립성이 깨질 수 있습니다. 따라서 XCom을 사용할 때는 작업 간의 캡슐화를 고려해야 합니다.
  5. 의존성 관리: XCom은 작업 간의 의존성을 늘릴 수 있습니다. 한 작업이 XCom을 통해 데이터를 전달하면, 다른 작업이 그 데이터에 의존하게 됩니다. 이러한 의존성이 복잡하게 얽히면, 코드를 이해하고 유지보수하는 것이 어려워질 수 있습니다.

따라서 XCom 신중하게 사용해야 하며, 필요한 경우에만 사용하고, 가능하면 대안을 찾는 것이 좋습니다. 예를 들어, 작업이 생성한 데이터를 다른 작업에서 사용해야 하는 경우, 데이터를 공유 파일 시스템이나 분산 스토리지에 저장하고, 위치를 XCom으로 전달하는 방법을 고려할 있습니다.

 

* 커스텀 XCom 백엔드 사용하기

Airflow 2.1.0부터 XCom에 커스텀 백엔드를 사용할 수 있게 되었습니다. 커스텀 백엔드를 사용하면, XCom 데이터를 기본 메타데이터 데이터베이스 대신 다른 위치에 저장할 수 있습니다. 이를 통해 XCom의 크기 제한과 관련된 문제를 완화할 수 있습니다.

커스텀 XCom 백엔드를 사용하는 방법은 다음과 같습니다.

  1. 먼저, XCom 백엔드를 정의하는 Python 모듈을 작성합니다. 이 모듈에는 xcom_push와 xcom_pull이라는 두 개의 함수가 있어야 합니다. xcom_push 함수는 XCom 값을 저장하고, xcom_pull 함수는 XCom 값을 불러오는 역할을 합니다.

예를 들어, 다음과 같이 XCom 데이터를 S3 저장하는 백엔드를 정의할 있습니다:

# my_xcom_backend.py

import boto3
from airflow.models.xcom import BaseXCom

class S3XComBackend(BaseXCom):
    PREFIX = "xcom_s3://"

    @staticmethod
    def serialize_value(value):
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result) -> Any:
        return BaseXCom.deserialize_value(result)

    @classmethod
    def xcom_push(cls, key, value):
        s3 = boto3.client('s3')
        value = cls.serialize_value(value)
        s3.put_object(Bucket='my-bucket', Key=f"{cls.PREFIX}{key}", Body=value)

    @classmethod
    def xcom_pull(cls, key):
        s3 = boto3.client('s3')
        result = s3.get_object(Bucket='my-bucket', Key=f"{cls.PREFIX}{key}")
        value = result['Body'].read()
        return cls.deserialize_value(value)

이 코드는 XCom 데이터를 S3 버킷에 저장하고 불러옵니다. my-bucket은 실제 S3 버킷 이름으로 변경해야 합니다.

 

2. 그런 다음, Airflow 구성에서 xcom_backend 옵션을 사용하여 커스텀 백엔드를 지정합니다. 옵션은 Airflow 구성의 [core] 섹션에 위치해야 합니다.

[core]
xcom_backend = my_xcom_backend.S3XComBackend

이 설정은 my_xcom_backend.py 파일의 S3XComBackend 클래스를 XCom 백엔드로 사용하도록 지시합니다.

이렇게 하면, Airflow는 XCom 데이터를 기본 메타데이터 데이터베이스 대신 S3에 저장하고 불러옵니다.

참고로, 위의 코드는 예시를 위한 것으로, 실제 환경에서는 S3 접근하는 필요한 인증 정보 등을 적절히 처리해야 합니다.

 

 

 

8. Taskflow API로 파이썬 태스크 단순화 하기

import uuid

import airflow

from airflow import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator


with DAG(
    dag_id="13_taskflow_full",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
) as dag:
    start = DummyOperator(task_id="start")

    fetch_sales = DummyOperator(task_id="fetch_sales")
    clean_sales = DummyOperator(task_id="clean_sales")

    fetch_weather = DummyOperator(task_id="fetch_weather")
    clean_weather = DummyOperator(task_id="clean_weather")

    join_datasets = DummyOperator(task_id="join_datasets")

    start >> [fetch_sales, fetch_weather]
    fetch_sales >> clean_sales
    fetch_weather >> clean_weather
    [clean_sales, clean_weather] >> join_datasets

    # @task 데코레이션을 통해 함수를 태스크로 만들어
    # 태스크를 연결합니다.
    # model_id를 더이상 XCom에 공유 되지 않습니다.
    @task
    def train_model():
        model_id = str(uuid.uuid4())
        return model_id

    @task
    def deploy_model(model_id: str):
        print(f"Deploying model {model_id}")

    model_id = train_model()
    deploy_model(model_id)

    join_datasets >> model_id

 - @task 데코레이션을 통해 함수가 task가 됐고, 의존성이 연결된걸 확인 할 수 있습니다.

'파이썬 > Airflow' 카테고리의 다른 글

워크플로 트리거  (0) 2023.06.18
Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기  (0) 2023.06.12
Airflow 스케줄링  (0) 2023.06.11
첫번째 DAG 작성  (0) 2023.06.11
Airflow가 뭔가요?  (0) 2023.06.11