개발자의 스터디 노트
Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기 본문
1. 태스크 콘텍스트와 Jinja 템플릿 작업
- wikipageviews에서 BashOperator를 이용하여 위키 데이터를 다운로드 하는 오퍼레이터 입니다.
get_data = BashOperator(
task_id="get_data",
bash_command=(
"curl -o /tmp/wikipageviews.gz "
"https://dumps.wikimedia.org/other/pageviews/"
"{{ execution_date.year }}/"
"{{ execution_date.year }}-{{ '{:02}'.format(execution_date.month) }}/"
"pageviews-{{ execution_date.year }}"
"{{ '{:02}'.format(execution_date.month) }}"
"{{ '{:02}'.format(execution_date.day) }}-"
"{{ '{:02}'.format(execution_date.hour) }}0000.gz"
),
dag=dag,
)
- 위키 데이터를 압축하기위한 URL형식은 다양한 날짜 및 시간 구성요소로 구성됩니다. 이를 활용하여 데이터 파티셔닝을 할수 있고 증분작업에 활용할 수 있습니다.
2. 모든 콘텍스트 출력하기
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="listing_4_03",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)
def _print_context(**kwargs):
print(kwargs)
print_context = PythonOperator(
task_id="print_context",
python_callable=_print_context,
dag=dag
)
print_context
- DAG가 실행되면 출력 로그에서 모든 콘텍스트 변수가 출력되었음을 확인 할 수 있다.
- 모든 테스크 콘텍스트 변수
키 | 설명 | 예시 |
conf | Airflow 구성에 대해 접근할 수 있습니다. | airflow.configuration. AirflowConfigParser object |
dag | 현재 DAG 개체 | DAG object |
dag_run | 현재 DagRun 개체 | DagRun object |
ds | %Y-%m-%d 형식의 execution_date | "2023-06-10" |
ds_nodash | %Y%m%d 혁식의 execution_date | "20230610" |
excution_date | 태스크 스케줄 간격의 시작 날짜/시간 | pendulum.datetime.DateTime object |
Inlets | task.inlets의 약어, 데이터 계보에 대한 입력 데이터 소스를 추적하는 기능 |
[] |
macros | airflow.macros 모듈 | macro module |
next_ds | %Y-%m-%d형식의 다음 스케줄 간격(= 현재 스케줄 간격의 끝)의 execution_date | "2023-06-10" |
next_ds_nodash | %Y%m%d형식의 다음 스케줄 간격(= 현재 스케줄 간격의 끝)의 execution_date | "20230610" |
next_execution_date | 태스크의 다음 스케줄 간격의 시작 datetime(=현재 스케줄 간격의 끝) | pendulum.datetime.DateTime object |
outlets | task.outlets의 약어, 데이터 계보lineage에 대한 출력 데이터 소스를 추적하는 기능 | [] |
params | 태스크 콘텍스트에 대한 사용자 제공 변수 | {} |
prev_ds | %Y-%m-%d 형식의 이전 스케줄 간격의 execution_date | "2023-06-10" |
prev_ds_nodash | %Y%m%d 형식의 이전 스케줄 간격의 execution_date | "20230610" |
prev_execution_date | 대스크 이전 스케줄 간격의 시작 datetime | pendulum.datetime.DateTime object |
prev_execution_date_success | 동일한 태스크의 마지막으로 성공적으로 완료된 실행의 시작 datetime (과거에만 해당) | pendulum.datetime.DateTime object |
prev_start_date_success | 동일한 태스크의 마지막으로 성공적으로 시작된 날짜와 시간 (과거에만 해당) | pendulum.datetime.DateTime object |
run_id | DagRun의 run_id (일반적으로 접두사 + datetime으로 구성된 키) | "manual__2019-0101T00:00:00+00:00" |
task | 현재 오퍼레이터 | PythonOperator object |
task_instance | 현재 TaskInstance 객체 | TaskInstance object |
task_intance_key_str | 현재 TaskInstance의 고유 식별자 ({dag_id}__{task_id}__{ds_nodash}) |
"dag_id_task_id_20190101" |
templates_dict | 태스크 콘텍스트에 대한 사용자 제공 변수 | {} |
test_mode | Airflow가 테스트 모드에서 실행 중인지 여부(구성속성) | False |
ti | task_instance와 동일한 현재 TaskInstance 객체 | TaskInstance object |
tomorrow_ds | ds(실행 시간)에서 1을 더함. | "2023-06-10" |
tomorrow_ds_nodash | ds_nodash에서 1을 더함. | "20230610" |
ts | ISO8601 포멧에 따른 execution_date | "2019-0101T00:00:00+00:00" |
ts_nodash | %Y%m%dT%H%M%S 형식의 execution_date |
"20190101T000000" |
ts_nodash_with_tz | 시간정보가 있는 ts_nodash | "20190101T000000+0000" |
var | Airflow 변수를 처리하기위한 헬퍼 개체 Helplers object |
{} |
yesterday_ds | ds(실행시간) 1일을 뺌. | "2023-06-10" |
yesterday_ds_nodash | ds_nodash 1을 뺌. | "20230610" |
3. PythonOperator 템플릿
def _get_data(execution_date): # 함수의 인자로 콘텍스트 변수명을 통해 값을 전달
year, month, day, hour, *_ = execution_date.timetuple()
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
output_path = "/tmp/wikipageviews.gz"
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data, # python 함수 호출
dag=dag
)
- PythonOperator 의 python_callable 인수에 콜러블(함수를 객체로 만들어주는)을 제공합니다. 어떤 함수라도 PythonOperator는 호출 가능하도록 수행합니다. 다른 오퍼레이터상 문자열이 아니라 함수이기 때문에 함수 내의 코드를 자동으로 템플릿화할 수는 없습니다.
* 스케줄 주기의 시작 및 종료 날짜 출력하기
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="listing_4_08",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
)
def _print_context(**context):
start = context["execution_date"]
end = context["next_execution_date"]
print(f"Start: {start}, end: {end}")
# Prints e.g.:
# Start: 2019-07-13T14:00:00+00:00, end: 2019-07-13T15:00:00+00:00
print_context = PythonOperator(
task_id="print_context", python_callable=_print_context, dag=dag
)
- **context 인수를 사용하여 콘텍스트로 부터 콘텍스트 변수를 받아 사용할 수 있습니다.
- execution_date와 next_execution_date를 콘텍스트로부터 받아 시작 및 종료 날짜를 출력할 수 있습니다.
* PythonOperator에 변수 제공
def _get_data(output_path, **_):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_args=["/tmp/wikipageviews.gz"],
},
dag=dag,
)
# op_args를 사용하여 콜러블 함수에 추가 변수를 제공합니다.
# 즉 _get_data("/tmp/wikipageviews.gz") 함수를 호출한 것과 동일한 결과를 얻을수 있습니다.
*PythonOperator 콜러블 커스텀 kwargs 제공하기
def _get_data(year, month, day, hour, output_path, **_):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour }}",
"output_path": "/tmp/wikipageviews.gz",
},
dag=dag,
)
# op_kwargs (사용자 정의키워드 인수)를 통해 콜러블 함수에 전달할 수 있습니다.
다른시스템과 연결하기
1. PostgreSql과 연결하기.
https://github.com/pws0601/airflow_study/tree/main/PostgreSql
PostgreSql을 Docker 로 배포 하는 방법을 공유하였습니다.
2. CLI를 사용하여 Airflow에 자격 증명 저장하기
airflow connections add \
--conn-type postgres \
--conn-host 접속IP \
--conn-login postgres \
--conn-password airflow \
my_postgres
3. 관리자 화면에서 자격증명 저장하기
- 상단의 Admin > Connections 메뉴에서 설정할 수 있습니다.
4. 전체 코드
from urllib import request
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
import ssl
dag = DAG(
dag_id="listing_4_20",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval="@hourly",
template_searchpath="/tmp",
max_active_runs=1,
)
def _get_data(year, month, day, hour, output_path):
url = (
"https://dumps.wikimedia.org/other/pageviews/"
f"{year}/{year}-{month:0>2}/pageviews-{year}{month:0>2}{day:0>2}-{hour:0>2}0000.gz"
)
print("url : ", url)
# ssl 인증서 관련 에러가 발생하여 임시로 인증서를 무시하도록 설정하였습니다.
# 권장하는 방법은 아닙니다.
ssl._create_default_https_context = ssl._create_unverified_context
request.urlretrieve(url, output_path)
get_data = PythonOperator(
task_id="get_data",
python_callable=_get_data,
op_kwargs={
"year": "{{ execution_date.year }}",
"month": "{{ execution_date.month }}",
"day": "{{ execution_date.day }}",
"hour": "{{ execution_date.hour-1 }}",
"output_path": "/opt/airflow/data//wikipageviews.gz",
},
dag=dag,
)
extract_gz = BashOperator(
task_id="extract_gz", bash_command="gunzip --force /opt/airflow/data/wikipageviews.gz", dag=dag
)
def _fetch_pageviews(pagenames, execution_date):
result = dict.fromkeys(pagenames, 0)
with open("/opt/airflow/data/wikipageviews", "r") as f:
for line in f:
domain_code, page_title, view_counts, _ = line.split(" ")
if domain_code == "en" and page_title in pagenames:
result[page_title] = view_counts
# PostgresOperator 에 의해서 실행될 sql문 생성.
# /dags 하위에 파일이 있어야 인식할수 있습니다.
with open("/opt/airflow/dags/postgres_query.sql", "w") as f:
for pagename, pageviewcount in result.items():
f.write(
"INSERT INTO pageview_counts VALUES ("
f"'{pagename}', {pageviewcount}, '{execution_date}'"
");\n"
)
fetch_pageviews = PythonOperator(
task_id="fetch_pageviews",
python_callable=_fetch_pageviews,
op_kwargs={"pagenames": {"Google", "Amazon", "Apple", "Microsoft", "Facebook"}},
dag=dag,
)
write_to_postgres = PostgresOperator(
task_id="write_to_postgres",
postgres_conn_id="my_postgres",
sql="postgres_query.sql", # PostgresOperator 에 의해서 실행될 sql문 /dags 하위에 파일이 있어야 인식할수 있습니다.
dag=dag,
)
get_data >> extract_gz >> fetch_pageviews >> write_to_postgres
'파이썬 > Airflow' 카테고리의 다른 글
워크플로 트리거 (0) | 2023.06.18 |
---|---|
태스크간 의존성 정의 (0) | 2023.06.12 |
Airflow 스케줄링 (0) | 2023.06.11 |
첫번째 DAG 작성 (0) | 2023.06.11 |
Airflow가 뭔가요? (0) | 2023.06.11 |