In this tutorial, we'll help you make the switch from Airflow to Dagster. We review an Airflow DAG and show how the same functionality can be achieved in Dagster.
In this guide, we will rewrite an Airflow DAG as a Dagster job, starting with a basic Airflow DAG:
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "tutorial",
    default_args={"retries": 1},
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        bash_command="sleep 5",
        retries=3,
    )

    templated_command = dedent(
        """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
        {% endfor %}
        """
    )

    t3 = BashOperator(
        task_id="templated",
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
To rewrite this DAG in Dagster, we will break it down into three parts:
1. Define the computations: the ops (in Airflow, the operators)
2. Define the graph: the job (in Airflow, the DAG)
3. Define the schedule (in Airflow, also the schedule; how simple!)
A job is made up of a graph of ops. If you've used the Airflow TaskFlow API, this will feel familiar: with ops, you write a graph in which Python functions are the nodes and the data dependencies between them are the edges.
In Dagster, the minimum unit of computation is an op. This directly corresponds to an operator in Airflow. Here, we map the Airflow operators t1, t2, and t3 to their respective Dagster ops.
In Dagster, the computations defined in ops are composed in jobs, which define the sequence and dependency structure of the computations you want to execute. This directly corresponds to a DAG in Airflow. Here, we compose the ops print_date, sleep and templated to match the dependency structure defined by the Airflow operators t1, t2, and t3.
Job-level retries are managed by the run launcher. You will need to enable support for them in your dagster.yaml; once you do, you can define the retry count on the job.
In Dagster, schedules can be defined for jobs, which determine the cadence at which a job is triggered to be executed. Below we define a schedule that will run the tutorial_job daily:
In order to run our newly defined Dagster job we'll need to define a Dagster repository. A Dagster repository is a logical grouping of jobs and schedules that provides an entrypoint for viewing and executing them.