
Airflow Scheduler: the very basics

Apache Airflow is a great tool for scheduling jobs. It has a nice UI out of the box, it allows you to create a directed acyclic graph (DAG) of tasks and their dependencies, and you can easily see how jobs are doing right now and how they have performed in the past. If a job fails, you can configure retries or manually re-run it through the Airflow CLI or the Airflow UI. Overall, it is a great tool for running your pipelines. However, if you are just getting started with Airflow, the scheduler can be fairly confusing.

Let's start at the beginning and keep things very simple. Our goal is to schedule a DAG that runs every day at 2:00:00 UTC, starting from today (you already have yesterday's data). A DAG, amongst other parameters, is instantiated with schedule_interval, start_date and end_date. Let's say we created a DAG called... Foo (I know, so original!) on 2018-06-07 with the configuration below:
from datetime import datetime

from airflow import DAG

dag_params = {
    # ...
    'start_date': datetime(2018, 6, 6),
    'end_date': None,  # explicitly makes the DAG open-ended: run forever
    # ...
}

with DAG('Foo', default_args=dag_params, schedule_interval='0 2 * * *') as dag:
    # ...
    # a bunch of tasks
    # ...
The scheduler periodically checks the DAG folder and registers new DAGs. By default, it sets a new DAG to the paused state at creation, unless dags_are_paused_at_creation has been changed in airflow.cfg. Before unpausing the DAG, there are several things we need to understand about how the scheduler determines whether a DAG should be run.
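For reference, the pause-at-creation behaviour lives in airflow.cfg, and a DAG can be unpaused from the command line or via the toggle in the UI. A quick sketch (Airflow 1.x CLI syntax; the exact config layout may vary by version):
# airflow.cfg
[core]
dags_are_paused_at_creation = True

# From the shell (or flip the toggle in the Airflow UI):
$ airflow unpause Foo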
The first run
Let's pretend that we unpause the DAG. What will the scheduler do with the Foo DAG? The scheduler periodically goes through all of the DAGs and checks whether each one needs to be run. For our DAG, it first attempts to find a previous run in its database. It won't find any, because this is our first run. In that case, the scheduler calculates the date for the next run based on the start_date the DAG was instantiated with:
# simplified excerpt from Airflow's DAG model
def following_schedule(self, dttm):
    ...
    cron = croniter(self._schedule_interval, dttm)
    following = timezone.make_aware(cron.get_next(datetime), self.timezone)
    ...
    return following

next_run_date = dag.following_schedule(start_date)
# start_date=datetime(2018, 6, 6); for our DAG this yields datetime(2018, 6, 6, 2)
Once next_run_date is known, the scheduler calculates period_end:
period_end = following_schedule(next_run_date) # period_end=datetime(2018, 6, 7, 2)
A DAG run for our DAG will be created if the following conditions are met:
  1. next_run_date <= timezone.utcnow() # next_run_date is execution_date
  2. period_end <= timezone.utcnow()
The implication of these conditions is that if you were to unpause the Foo DAG on 2018-06-07 at 1:00:00 UTC, the DAG would not be scheduled, because condition 2 would not be satisfied. However, if you were to unpause it on 2018-06-07 at 14:00:00 UTC, the DAG would be triggered.
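To make this concrete, here is a minimal sketch of the two checks, using croniter directly as a stand-in for the DAG method and ignoring timezone handling:
from datetime import datetime

from croniter import croniter

def following_schedule(dttm, schedule='0 2 * * *'):
    # simplified stand-in for DAG.following_schedule
    return croniter(schedule, dttm).get_next(datetime)

next_run_date = following_schedule(datetime(2018, 6, 6))  # datetime(2018, 6, 6, 2)
period_end = following_schedule(next_run_date)            # datetime(2018, 6, 7, 2)

for now in (datetime(2018, 6, 7, 1), datetime(2018, 6, 7, 14)):
    should_run = next_run_date <= now and period_end <= now
    print(now, should_run)  # False at 01:00 (condition 2 fails), True at 14:00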
DAGRun
A DAGRun is an object in Airflow that represents an execution/instance of a specific DAG. Amongst other fields, it contains the execution_date, start_date and end_date. For our first DAG run, the scheduler will create a DAGRun object with the following properties:
DAGRun(
    ...
    execution_date=next_run_date,  # NOTE THIS HERE!
    start_date=timezone.utcnow(),
    ...
)
If we were to unpause the Foo DAG at 2018-06-07T14:00:00 UTC, the scheduler would create the first DAGRun with the following params:
DAGRun(
    ...
    execution_date=datetime(2018, 6, 6, 2),
    # start_date is slightly after 2018-06-07T14:00:00 because the scheduler
    # only periodically checks whether a DAGRun should be created
    start_date=datetime(2018, 6, 7, 14, 0, 1, 45),
    ...
)
Key Takeaway: The execution_date of a DAGRun is not when the DAG starts. The general rule of thumb is: the execution_date is one cron iteration prior to when the DAGRun is scheduled to run. For example, if a job is supposed to run every hour, the execution_date of the DAGRun created at approximately 2 PM will be 1 PM.
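Here is a quick illustration of that rule of thumb with croniter (a sketch, not Airflow's actual code path):
from datetime import datetime

from croniter import croniter

now = datetime(2018, 6, 7, 14, 0, 5)  # the scheduler wakes up just after 2 PM
period_end = croniter('0 * * * *', now).get_prev(datetime)              # 2018-06-07 14:00
execution_date = croniter('0 * * * *', period_end).get_prev(datetime)   # 2018-06-07 13:00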
Subsequent DAGRuns
The scheduler will be able to find a previous run for all subsequent DAGRuns, so the execution_date for each new DAGRun will simply be:
next_dag_run = dag.following_schedule(last_scheduled_run)
 
The two conditions will still have to be met. From here on, the Foo DAG will run every day at 2 AM UTC, with the previous day's 2 AM as the execution_date, forever!
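As a quick sketch, the sequence of execution_dates for Foo advances like this:
from datetime import datetime

from croniter import croniter

exec_date = datetime(2018, 6, 6, 2)  # execution_date of the first run
for _ in range(3):
    exec_date = croniter('0 2 * * *', exec_date).get_next(datetime)
    print(exec_date)
# 2018-06-07 02:00:00
# 2018-06-08 02:00:00
# 2018-06-09 02:00:00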
Rationale
Some of you may be wondering why a run that executes today carries the previous cron iteration's date. I believe this behaviour addresses a common paradigm in data ingestion. Consider the following scenario: you have a folder where log files accumulate in real time, and you want to perform some batch ETL once all of the log files for a day have arrived. If the DAG runs today, you can't process today's data, because not all of it has come in yet. You can only process yesterday's data, because you know for sure that yesterday's data has arrived in full. If the execution_date were set to the day the DAG runs, you would constantly have to subtract a day.
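Inside a task, the execution_date is available through templating (for example, as {{ ds }}), so you can address yesterday's completed folder directly. A minimal sketch, where the etl.py script and the /logs/ layout are made up for illustration:
from airflow.operators.bash_operator import BashOperator

process_logs = BashOperator(
    task_id='process_logs',
    # for the run that executes on 2018-06-07, {{ ds }} renders as 2018-06-06
    bash_command='python etl.py --input /logs/{{ ds }}/',
    dag=dag,
)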
Wrapping it up
For the purposes of explaining the most basic DAG concepts, I omitted a lot of other powerful features Airflow has, such as backfills, end_dates, task-specific start_dates, one-time-only runs and so on. I strongly recommend that anyone who wants to use Airflow take some time to read the create_dag_run function in jobs.py from Airflow's GitHub repo, choosing the branch that matches the Airflow version you are running. It is a great starting point for understanding how the scheduler, and the rest of Airflow, works.
