관리 메뉴

개발자의 스터디 노트

Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기 본문

파이썬/Airflow

Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기

박개발씨 2023. 6. 12. 01:15

1. 태스크 콘텍스트와 Jinja 템플릿 작업

- wikipageviews에서 BashOperator를 이용하여 위키 데이터를 다운로드 하는 오퍼레이터 입니다.

get_data = BashOperator(
    task_id="get_data",
    bash_command=(
        "curl -o /tmp/wikipageviews.gz "
        "https://dumps.wikimedia.org/other/pageviews/"
        "{{ execution_date.year }}/"
        "{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
        "pageviews-{{ execution_date.year }}"
        "{{ '{:02}'.format(execution_date.month) }}"
        "{{ '{:02}'.format(execution_date.day) }}-"
        "{{ '{:02}'.format(execution_date.hour) }}0000.gz"
    ),
    dag=dag,
)

  - 위키 데이터를 압축하기위한 URL형식은 다양한 날짜 및 시간 구성요소로 구성됩니다. 이를 활용하여 데이터 파티셔닝을 할수 있고 증분작업에 활용할 수 있습니다.

 

2. 모든 콘텍스트 출력하기

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_4_03",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
)


def _print_context(**kwargs):
    print(kwargs)


print_context = PythonOperator(
    task_id="print_context", 
    python_callable=_print_context, 
    dag=dag
)

print_context

 - DAG가 실행되면 출력 로그에서 모든 콘텍스트 변수가 출력되었음을 확인 할 수 있다.

 

- 모든 테스크 콘텍스트 변수

설명 예시
conf Airflow 구성에 대해 접근할 수 있습니다. airflow.configuration.
AirflowConfigParser object
dag 현재 DAG 개체 DAG object
dag_run 현재 DagRun 개체 DagRun object
ds %Y-%m-%d 형식의 execution_date "2023-06-10"
ds_nodash %Y%m%d 혁식의 execution_date "20230610"
excution_date 태스크 스케줄 간격의 시작 날짜/시간 pendulum.datetime.DateTime object
Inlets task.inlets의 약어, 데이터 계보에 대한 입력
데이터 소스를 추적하는 기능
[]
macros airflow.macros 모듈 macro module
next_ds %Y-%m-%d형식의 다음 스케줄 간격(= 현재 스케줄 간격의 끝)의 execution_date "2023-06-10"
next_ds_nodash %Y%m%d형식의 다음 스케줄 간격(= 현재 스케줄 간격의 끝)의 execution_date "20230610"
next_execution_date 태스크의 다음 스케줄 간격의 시작 datetime(=현재 스케줄 간격의 끝) pendulum.datetime.DateTime object
outlets task.outlets의 약어, 데이터 계보lineage에 대한 출력 데이터 소스를 추적하는 기능 []
params 태스크 콘텍스트에 대한 사용자 제공 변수 {}
prev_ds %Y-%m-%d 형식의 이전 스케줄 간격의 execution_date "2023-06-10"
prev_ds_nodash %Y%m%d 형식의 이전 스케줄 간격의 execution_date "20230610"
prev_execution_date 대스크 이전 스케줄 간격의 시작 datetime pendulum.datetime.DateTime object
prev_execution_date_success 동일한 태스크의 마지막으로 성공적으로 완료된 실행의 시작 datetime (과거에만 해당) pendulum.datetime.DateTime object
prev_start_date_success 동일한 태스크의 마지막으로 성공적으로 시작된 날짜와 시간 (과거에만 해당) pendulum.datetime.DateTime object
run_id DagRun의 run_id (일반적으로 접두사 + datetime으로 구성된 키) "manual__2019-0101T00:00:00+00:00"
task 현재 오퍼레이터 PythonOperator object
task_instance 현재 TaskInstance 객체 TaskInstance object
task_intance_key_str 현재 TaskInstance의 고유 식별자
({dag_id}__{task_id}__{ds_nodash})
"dag_id_task_id_20190101"
templates_dict 태스크 콘텍스트에 대한 사용자 제공 변수 {}
test_mode Airflow가 테스트 모드에서 실행 중인지 여부(구성속성) False
ti task_instance와 동일한 현재 TaskInstance 객체 TaskInstance object
tomorrow_ds ds(실행 시간)에서 1을 더함. "2023-06-10"
tomorrow_ds_nodash ds_nodash에서 1을 더함. "20230610"
ts ISO8601 포멧에 따른 execution_date "2019-0101T00:00:00+00:00"
ts_nodash %Y%m%dT%H%M%S
형식의 execution_date
"20190101T000000"
ts_nodash_with_tz 시간정보가 있는 ts_nodash "20190101T000000+0000"
var Airflow 변수를 처리하기위한 헬퍼 개체
Helplers object
{}
yesterday_ds ds(실행시간) 1일을 뺌. "2023-06-10"
yesterday_ds_nodash ds_nodash 1을 뺌. "20230610"

 

3. PythonOperator 템플릿

def _get_data(execution_date): # 함수의 인자로 콘텍스트 변수명을 통해 값을 전달
    year, month, day, hour, *_ = execution_date.timetuple()
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    output_path = "/tmp/wikipageviews.gz"
    request.urlretrieve(url, output_path)


get_data = PythonOperator(
	task_id="get_data", 
    python_callable=_get_data, # python 함수 호출
    dag=dag
)

 - PythonOperator 의 python_callable 인수에 콜러블(함수를 객체로 만들어주는)을 제공합니다. 어떤 함수라도 PythonOperator는 호출 가능하도록 수행합니다. 다른 오퍼레이터상 문자열이 아니라 함수이기 때문에 함수 내의 코드를 자동으로 템플릿화할 수는 없습니다.

 

 * 스케줄 주기의 시작 및 종료 날짜 출력하기

import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="listing_4_08",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval="@daily",
)


def _print_context(**context):
    start = context["execution_date"]
    end = context["next_execution_date"]
    print(f"Start: {start}, end: {end}")


# Prints e.g.:
# Start: 2019-07-13T14:00:00+00:00, end: 2019-07-13T15:00:00+00:00


print_context = PythonOperator(
    task_id="print_context", python_callable=_print_context, dag=dag
)

 - **context 인수를 사용하여 콘텍스트로 부터 콘텍스트 변수를 받아 사용할 수 있습니다.

 - execution_date와 next_execution_date를 콘텍스트로부터 받아 시작 및 종료 날짜를 출력할 수 있습니다.

 

* PythonOperator에 변수 제공

def _get_data(output_path, **_):
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    request.urlretrieve(url, output_path)


get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    op_args=["/tmp/wikipageviews.gz"], 
    },
    dag=dag,
)

# op_args를 사용하여 콜러블 함수에 추가 변수를 제공합니다.
# 즉 _get_data("/tmp/wikipageviews.gz") 함수를 호출한 것과 동일한 결과를 얻을수 있습니다.

 

*PythonOperator 콜러블 커스텀  kwargs 제공하기

def _get_data(year, month, day, hour, output_path, **_):
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    request.urlretrieve(url, output_path)


get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    op_kwargs={
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
        "hour": "{{ execution_date.hour }}",
        "output_path": "/tmp/wikipageviews.gz",
    },
    dag=dag,
)
# op_kwargs (사용자 정의키워드 인수)를 통해 콜러블 함수에 전달할 수 있습니다.

 

다른시스템과 연결하기

 

1. PostgreSql과 연결하기.

https://github.com/pws0601/airflow_study/tree/main/PostgreSql

 

GitHub - pws0601/airflow_study: airflow 스터디 저장소 입니다.

airflow 스터디 저장소 입니다. Contribute to pws0601/airflow_study development by creating an account on GitHub.

github.com

PostgreSql을 Docker 로 배포 하는 방법을 공유하였습니다.

 

2. CLI를 사용하여 Airflow에 자격 증명 저장하기

airflow connections add \
	--conn-type postgres \
    --conn-host 접속IP \
    --conn-login postgres \
    --conn-password airflow \
    my_postgres

 

3. 관리자 화면에서 자격증명 저장하기

 - 상단의 Admin > Connections 메뉴에서 설정할 수 있습니다.

 

4. 전체 코드

from urllib import request

import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
import ssl

dag = DAG(
    dag_id="listing_4_20",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval="@hourly",
    template_searchpath="/tmp",
    max_active_runs=1,
)


def _get_data(year, month, day, hour, output_path):
    url = (
        "https://dumps.wikimedia.org/other/pageviews/"
        f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
    )
    print("url : ", url)
    # ssl 인증서 관련 에러가 발생하여 임시로 인증서를 무시하도록 설정하였습니다.
    # 권장하는 방법은 아닙니다.
    ssl._create_default_https_context = ssl._create_unverified_context
    request.urlretrieve(url, output_path)


get_data = PythonOperator(
    task_id="get_data",
    python_callable=_get_data,
    op_kwargs={
        "year": "{{ execution_date.year }}",
        "month": "{{ execution_date.month }}",
        "day": "{{ execution_date.day }}",
        "hour": "{{ execution_date.hour-1 }}",
        "output_path": "/opt/airflow/data//wikipageviews.gz",
    },
    dag=dag,
)


extract_gz = BashOperator(
    task_id="extract_gz", bash_command="gunzip --force /opt/airflow/data/wikipageviews.gz", dag=dag
)


def _fetch_pageviews(pagenames, execution_date):
    result = dict.fromkeys(pagenames, 0)
    with open("/opt/airflow/data/wikipageviews", "r") as f:
        for line in f:
            domain_code, page_title, view_counts, _ = line.split(" ")
            if domain_code == "en" and page_title in pagenames:
                result[page_title] = view_counts

    # PostgresOperator 에 의해서 실행될 sql문 생성.
    # /dags 하위에 파일이 있어야 인식할수 있습니다.
    with open("/opt/airflow/dags/postgres_query.sql", "w") as f:
        for pagename, pageviewcount in result.items():
            f.write(
                "INSERT INTO pageview_counts VALUES ("
                f"'{pagename}', {pageviewcount}, '{execution_date}'"
                ");\n"
            )


fetch_pageviews = PythonOperator(
    task_id="fetch_pageviews",
    python_callable=_fetch_pageviews,
    op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
    dag=dag,
)

write_to_postgres = PostgresOperator(
    task_id="write_to_postgres",
    postgres_conn_id="my_postgres",
    sql="postgres_query.sql", # PostgresOperator 에 의해서 실행될 sql문 /dags 하위에 파일이 있어야 인식할수 있습니다.
    dag=dag,
)

get_data >> extract_gz >> fetch_pageviews >> write_to_postgres

'파이썬 > Airflow' 카테고리의 다른 글

워크플로 트리거  (0) 2023.06.18
태스크간 의존성 정의  (0) 2023.06.12
Airflow 스케줄링  (0) 2023.06.11
첫번째 DAG 작성  (0) 2023.06.11
Airflow가 뭔가요?  (0) 2023.06.11