Mastering Dynamic Workflows in Airflow: A Guide to Dynamic Task Mapping

Data pipelines often involve processing multiple pieces of data in the same way. Imagine needing to run a transformation on a list of files, process records from a database query, or handle parameters generated by a previous step. A common challenge arises when the exact number of items to process isn’t known when the pipeline is defined; it’s only determined at runtime. How can you create Airflow tasks dynamically to handle this variability?

Standard Airflow requires tasks to be declared upfront, which becomes problematic when the workload is dynamic. Apache Airflow 2.3.0 introduced a powerful feature to address this exact scenario: Dynamic Task Mapping.

The Challenge: Static Tasks vs. Dynamic Needs

Traditionally in Airflow, defining tasks is a declarative process: you specify each task (Operator) and its dependencies in your DAG file. If you needed to perform the same operation, say, multiplying a list of numbers by two, you might have to define a separate task for each number, especially if those numbers come from an upstream task.

Consider this simplified scenario:

  1. A task get_values retrieves a list of numbers, e.g., [10, 20, 30].
  2. Subsequent tasks should double each of these numbers individually.

Without Dynamic Task Mapping, you’d face limitations:

  1. Declarative Nature: You typically need to define each PythonOperator instance explicitly in the DAG file. While you could use a loop if you knew the number of items beforehand (e.g., during DAG parsing), you can’t easily use a loop based on the runtime output of get_values.
  2. XCom Accessibility: Values pushed to XCom by a task (like the list from get_values) are generally only accessible within the execution context of another task instance. You cannot directly use an XCom value retrieved at runtime to define the structure (i.e., the number of tasks) of the DAG itself during parsing. Creating tasks inside other tasks is generally discouraged.

This leads to inflexible pipelines or complex workarounds. How can we create tasks dynamically based on runtime results?
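
To make the limitation concrete, here is a minimal sketch of the parse-time workaround mentioned above (all names here are illustrative). It only works because the list [10, 20, 30] is hard-coded in the DAG file; it cannot react to values produced at runtime:

import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def double_value(value):
    """Takes a single value and doubles it."""
    return int(value) * 2

with DAG(
    dag_id='static_doubling_example',
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:

    # This loop runs at parse time, so the list must be known up front.
    for i, value in enumerate([10, 20, 30]):
        PythonOperator(
            task_id=f"double_value_{i}",
            python_callable=double_value,
            op_args=[value],
        )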

Introducing Airflow Dynamic Task Mapping

Dynamic Task Mapping is Airflow’s native solution for creating multiple task instances dynamically at runtime based on the output of a preceding task or a defined set of parameters. It elegantly solves the problem of not knowing the required number of tasks beforehand.

Instead of pre-defining numerous similar tasks, you define a single task template. Airflow then uses the output from an upstream task (or a specified iterable) to “map” or “expand” this template into multiple parallel task instances at runtime, one for each item in the input.

This feature became available starting with Airflow 2.3.0.

How Dynamic Task Mapping Works: partial and expand

The core of Dynamic Task Mapping lies in two methods applied to an Operator: .partial() and .expand().

  • .partial(**kwargs): This method defines the static, fixed parameters of the task template. Arguments like task_id, python_callable, or any other parameter that should remain the same for all dynamically generated task instances are defined here.

  • .expand(**kwargs): This method specifies which parameters of your task template should be dynamically populated. You provide it with an iterable (like a list or the output of another task). Airflow creates one mapped task instance for each element in the iterable, passing that element to the specified parameter. If you .expand() on multiple parameters, Airflow creates instances based on the Cartesian product of the inputs (a short sketch of this follows below).

Typically, you get the output of an upstream task using the .output attribute (e.g., upstream_task.output) and pass this directly to the .expand() method.
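
To make the Cartesian-product behavior concrete, here is a minimal, hypothetical sketch (the add_numbers function, dag_id, task_id, and values are illustrative only). Expanding two parameters at once, op_args and op_kwargs, yields one mapped instance per combination:

import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

def add_numbers(x, y):
    return x + y

with DAG(
    dag_id='cartesian_product_example',
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:

    # 2 op_args values x 2 op_kwargs values = 4 mapped instances:
    # (1, 10), (1, 20), (2, 10), (2, 20)
    add_task = PythonOperator.partial(
        task_id="add_numbers",
        python_callable=add_numbers,
    ).expand(
        op_args=[[1], [2]],                # positional argument x
        op_kwargs=[{"y": 10}, {"y": 20}],  # keyword argument y
    )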

A Practical Example Revisited

Let’s revisit the example of doubling a list of numbers dynamically generated by get_values_task. Using Dynamic Task Mapping, the code becomes significantly simpler and truly dynamic:

# Imports (standard library first, then Airflow)
import datetime

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator

# Define Python functions
def get_values():
    """Returns a list of lists, as expand expects iterables for args."""
    return [[10], [20], [30]] # Note: Each element is wrapped in a list for op_args

def double_value(value):
    """Takes a single value and doubles it."""
    print(f"Processing value: {value}")
    return int(value) * 2

# Define the DAG
with DAG(
    dag_id='dynamic_doubling_example',
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:

    # Task to get the dynamic list of values
    get_values_task = PythonOperator(
        task_id="get_values",
        python_callable=get_values
    )

    # Dynamically mapped task using .partial() and .expand()
    double_value_task = PythonOperator.partial(
        task_id="double_value",       # Base task_id for the mapped tasks
        python_callable=double_value, # The function each mapped task will run
    ).expand(op_args=get_values_task.output) # Expand based on the output of get_values_task
                                             # 'op_args' expects an iterable of iterables

    # Define dependencies
    get_values_task >> double_value_task

How to interpret this:

The double_value_task is defined using PythonOperator.partial(...). This sets up the template: the base task_id will be “double_value”, and the function to call is double_value.

The .expand(op_args=get_values_task.output) part tells Airflow: “Take the output from get_values_task. For each item in that output list, create a task instance based on the template defined in .partial(), and pass the item as the op_args.”

Since get_values returns [[10], [20], [30]], Airflow will create three mapped instances of the double_value task at runtime:
1. One instance calling double_value(10)
2. One instance calling double_value(20)
3. One instance calling double_value(30)

This happens automatically without needing to explicitly define three separate tasks.
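
For completeness, the same pipeline can be written more concisely with the TaskFlow API, which has supported .expand() on @task-decorated functions since the same 2.3.0 release; a minimal sketch (the dag_id is illustrative):

import datetime

from airflow.decorators import task
from airflow.models.dag import DAG

with DAG(
    dag_id='dynamic_doubling_taskflow',
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:

    @task
    def get_values():
        # With TaskFlow, a plain list is enough; no per-element wrapping.
        return [10, 20, 30]

    @task
    def double_value(value):
        return value * 2

    # Maps double_value over each element returned by get_values;
    # the dependency between the two tasks is created implicitly.
    double_value.expand(value=get_values())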

Visualizing Dynamic Tasks in the Airflow UI

In the Airflow Graph View, you won’t see multiple distinct nodes for each doubled value. Instead, you’ll see the single double_value task node, often indicated with brackets [] in its name (e.g., double_value [ ]) to signify it’s a mapped task.

To see the individual instances created dynamically, click on this mapped task node. A “Mapped Tasks” tab will appear in the task details view. Here, you’ll find a list of all the dynamically created task instances, each identified by a Map Index (starting from 0). You can inspect the logs, status, and details of each individual mapped run.

Summary: Benefits of Dynamic Task Mapping

Dynamic Task Mapping is a powerful feature for creating flexible and scalable Airflow pipelines. Its key advantages include:

  • Reduced Boilerplate: Eliminates the need to manually define numerous similar tasks.
  • True Runtime Dynamism: Creates tasks based on data generated during the DAG run, not just at parsing time.
  • Inherent Parallelism: Mapped task instances can run in parallel (up to Airflow’s concurrency limits), speeding up processing; see the snippet after this list for one way to tune this.
  • Cleaner DAGs: Leads to more concise and readable DAG code.
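
One way to tune that parallelism, sketched as a drop-in replacement for the double_value_task definition in the earlier DAG: the standard max_active_tis_per_dag operator argument restricts how many instances of a task, including mapped instances, may run at once.

    # Allow at most 2 mapped instances of this task to run concurrently.
    double_value_task = PythonOperator.partial(
        task_id="double_value",
        python_callable=double_value,
        max_active_tis_per_dag=2,
    ).expand(op_args=get_values_task.output)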

It’s an essential tool for anyone building complex data workflows in Airflow that need to adapt to varying amounts of input data.


At Innovative Software Technology, we leverage advanced techniques like Airflow Dynamic Task Mapping to architect robust and scalable data pipelines for our clients. If you’re struggling with complex workflow automation or need expert guidance on optimizing your data processing tasks for efficiency and dynamic execution, our experienced data engineers can help. We design and implement cutting-edge data engineering solutions tailored to your specific needs, ensuring your systems handle variable workloads seamlessly and deliver timely insights. Partner with Innovative Software Technology to unlock the full potential of your data infrastructure and gain a competitive edge through efficient, automated data workflows.
