DAG(Directed Acyclic Graph)는 Airflow에서 워크플로우를 정의하는 기본 단위로, Python 코드로 작성되며 여러 개의 Task로 구성됩니다. 각 Task의 실행 순서와 종속성을 정의할 수 있으며, 주어진 일정에 따라 자동으로 실행되어 데이터 파이프라인을 효과적으로 관리할 수 있습니다.


Basic Structure

Airflow에서 DAG는 아래와 같은 기본 구조를 가집니다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# 기본 설정
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2025, 2, 2),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# DAG 정의
dag = DAG(
    "my_first_dag",
    default_args=default_args,
    description="My first Airflow DAG",
    schedule_interval="@daily",
    catchup=False,
)

# 실행할 함수 정의
def print_hello():
    print("Hello, Airflow!")

# Task 정의
task_1 = PythonOperator(
    task_id="print_hello",
    python_callable=print_hello,
    dag=dag,
)

task_2 = BashOperator(
    task_id="echo_hello",
    bash_command="echo 'Hello, Airflow!'",
    dag=dag,
)

task_1 >> task_2

Operators

Airflow에서는 다양한 오퍼레이터를 제공하여 DAG 내에서 여러 작업을 수행할 수 있습니다.

PythonOperator

PythonOperatorPython 함수를 실행하는 데 사용됩니다.

def my_python_function():
    print("Hello from PythonOperator")

python_task = PythonOperator(
    task_id="python_task",
    python_callable=my_python_function,
    dag=dag,
)

BashOperator

BashOperatorBash 명령어를 실행할 때 사용됩니다.

from airflow.operators.bash import BashOperator

bash_task = BashOperator(
    task_id="bash_task",
    bash_command="echo 'Hello from BashOperator'",
    dag=dag,
)

DummyOperator

DummyOperator는 실행 없이 단순히 Task 간의 의존성을 설정할 때 사용됩니다.

from airflow.operators.dummy import DummyOperator

dummy_task = DummyOperator(
    task_id="dummy_task",
    dag=dag,
)

Task Dependencies

Airflow에서는 >> 연산자를 사용하여 Task 간의 실행 순서를 정의할 수 있습니다.

python_task >> bash_task >> dummy_task

Parallel Task

여러 Task를 동시에 실행할 수도 있습니다. 이를 통해 의존성이 없는 작업들을 병렬로 처리하여 파이프라인의 효율성을 높일 수 있습니다.

python_task >> [bash_task, dummy_task] # bash_task, dummy_task 동시 실행

References