관리 메뉴

개발자의 스터디 노트

첫번째 DAG 작성 본문

파이썬/Airflow

첫번째 DAG 작성

박개발씨 2023. 6. 11. 02:19

전체 코드

import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

## 실행할 DAG 정보를 정의한다.
dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily",
)

# BashOperator
download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag,
)

# 파이썬 함수 
# json 파일 내의 모든 이미지 파일을 다운로드 받는다.
def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f:
                    f.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")


# PythonOperator
get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

# BashOperator
notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

# 태스크 실행 순서 정의
download_launches >> get_pictures >> notify

위 코드를 보면 DAG의 기본적인 구조를 확인 할 수 있다.

 

위 코드를 dags 경로에 배포하고 실행하면

적상적으로 실행된 것을 확인할 수 있다.

DAG를 14일 전부터 매일 실행하게 설정되어 있으므로, 화면상의 실행시간인 오전 02를 기준으로 15번의 실행 결과를 확인 할 수 있다.

 

태스크와 오퍼레이터의 차이점.

오퍼레이터는 단일 작업을 수행할 수 있는 기능을 제공합니다.

태스크는 작업의 올바른 실행을 보장하기 위한 오퍼레이터의 래퍼 또는 매니저로 생각해 볼 수 있습니다.

사용자는 오퍼레이터를 활용해 수행할 작업에 집중할 수 있으며, Airflow는 태스크를 통해 작업을 올바르게 실행 할 수 있습니다.