개발자의 스터디 노트
첫번째 DAG 작성 본문
전체 코드
import json
import pathlib
import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
## 실행할 DAG 정보를 정의한다.
dag = DAG(
dag_id="download_rocket_launches",
description="Download rocket pictures of recently launched rockets.",
start_date=airflow.utils.dates.days_ago(14),
schedule_interval="@daily",
)
# BashOperator
download_launches = BashOperator(
task_id="download_launches",
bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'", # noqa: E501
dag=dag,
)
# 파이썬 함수
# json 파일 내의 모든 이미지 파일을 다운로드 받는다.
def _get_pictures():
# Ensure directory exists
pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)
# Download all pictures in launches.json
with open("/tmp/launches.json") as f:
launches = json.load(f)
image_urls = [launch["image"] for launch in launches["results"]]
for image_url in image_urls:
try:
response = requests.get(image_url)
image_filename = image_url.split("/")[-1]
target_file = f"/tmp/images/{image_filename}"
with open(target_file, "wb") as f:
f.write(response.content)
print(f"Downloaded {image_url} to {target_file}")
except requests_exceptions.MissingSchema:
print(f"{image_url} appears to be an invalid URL.")
except requests_exceptions.ConnectionError:
print(f"Could not connect to {image_url}.")
# PythonOperator
get_pictures = PythonOperator(
task_id="get_pictures", python_callable=_get_pictures, dag=dag
)
# BashOperator
notify = BashOperator(
task_id="notify",
bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
dag=dag,
)
# 태스크 실행 순서 정의
download_launches >> get_pictures >> notify
위 코드를 보면 DAG의 기본적인 구조를 확인 할 수 있다.
위 코드를 dags 경로에 배포하고 실행하면
적상적으로 실행된 것을 확인할 수 있다.
DAG를 14일 전부터 매일 실행하게 설정되어 있으므로, 화면상의 실행시간인 오전 02를 기준으로 15번의 실행 결과를 확인 할 수 있다.
태스크와 오퍼레이터의 차이점.
오퍼레이터는 단일 작업을 수행할 수 있는 기능을 제공합니다.
태스크는 작업의 올바른 실행을 보장하기 위한 오퍼레이터의 래퍼 또는 매니저로 생각해 볼 수 있습니다.
사용자는 오퍼레이터를 활용해 수행할 작업에 집중할 수 있으며, Airflow는 태스크를 통해 작업을 올바르게 실행 할 수 있습니다.
'파이썬 > Airflow' 카테고리의 다른 글
Airflow 콘텍스트를 사용하여 태스크 템플릿 작업하기 (0) | 2023.06.12 |
---|---|
Airflow 스케줄링 (0) | 2023.06.11 |
Airflow가 뭔가요? (0) | 2023.06.11 |
docker compose를 이용한 airflow 설치_예제 DAG 안보이게 하기. (0) | 2023.06.11 |
docker compose를 이용한 airflow 설치 (0) | 2023.06.03 |