Learning Dagster from Airflow#

In this tutorial, we'll help you make the switch from Airflow to Dagster. Here, we review an Airflow DAG and show how the same functionality can be achieved in Dagster.


Comparing an Airflow DAG to Dagster#

In this guide, we will rewrite an Airflow DAG as a Dagster job, starting with a basic Airflow DAG:

from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "tutorial",
    default_args={
        "retries": 1,
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        bash_command="sleep 5",
        retries=3,
    )

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
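
In the DAG above, the templated task renders `{{ ds }}` (the run's logical date as a `YYYY-MM-DD` string) and `{{ macros.ds_add(ds, 7) }}`, which shifts that date forward by seven days. A minimal plain-Python sketch of the same date arithmetic follows; the local `ds_add` function here is an illustrative stand-in written for this guide, not an import from Airflow.

```python
from datetime import datetime, timedelta


def ds_add(ds: str, days: int) -> str:
    """Shift a YYYY-MM-DD date string by a number of days (negative moves back)."""
    parsed = datetime.strptime(ds, "%Y-%m-%d")
    return (parsed + timedelta(days=days)).strftime("%Y-%m-%d")


shifted = ds_add("2021-01-01", 7)
print(shifted)  # 2021-01-08
```

This is the behavior the Dagster version of the task needs to reproduce with `datetime` and `timedelta`.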

To rewrite this DAG in Dagster, we will break it down into three parts:

  1. Define the computations: the ops (in Airflow, the operators)
  2. Define the graph: the job (in Airflow, the DAG)
  3. Define the schedule (in Airflow, the schedule; how simple!)

A job is made up of a graph of ops. If you've used the Airflow TaskFlow API, this will feel familiar. With ops, you focus on writing a graph with Python functions as the nodes and the data dependencies between them as the edges.
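
To make the graph idea concrete, here is a minimal sketch in plain Python (not Dagster code) that models the tutorial DAG's dependencies as a mapping from each node to its upstream nodes and derives one valid execution order with the standard library's `graphlib`. The `upstream` mapping and the `execution_order` helper are hypothetical names used only for illustration.

```python
from graphlib import TopologicalSorter

# Hypothetical model of the tutorial DAG: each node maps to the set of nodes
# it depends on. This mirrors `t1 >> [t2, t3]` from the Airflow example.
upstream = {
    "print_date": set(),
    "sleep": {"print_date"},
    "templated": {"print_date"},
}


def execution_order(graph: dict[str, set[str]]) -> list[str]:
    """Return one valid order in which the nodes could run."""
    return list(TopologicalSorter(graph).static_order())


order = execution_order(upstream)
print(order)  # print_date comes first; sleep and templated follow in either order
```

In Dagster you do not build this mapping by hand: passing the output of one op into another inside a job is what creates the edge.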

Step 1: Defining the Ops#

In Dagster, the minimum unit of computation is an op. This directly corresponds to an operator in Airflow. Here, we map the Airflow operators t1, t2, and t3 to their respective Dagster ops.

@op
def print_date(context) -> datetime:
    ds = datetime.now()
    context.log.info(ds)
    return ds


@op(retry_policy=RetryPolicy(max_retries=3), ins={"start": In(Nothing)})
def sleep():
    time.sleep(5)


@op
def templated(context, ds: datetime):
    for _i in range(5):
        context.log.info(ds)
        # macros.ds_add(ds, 7) in the Airflow template adds seven days
        context.log.info(ds + timedelta(days=7))

This yields the following graph of computations:

Screenshot of the Dagster UI, showing the newly created graph of tutorial ops

Op Level Retries#

In the tutorial DAG, the t2 operator allowed for 3 retries. To configure the same behavior in Dagster, you can use op-level retry policies, as the sleep op does with retry_policy=RetryPolicy(max_retries=3).
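
As a rough mental model, a limit of 3 retries means a failing computation is attempted once and then retried up to three more times. The sketch below imitates that behavior in plain Python; `run_with_retries` and `flaky` are hypothetical helpers, and this is not how Dagster implements retries internally.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], max_retries: int) -> T:
    """Call fn, retrying on any exception up to max_retries additional times."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last failure
            attempt += 1


calls = []


def flaky() -> str:
    """Fail on the first two calls, then succeed."""
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = run_with_retries(flaky, max_retries=3)
print(result, len(calls))  # ok 3
```

With `max_retries=3`, a computation that never succeeds would run four times in total before the failure is reported.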

Step 2: Defining the Job#

In Dagster, the computations defined in ops are composed in jobs, which define the sequence and dependency structure of the computations you want to execute. This directly corresponds to a DAG in Airflow. Here, we compose the ops print_date, sleep and templated to match the dependency structure defined by the Airflow operators t1, t2, and t3.

@job(tags={"dagster/max_retries": 1, "dag_name": "example"})
def tutorial_job():
    ds = print_date()
    sleep(ds)
    templated(ds)

Job Level Retries#

Job-level retries are managed by the run launcher. You will need to enable support for them in your dagster.yaml; once you do, you can define the retry count on the job, as the dagster/max_retries tag above does.

Step 3: Defining the schedule#

In Dagster, schedules can be defined for jobs, which determine the cadence at which a job is triggered to be executed. Below we define a schedule that will run the tutorial_job daily:

schedule = ScheduleDefinition(job=tutorial_job, cron_schedule="@daily")
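
The `@daily` cron shorthand is equivalent to `0 0 * * *`, meaning one tick at midnight each day. As a small illustration of when the next tick falls, here is a plain-Python sketch that ignores timezone handling; `next_daily_tick` is a hypothetical helper, not part of Dagster.

```python
from datetime import datetime, timedelta


def next_daily_tick(now: datetime) -> datetime:
    """Return the first midnight strictly after `now`."""
    midnight_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight_today + timedelta(days=1)


tick = next_daily_tick(datetime(2021, 1, 1, 15, 30))
print(tick)  # 2021-01-02 00:00:00
```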

Step 4: Running locally#

In order to run our newly defined Dagster job we'll need to define a Dagster repository. A Dagster repository is a logical grouping of jobs and schedules that provides an entrypoint for viewing and executing them.

@repository
def rewrite_repo():
    return [tutorial_job, schedule]

We can now load this file with dagit, the interactive web UI for Dagster:

dagit -f <your_dagster_file>.py

Completed Code Example#

import time
from datetime import datetime, timedelta

from dagster import (
    In,
    Nothing,
    RetryPolicy,
    ScheduleDefinition,
    job,
    op,
    repository,
)


@op
def print_date(context) -> datetime:
    ds = datetime.now()
    context.log.info(ds)
    return ds


@op(retry_policy=RetryPolicy(max_retries=3), ins={"start": In(Nothing)})
def sleep():
    time.sleep(5)


@op
def templated(context, ds: datetime):
    for _i in range(5):
        context.log.info(ds)
        # macros.ds_add(ds, 7) in the Airflow template adds seven days
        context.log.info(ds + timedelta(days=7))


@job(tags={"dagster/max_retries": 1, "dag_name": "example"})
def tutorial_job():
    ds = print_date()
    sleep(ds)
    templated(ds)


schedule = ScheduleDefinition(job=tutorial_job, cron_schedule="@daily")


@repository
def rewrite_repo():
    return [tutorial_job, schedule]