Replacement ShortCircuitOperator for Airflow

The ShortCircuitOperator in Airflow behaves in an unusual way: rather than affecting only its own task instance, it directly modifies the state of downstream task instances. This gives rise to two major problems:

  1. Clearing a skipped task can result in that task being run, even though it should be skipped
  2. Depends on past does not work reliably for downstream tasks

To demonstrate these, we will use this example DAG. It’s a contrived example, but it makes demonstration simple. The ShortCircuitOperator should result in tasks B and C running only on even hours of the day.

import airflow
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.exceptions import AirflowSkipException
from airflow.models import DAG

from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'start_date': datetime(2017,1,1,1,0),
    'end_date': datetime(2017,1,1,4,0)
}

dag = DAG(
    dag_id = 'simple_skip_stock',
    schedule_interval = timedelta(hours=1),
    default_args = args,
)

def even_hour(**kwargs):
    return kwargs['execution_date'].hour % 2 == 0

task_A = ShortCircuitOperator(
    task_id = 'task_A',
    python_callable = even_hour,
    provide_context = True,
    dag = dag
)

task_B = DummyOperator(task_id = 'task_B', dag = dag, depends_on_past = True)
task_C = DummyOperator(task_id = 'task_C', dag = dag)

task_B.set_upstream(task_A)
task_C.set_upstream(task_B)

We’d expect our DAG to give the following output, with task B running in sequential order from 1AM → 4AM.

Now let’s walk through the two bugs mentioned above.

1) Clearing skipped tasks

Once the DAG has finished running, we clear the 3AM instance of task B via the user interface. This task had previously been skipped when task A ran, so we’d expect the cleared task to be skipped again.

The scheduler examines the task instance and confirms the dependencies are met

  • task B @ 2AM (previous instance) : success
  • task A @ 3AM (upstream instance) : success

It has no knowledge of the result of the even_hour function, as this information was never stored anywhere. As such, it decides to run task B, and the task completes.
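The scheduler’s reasoning can be sketched in plain Python. This is a hypothetical simplification, not Airflow’s actual dependency code, but it shows the core issue: only recorded task states are consulted, so the boolean returned by even_hour plays no part.

```python
# Hypothetical sketch of the scheduler's reasoning; not Airflow's real code.
# Recorded task-instance states after the DAG run:
states = {
    ('task_A', '3AM'): 'success',  # the ShortCircuitOperator itself succeeded
    ('task_B', '2AM'): 'success',  # previous instance of task B
}

def deps_met(task, when, prev_when, upstream):
    # All the scheduler consults are the recorded states: the previous
    # instance of this task, and the upstream instance for this run.
    return (states.get((task, prev_when)) == 'success'
            and states.get((upstream, when)) == 'success')

# The even_hour() result was never persisted, so the cleared 3AM instance
# of task B is considered runnable and will execute.
print(deps_met('task_B', '3AM', '2AM', 'task_A'))  # True
```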

In practice it’s quite likely that a skipped task will get cleared in this way. Clearing Airflow tasks via the user interface is a very common operation, and it should normally be safe to re-run tasks that have been written correctly.

2) Depends on past

To demonstrate this issue more easily, we make a small modification to the even_hour function.

import time

def even_hour(**kwargs):
    if kwargs['execution_date'].hour < 3:
        time.sleep(60)
    return kwargs['execution_date'].hour % 2 == 0

This means the 1AM and 2AM tasks will take longer to finish. When we run this DAG, we now reach the following state.

Because depends_on_past only checks the state of the immediately previous task instance, the 4AM instance of task B sees that the 3AM instance has already been marked as skipped, and the scheduler decides the task is ready to run.

In practice, this mis-ordering will happen, especially when catchup is enabled. When using an executor with multiple workers, you have no guarantees as to which tasks will complete first.

Additionally, the same problem applies if the 1AM and 2AM tasks had failed: the 4AM task will still run.
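The depends_on_past check can be sketched the same way (again a simplification, not Airflow’s implementation): only the immediately previous instance is inspected, and a state of skipped satisfies the dependency, even when that skip was set pre-emptively by the ShortCircuitOperator.

```python
# Hypothetical sketch of the depends_on_past check; not Airflow's real code.
# Task B states while the backfill is still in flight:
states = {
    '1AM': 'running',  # still sleeping inside even_hour()
    '2AM': 'running',  # still sleeping inside even_hour()
    '3AM': 'skipped',  # marked skipped ahead of time by the ShortCircuitOperator
}

def depends_on_past_met(prev_state):
    # Only the immediately previous instance is inspected, and a
    # pre-emptively 'skipped' state satisfies the dependency.
    return prev_state in ('success', 'skipped')

# The 4AM instance is unblocked even though 1AM and 2AM haven't finished.
print(depends_on_past_met(states['3AM']))  # True
```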

An alternative approach

Rather than having the ShortCircuitOperator modify the state of future tasks, we instead store the result of the Python function using XCom. Downstream tasks can then check this value and decide for themselves whether to skip.

We can implement all of this without any changes to the Airflow codebase.

The SkipOperator replaces the ShortCircuitOperator. It returns the result of the Python function, which causes it to be stored in XCom. It doesn’t change any other task states.

class SkipOperator(PythonOperator):
    def execute(self, context):
        result = super(SkipOperator, self).execute(context)
        # Sanity check the output type to avoid any mistakes on usage
        if not isinstance(result, bool):
            raise Exception("Python function must return a boolean result")
        return result
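The key property being relied on is that a PythonOperator’s return value is pushed to XCom under the default key 'return_value', which is what xcom_pull retrieves when no key is given. That round-trip can be modelled with a plain dict; this is a toy stand-in for Airflow’s storage layer, not its actual backend.

```python
# Toy XCom store keyed by (task_id, key); a stand-in for Airflow's backend.
xcom_store = {}

def xcom_push(task_id, value, key='return_value'):
    xcom_store[(task_id, key)] = value

def xcom_pull(task_ids, key='return_value'):
    return xcom_store.get((task_ids, key))

# At 3AM, even_hour() returns False; Airflow pushes the operator's
# return value under the default 'return_value' key.
xcom_push('task_A', 3 % 2 == 0)

# A downstream task pulls it back; False means it should skip itself.
print(xcom_pull(task_ids='task_A'))  # False
```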

The next part is slightly more complicated. We need all downstream operators to check for upstream SkipOperator tasks when they execute. To do this, we create a mixin class which can be applied to existing operators.

class SkippableMixin:
    """ Apply this to an operator to add support for the alternative skip functionality
        in conjunction with SkipOperator

        This task will be skipped if any parent SkipOperator task has a python method
        which returns a status of False.
    """
    def execute(self, context):
        # Find the task object for the current TaskInstance
        dag = context['dag']
        ti = context['ti']
        this_task = dag.get_task(ti.task_id)

        # Find the parent tasks from the dag
        all_upstream = this_task.get_flat_relatives(upstream=True)

        # See if any of the upstream tasks are of type 'SkipOperator'
        # If they are, then check the result of their python method
        parent_results = []
        for task in all_upstream:
            if isinstance(task, SkipOperator):
                status = bool(ti.xcom_pull(task_ids=task.task_id))
                self.log.debug("Task needs to check parent %s: status %s",
                               task.task_id, status)
                parent_results.append(status)

        if False in parent_results:
            raise AirflowSkipException("Skipping task due to parent results")

        return super(SkippableMixin, self).execute(context)

We then create skippable versions of existing operators using inheritance. We need to make a skippable version of each operator we want to use.

class SkippableDummyOperator(SkippableMixin, DummyOperator): pass
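Note that the mixin must come first in the base-class list so that its execute() runs before the operator’s own. The method-resolution order can be sketched without Airflow at all; the class names below are stand-ins, not real Airflow classes.

```python
# Stand-in classes illustrating the mixin pattern; no Airflow required.
class FakeOperator(object):
    def execute(self, context):
        return 'operator ran'

class FakeSkippableMixin(object):
    def execute(self, context):
        # The pre-flight skip check (xcom_pull on parent SkipOperators)
        # would go here; if no parent vetoed, delegate to the operator.
        return super(FakeSkippableMixin, self).execute(context)

class FakeSkippableOperator(FakeSkippableMixin, FakeOperator):
    pass

# The mixin appears before the operator in the MRO, so its execute()
# wraps the operator's own.
print([cls.__name__ for cls in FakeSkippableOperator.__mro__])
print(FakeSkippableOperator().execute({}))  # operator ran
```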

Our new version of the DAG looks like this:

args = {
    'owner': 'airflow',
    'start_date': datetime(2017,1,1,1,0),
    'end_date': datetime(2017,1,1,4,0)
}

dag = DAG(
    dag_id = 'simple_skip_revised',
    schedule_interval = timedelta(hours=1),
    default_args = args,
)

def even_hour(**kwargs):
    return kwargs['execution_date'].hour % 2 == 0

task_A = SkipOperator(
    task_id = 'task_A',
    python_callable = even_hour,
    provide_context = True,
    dag = dag
)

task_B = SkippableDummyOperator(task_id = 'task_B', dag = dag, depends_on_past = True)
task_C = SkippableDummyOperator(task_id = 'task_C', dag = dag)

task_B.set_upstream(task_A)
task_C.set_upstream(task_B)

Our two bugs are now resolved:

  • When a task is cleared, it will check the state of upstream tasks and behave correctly
  • As we no longer update the state of downstream tasks, depends_on_past works correctly again

There are some other edge cases, such as when task C has multiple upstream tasks and trigger rules. I’ve left these out of this post for brevity, but they should work correctly as well. If you find any cases not covered, please let me know.

Permanent Solutions

The above recipe is very much a quick-fix solution to the ShortCircuitOperator bugs. Fixing them properly in Airflow would require more significant changes to the scheduler and dependency logic.

For now, this should provide a workaround for anyone who comes across this issue.

Feedback: Find me as kevcampb on https://gitter.im/apache/incubator-airflow or leave a comment below
