Mismatched Xcom Map Index when Dynamic Mapping over TaskGroup and not all mapped tasks have run #40321

Open
1 of 2 tasks
gavinhonl opened this issue Jun 19, 2024 · 2 comments
Labels
affected_version:2.8, area:core, area:dynamic-task-mapping, kind:bug


gavinhonl commented Jun 19, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.3

What happened?

If a TaskGroup is dynamically mapped, downstream tasks pull XComs from the wrong map index. This happens because the XCom list is not the same length as the number of mapped tasks, which results in the following error:

File "/home/airflow/.local/lib/python3.11/site-packages/airflow/models/xcom.py", line 795, in __getitem__
    raise IndexError(key) from None
IndexError: 5

This simplified DAG reproduces the problem. One mapped task is made to reschedule instead of completing, which shortens the XCom list:


from datetime import datetime, timedelta

from airflow.decorators import task, task_group
from airflow.operators.python import get_current_context
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.base import PokeReturnValue

default_args = {
    'owner': 'dev',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'provide_context': True,
    'retries': 0,
    'retry_delay': timedelta(seconds=30)
}

@task
def parse_csv_schedule():
    items_dict = [{'A': 1}, {'B': 2}, {'C': 3}, {'D': 4}, {'E': 5}, {'F': 6}]
    return items_dict

@task_group(group_id="process_items")
def process_items(items_dict: dict):
    @task.sensor(poke_interval=90, mode="reschedule")
    def retrieve_item(item: dict) -> PokeReturnValue:
        for letter, integer in item.items():
            if integer == 3:
                print("Number is 3 - reschedule")
                return PokeReturnValue(is_done=False)
            else:
                print("Number is not 3 - Let's go")
                return PokeReturnValue(is_done=True, xcom_value=item)
    
    @task
    def process_item(item: dict):
        context = get_current_context()
        ti = context["ti"]
        map_index = ti.map_index
        print(f"Taskflow XCOM: {item}")
        # Positional lookup into the list of XComs pushed by every completed retrieve_item instance
        xcom_dict = ti.xcom_pull("process_items.retrieve_item")[map_index]
        print(xcom_dict)
    
    item_dict = retrieve_item(item=items_dict)
    process_item(item=item_dict)

with DAG(dag_id='af041_Debugger', default_args=default_args, max_active_runs=1, schedule_interval=None):
    end = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success')
    items_dict = parse_csv_schedule()
    items_dict >> process_items.expand(items_dict=items_dict) >> end

Resulting in:
[image]

[image]

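The last mapped task (map_index=5) fails because the XCom list contains only 5 items instead of 6. More seriously, the task with map_index=3 has pulled the XCom for map_index=4! Because the list has no entry for map_index=2, whose sensor is still rescheduling, every later position is shifted by one.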
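The index shift can be modelled in plain Python, without Airflow. This is a sketch under one assumption taken from the behaviour reported above: the pulled XCom acts like a list of the values pushed by completed mapped instances, in map_index order, with no placeholder for the instance that is still rescheduling. `positional_lookup` is an illustrative name, not an Airflow function.

```python
# Plain-Python model of the reported behaviour (no Airflow required).
# Assumption: the pulled XCom behaves like a list of values from completed
# mapped instances only, ordered by map_index, with no gap placeholders.

items = [{"A": 1}, {"B": 2}, {"C": 3}, {"D": 4}, {"E": 5}, {"F": 6}]

# Map index 2 ({"C": 3}) keeps rescheduling, so it never pushes an XCom.
completed = {i: item for i, item in enumerate(items) if i != 2}

# What the downstream task sees: values only, positional, gap collapsed.
pulled = [completed[i] for i in sorted(completed)]


def positional_lookup(map_index: int):
    """Mimic `ti.xcom_pull("process_items.retrieve_item")[map_index]`."""
    return pulled[map_index]


# process_item only runs for map indexes whose upstream sensor completed.
for map_index in sorted(completed):
    try:
        got = positional_lookup(map_index)
    except IndexError:
        print(f"map_index={map_index}: IndexError")
        continue
    status = "ok" if got == items[map_index] else "WRONG"
    print(f"map_index={map_index}: expected {items[map_index]}, got {got} ({status})")
```

Running it reports map indexes 0 and 1 as correct, 3 and 4 as pulling their neighbour's value, and 5 as an IndexError, consistent with the failure described above.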

This issue appears to occur with both the TaskFlow API and traditional operators.

What you think should happen instead?

  • Mapped tasks should run independently of each other.
  • Downstream tasks should always depend on their associated upstream tasks.
  • If a mapped task does not complete, downstream tasks should not be able to access that task's XCom.
  • Downstream tasks should only be able to access XComs from their related upstream tasks.

Ideally, a downstream task could simply pull the XCom from the upstream task with the same map_index, without needing to read ti.map_index. If the XCom must be a list covering all mapped tasks, then each entry should explicitly reference its map_index or, at the very least, the list should have the same length as the number of mapped tasks.
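The "explicit reference to the map_index" alternative can be illustrated with a dictionary keyed by map_index. This is a hypothetical data shape for comparison, not Airflow's XCom API; `lookup_by_map_index` and `xcoms_by_index` are illustrative names.

```python
from typing import Any, Dict, Optional

# Hypothetical data shape: upstream results keyed by map_index instead of
# stored positionally. Illustration only, not Airflow's XCom API.
items = [{"A": 1}, {"B": 2}, {"C": 3}, {"D": 4}, {"E": 5}, {"F": 6}]

# Map index 2 is still rescheduling, so it simply has no entry.
xcoms_by_index: Dict[int, Any] = {i: item for i, item in enumerate(items) if i != 2}


def lookup_by_map_index(xcoms: Dict[int, Any], map_index: int) -> Optional[Any]:
    """Return the value pushed at `map_index`, or None if that instance has not pushed one."""
    return xcoms.get(map_index)


print(lookup_by_map_index(xcoms_by_index, 3))  # {'D': 4}
print(lookup_by_map_index(xcoms_by_index, 5))  # {'F': 6}
print(lookup_by_map_index(xcoms_by_index, 2))  # None
```

With this shape, a missing upstream result shows up as a missing key for that one instance rather than shifting every later lookup.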

How to reproduce

Run the provided DAG. The last mapped process_item task fails because its map_index does not exist in the upstream XCom list: the list is too short because one of the mapped sensor tasks has not completed.

If there is a better way to access XComs from upstream tasks (not just the direct upstream via TaskFlow), please let me know; it feels like I'm not using TaskGroups and XComs as intended.
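One option that may avoid the positional lookup: in Airflow 2.8, `TaskInstance.xcom_pull` accepts a keyword-only `map_indexes` argument, so a task can request only the entry pushed at its own map_index instead of indexing the full list. This is a sketch, not a confirmed fix for this issue; `pull_own_index` is a hypothetical helper name, and `ti` is assumed to be the running task instance (`context["ti"]`).

```python
from typing import Any


def pull_own_index(ti: Any, task_id: str) -> Any:
    """Pull the XCom pushed by `task_id` at this task instance's own map_index.

    `ti` is expected to be the running TaskInstance (context["ti"]).
    Passing map_indexes=ti.map_index asks for that single mapped entry
    rather than the list of every mapped instance's value, so a gap left by
    an unfinished upstream instance cannot shift the lookup.
    """
    return ti.xcom_pull(task_ids=task_id, map_indexes=ti.map_index)
```

Inside process_item, the positional lookup would then become `xcom_dict = pull_own_index(ti, "process_items.retrieve_item")`.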

Operating System

Ubuntu 22.04

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

AWS EKS

Anything else?

100% reproducible

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

gavinhonl added the area:core, kind:bug, and needs-triage labels Jun 19, 2024
shahar1 added the area:dynamic-task-mapping label and removed the needs-triage label Jun 19, 2024
gavinhonl (Author) commented Jun 26, 2024

A workaround is to use the TaskFlow API and pass the XCom to the downstream task as an argument:

from datetime import datetime, timedelta

from airflow.decorators import task, task_group
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.base import PokeReturnValue

default_args = {
    'owner': 'dev',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'provide_context': True,
    'retries': 0,
    'retry_delay': timedelta(seconds=30)
}

@task
def parse_csv_schedule():
    items_dict = [{'A': 1}, {'B': 2}, {'C': 3}, {'D': 4}, {'E': 5}, {'F': 6}]
    return items_dict

@task_group(group_id="process_items")
def process_items(items_dict: dict):            
    @task.sensor(poke_interval=90, mode="reschedule")
    def ignore_3(picked_item: dict) -> PokeReturnValue:
        for letter, integer in picked_item.items():
            if integer == 3:
                print("Number is 3 - reschedule")
                return PokeReturnValue(is_done=False)
            else:
                print("Number is not 3 - Let's go")
                not_three = picked_item
                return PokeReturnValue(is_done=True, xcom_value=not_three)
    
    @task
    def process_item(item: dict):
        print(f"Taskflow XCOM: {item}")

    not_three = ignore_3(picked_item=items_dict)
    process_item(item=not_three)

with DAG(dag_id='af041_Debugger', default_args=default_args, max_active_runs=1, schedule_interval=None):
    end = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success')
    items_dict = parse_csv_schedule()
    items_dict >> process_items.expand(items_dict=items_dict) >> end

Resulting in the expected behaviour:
[image]
[image]
This approach is less desirable because it introduces an additional task dependency, which affects triggering.

gavinhonl reopened this Jun 26, 2024
eladkal added the affected_version:2.8 label Jun 28, 2024
potiuk (Member) commented Jul 2, 2024

cc: @RNHTTR: does this ring a bell?
