개발자의 스터디 노트
워크플로 트리거 본문
1. FileSensor 오퍼레이터 사용하기.
- 특정 파일 또는 파일 세트의 존재를 확인하는 데 사용되는 센서 오퍼레이터 입니다. 이것은 일반적으로 워크 플로우가 파일이나 데이터가 있는 위치에 도달할 때까지 기다리는 시나리오에 유용합니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.sensors import FileSensor
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
}
with DAG(
'file_sensor_example_with_tasks',
default_args=default_args,
description='An example DAG with a FileSensor and tasks',
schedule_interval='@daily',
start_date=days_ago(2),
tags=['example'],
) as dag:
wait_for_file = FileSensor(
task_id='wait_for_file',
filepath='/opt/airflow/my_file.txt',
poke_interval=10, # check every 10 seconds
timeout=300, # timeout after 5 minutes
)
process_file = BashOperator(
task_id='process_file',
bash_command='echo "Processing file: /opt/airflow/my_file.txt"',
)
wait_for_file >> process_file # set the task dependencies
* wait_for_file 태스크가 true가 되어야(정해진 파일 경로에 파일이 있을경우)만 다음 태스크(process_file)이 실행됩니다.
* FileSensor는 10초 간격으로 체크하고 300초(5분)간 파일이 나타나지 않을 경우 작업은 실패로 처리 됩니다.
2. PythonSensor 오퍼레이터 사용하기
- PythonSensor는 사용자 정의 Python함수가 특정 조건을 충족할때까지 곅속 확인하는 작업입니다. 이 작업은 종종 외부 시스템에서 데이터가 준비되었는지 또는 특정 상태가 도달했는지 확인하는 데 사용됩니다.
from pathlib import Path
import airflow.utils.dates
from airflow import DAG
from airflow.sensors.python import PythonSensor
dag = DAG(
dag_id="listing_6_02",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 16 * * *",
description="A batch workflow for ingesting supermarket promotions data.",
default_args={"depends_on_past": True},
)
def _wait_for_supermarket(supermarket_id_):
supermarket_path = Path("/data/" + supermarket_id_)
data_files = supermarket_path.glob("data-*.csv")
success_file = supermarket_path / "_SUCCESS"
return data_files and success_file.exists()
wait_for_supermarket_1 = PythonSensor(
task_id="wait_for_supermarket_1",
python_callable=_wait_for_supermarket,
op_kwargs={"supermarket_id": "supermarket1"},
dag=dag,
)
process_file = BashOperator(
task_id='process_file',
bash_command='echo "Processing file: ./data/*"',
)
wait_for_supermarket_1 >> process_file
* 매일 오후 4시에 DAG가 실행되는 스케쥴입니다.
* PythonSensor는 60초 간격으로 실행 됩니다.(기본값)
* timeout 매개변수가 설정되지 않았으므로 기본값(최대 7일)만큼 기다립니다. 그 시간이 경과하면 작업은 실패가 됩니다.
FileSensor, PythonSensor 의 옵션
poke_interval : 기본값 10초 간격.
timeout : 기본값 7일 7일동안 반복.
3. 최대 동시 태스크 수 설정하기
- DAG에서 센서처리를 할때 timeout 옵션을 설정하지 않는다면 해당 태스크는 기본값이 7일이 지난 후에 시패 메세지를 전송하게 됩니다. 그사이에 지속적으로 DAG작업은 실행되고 해당 날짜에 대한 센서가 시작되며 결과적으로 더 많은 태스크가 실행되기 시작합니다. 이런 문제를 해결하기 위해 태스크 수를 제어하는 방법이 있습니다.
dag = DAG(
dag_id="listing_6_03",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="@daily",
concurrency=50, # 동시에 50개의 태스크 실행을 허용
)
* concurrency 옵션을 통해 최대 동시 태스크 수를 설정할 수 있습니다.
4. 센서 데드록
- 데이터가 절대 준비 되지 않을 경우, 센서 태스크는 무한히 기다리면서 해당 DAG의 다른 태스크가 실행되지 못하게 막는 상태에 빠질수 있습니다. 이러한 상태를 센서 데드록이라고 합니다.
이를 방지하기 위해 센서는 timeout, poke_interval, reschedule 옵션을 제공합니다.
* reschedule : reschedule 모드는 포크 동작을 실행할 때믄 슬롯을 차지하며, 대기 시작 동안은 슬롯을 차지 하지 않습니다.
5. TriggerDagRunOperator 사용하기.
- TriggerDagRunOperator는 특수한 오퍼레이터로 실행 중인 DAG에서 다른 DAG를 실행하도록 트리거 하는데 사용됩니다.
- 이 오퍼레이터는 다른 DAG를 비동기적으로 실행합니다.
- 이 오퍼레이터를 사용하면 여러 DAG들이 서로의 완료를 기다리지 않고 독립적으로 실행될 수 있습니다.
from pathlib import Path
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.python import PythonSensor
dag1 = DAG(
dag_id="listing_6_04_dag01",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 16 * * *",
)
dag2 = DAG(
dag_id="listing_6_04_dag02",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval=None,
)
def _wait_for_supermarket(supermarket_id_):
supermarket_path = Path("/data/" + supermarket_id_)
data_files = supermarket_path.glob("data-*.csv")
success_file = supermarket_path / "_SUCCESS"
return data_files and success_file.exists()
for supermarket_id in range(1, 5):
wait = PythonSensor(
task_id=f"wait_for_supermarket_{supermarket_id}",
python_callable=_wait_for_supermarket,
op_kwargs={"supermarket_id_": f"supermarket{supermarket_id}"},
dag=dag1,
)
copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{supermarket_id}", dag=dag1)
process = DummyOperator(task_id=f"process_supermarket_{supermarket_id}", dag=dag1)
trigger_create_metrics_dag = TriggerDagRunOperator(
task_id=f"trigger_create_metrics_dag_supermarket_{supermarket_id}",
trigger_dag_id="listing_6_04_dag02",
dag=dag1,
)
wait >> copy >> process >> trigger_create_metrics_dag
compute_differences = DummyOperator(task_id="compute_differences", dag=dag2)
update_dashboard = DummyOperator(task_id="update_dashboard", dag=dag2)
notify_new_data = DummyOperator(task_id="notify_new_data", dag=dag2)
compute_differences >> update_dashboard
* TriggerDagRunOperator 오퍼레이터를 통해 listing_6_04dag02 DAG를 실행하는 코드 입니다.
6. ExternalTaskSensor 사용하기
* TriggerDagRunOperator로 백필 작업
- TriggerDagRunOperator를 포함한 DAG에서 태스크를 삭제하면 이전에 트리거 된 해당 DAG 실행을 지우는 대신에 새 DAG 실행이 트리거 됩니다.
- TriggerDagRunOperator 는 태스크 상태를 포크하지 않고 개별적으로 실행하기에 DAG간 의존성을 관리하는 방법은 제공하지 않습니다.
import datetime
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor
dag1 = DAG(
dag_id="figure_6_20_dag_1",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 16 * * *",
)
dag2 = DAG(
dag_id="figure_6_20_dag_2",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval="0 18 * * *",
)
DummyOperator(task_id="copy_to_raw", dag=dag1) >> DummyOperator(
task_id="process_supermarket", dag=dag1
)
wait = ExternalTaskSensor(
task_id="wait_for_process_supermarket",
external_dag_id="figure_6_20_dag_1",
external_task_id="process_supermarket",
execution_delta=datetime.timedelta(hours=6),
dag=dag2,
)
report = DummyOperator(task_id="report", dag=dag2)
wait >> report
- ExternalTaskSensor가 report 태스크를 실행하기 전 DAG가 모두 완료되었는지를 확인하는 프록시 역할을 합니다.
- DAG2가 DAG1의 태스크 상태를 확인할때 몇가지 단점이 존재합니다. ExternalTaskSensor는 자신과 정확히 동일한 실행 날짜를 가진 태스크에 대한 성공만 확인하는 것입니다. 만약 두 DAG의 스케줄 간격이 다르다면 ExternalTaskSensor는 해당하는 태스크를 찾을 수 없습니다.이럴 경우 스케줄 간격을 검색할 수 있도록 오프셋을 설정할 수 있습니다.
- execution_delta : 다른 태스크를 검색하기 위해 오프셋 을 설정
- timedelta 오브젝트는 예상과 다르게 동작한다는 것이 중요합니다. 설정된 timedelta는 excution_date에서 뺍니다. 즉, 양수의 timedelta 값은 시간을 거슬로 올라간다는 것을 의미합니다.
7. CLI/REST API를 이용해 워크플로 시작하기
- airflow CLI를 사용하여 DAG 트리거 하기
airflow dags triger dag1
- 추가 구성으로 DAG트리거 하기
airflow dags trigger -c '{"supermarket_id": 1}' dag1
airflow dags trigger --conf '{"supermarket_id": 1}' dag1
- airflow REST API를 사용하여 DAG 트리거 하기
# URL is /api/v1
curl \
-u admin:admin \
-x POST \
"http://localhost:8000/api/v1/dags/print_dag_run_conf/dagRuns" \
-H "Content-Type: application/json" \
-d '{"conf: {"supermarket": 1}}'
- 평문으로 사용자 이름과 패스워드를 보내는 것은 바람직하지 않습니다. 다른 인증방법으로 API를 호출하여야 합니다.
- 추가 구성 설정이 제공되지 않은 경우에도 엔드포인트에는 데이터가 필요합니다.
8. 마치며...
- 센서는 특정 조건이 참인지 여부를 지속적으로 확인(polling)하는 특수 유형 오퍼레이터 입니다.
- Airflow는 다양한 시스템 및 사용 요건에 맞는 센서를 제공합니다. 사용자 지정 조건은 PythonSensor로 만들 수 있습니다.
- TriggerDagRunOperator는 다른 DAG를 트리거 할 수 있습니다. ExternalTaskSensor는 다른 DAG의 상태를 확인할 수 있습니다.
- REST API와 CLI를 사용하여 Airflow 외부에서 DAG를 트리거 할 수 있습니다.
'파이썬 > Airflow' 카테고리의 다른 글
태스크간 의존성 정의 (0) | 2023.06.12 |
---|---|
Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기 (0) | 2023.06.12 |
Airflow 스케줄링 (0) | 2023.06.11 |
첫번째 DAG 작성 (0) | 2023.06.11 |
Airflow가 뭔가요? (0) | 2023.06.11 |