Airflow XCom Exclusive
Implement a dedicated maintenance DAG that safely purges historical XCom records, either by executing a plain SQL statement or through the DbApiHook.
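The purge step itself can be sketched without Airflow. The example below is a minimal illustration, not a drop-in maintenance DAG: it assumes an `xcom` table with a `timestamp` column, uses an in-memory SQLite database as a stand-in for the metadata database, and the function name `purge_old_xcoms` is hypothetical. The `?` placeholder is SQLite's parameter style; other drivers use a different one.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def purge_old_xcoms(conn, max_age_days, now=None):
    """Delete XCom rows older than max_age_days and return how many were removed."""
    now = now or datetime.now(timezone.utc)
    cutoff = (now - timedelta(days=max_age_days)).isoformat()
    # Parameterized query: the cutoff is never interpolated into the SQL string.
    cur = conn.execute("DELETE FROM xcom WHERE timestamp < ?", (cutoff,))
    conn.commit()
    return cur.rowcount


# Demo against an in-memory stand-in for the metadata database (assumed schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE xcom (dag_id TEXT, task_id TEXT, key TEXT, value TEXT, timestamp TEXT)"
)
now = datetime(2026, 1, 31, tzinfo=timezone.utc)
rows = [
    ("etl", "extract", "return_value", "1", (now - timedelta(days=45)).isoformat()),
    ("etl", "extract", "return_value", "2", (now - timedelta(days=2)).isoformat()),
]
conn.executemany("INSERT INTO xcom VALUES (?, ?, ?, ?, ?)", rows)

deleted = purge_old_xcoms(conn, max_age_days=30, now=now)
remaining = conn.execute("SELECT COUNT(*) FROM xcom").fetchone()[0]
print(deleted, remaining)
```

In a real deployment the same function body could run inside a scheduled task, with the connection obtained from a hook rather than created inline.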
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def push_function(**kwargs):
    # Push a value under an explicit key.
    kwargs['ti'].xcom_push(key='model_accuracy', value=0.94)


def pull_function(**kwargs):
    ti = kwargs['ti']
    # Pull the value by the producing task's ID and the same key.
    accuracy = ti.xcom_pull(task_ids='push_task', key='model_accuracy')
    print(f"Model accuracy is {accuracy}")


with DAG('legacy_xcom_dag', start_date=datetime(2026, 1, 1), schedule=None) as dag:
    push_task = PythonOperator(task_id='push_task', python_callable=push_function)
    pull_task = PythonOperator(task_id='pull_task', python_callable=pull_function)
    push_task >> pull_task

The TaskFlow API Approach
This article dives deep into XCom exclusive mode, comparing it with the standard model, walking through practical examples, and revealing advanced patterns to level up your Airflow engineering.
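from sqlalchemy import func, update

# claim_xcom is assumed to be a SQLAlchemy Table defined elsewhere.


def try_claim(session, claim_id, worker_id):
    # Flip the row from 'available' to 'claimed' in a single UPDATE,
    # so only one worker's statement can match it.
    row = session.execute(
        update(claim_xcom)
        .where(claim_xcom.c.id == claim_id)
        .where(claim_xcom.c.status == 'available')
        .values(status='claimed', claimed_by=worker_id, claimed_at=func.now())
        .returning(claim_xcom)
    ).first()
    return row  # None if already claimed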
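The same compare-and-set idea can be tried end to end with only the standard library. This is a sketch under assumptions: the `claim_xcom` table layout is invented for the demo, SQLite stands in for the real database, and the function reports success through the affected row count instead of `RETURNING`.

```python
import sqlite3


def try_claim(conn, claim_id, worker_id):
    """Return True if this worker won the claim, False if it was already taken."""
    cur = conn.execute(
        "UPDATE claim_xcom SET status = 'claimed', claimed_by = ? "
        "WHERE id = ? AND status = 'available'",
        (worker_id, claim_id),
    )
    conn.commit()
    # Exactly one row changes for the winner; later callers match zero rows.
    return cur.rowcount == 1


# Demo table (assumed layout) with a single available claim.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE claim_xcom (id INTEGER PRIMARY KEY, status TEXT, claimed_by TEXT)"
)
conn.execute("INSERT INTO claim_xcom (id, status) VALUES (1, 'available')")

first = try_claim(conn, 1, "worker-a")
second = try_claim(conn, 1, "worker-b")
owner = conn.execute("SELECT claimed_by FROM claim_xcom WHERE id = 1").fetchone()[0]
print(first, second, owner)
```

Because the status check lives in the `WHERE` clause of the update, there is no separate read-then-write step for a second worker to slip into.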
When scaling production pipelines, you are likely to encounter one of these common XCom errors. Use this matrix to narrow down the cause:
Error / Symptom | Root Cause | Immediate Solution
xcom_objectstorage_threshold: The size threshold, in bytes, that determines when an XCom value is stored in object storage instead of the metadata database.

5. Troubleshooting XComs in the UI
Enter XCom exclusive mode, a feature designed to enforce stricter boundaries, improve performance, and make your DAGs more predictable. But what exactly is it? How do you enable it? And is it right for your team?
When a task finishes executing, it can push an XCom using a specific key. By default, if a Python operator returns a value, Airflow automatically serializes and pushes that value with the key return_value.
Tasks can push data using task_instance.xcom_push(key="my_key", value=data).
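The two push paths described above can be modeled in a few lines of plain Python. This is a toy in-memory model for intuition only, not Airflow's implementation: `ToyXComStore` and its methods are hypothetical names, and real XComs are also scoped by DAG and run.

```python
class ToyXComStore:
    """In-memory stand-in for the XCom table, keyed by (task_id, key)."""

    DEFAULT_KEY = "return_value"

    def __init__(self):
        self._rows = {}

    def push(self, task_id, value, key=DEFAULT_KEY):
        # Explicit push, analogous to xcom_push(key=..., value=...).
        self._rows[(task_id, key)] = value

    def pull(self, task_id, key=DEFAULT_KEY):
        # A missing entry yields None rather than raising.
        return self._rows.get((task_id, key))

    def run_task(self, task_id, fn):
        """Run a callable and push a non-None return value under the default key."""
        result = fn()
        if result is not None:
            self.push(task_id, result)
        return result


store = ToyXComStore()
store.run_task("train", lambda: {"accuracy": 0.94})   # implicit push
store.push("train", 0.94, key="model_accuracy")       # explicit push

implicit = store.pull("train")
explicit = store.pull("train", key="model_accuracy")
missing = store.pull("train", key="no_such_key")
print(implicit, explicit, missing)
```

The point of the model is that a returned value and an explicitly pushed value live side by side under different keys for the same task.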
from airflow.operators.bash import BashOperator

# Pulling the return value of a TaskFlow task into a Bash script
bash_task = BashOperator(
    task_id="log_demographics",
    # bash_command is a templated field, so the Jinja expression is rendered at run time.
    bash_command="echo 'The processed data is: {{ ti.xcom_pull(task_ids=\"process_demographics\") }}'",
)

5. Security & Governance: Encrypting and Cleaning XCom Data
You push a result, but no downstream task is allowed to pull it. Solution: Define the exclusive mapping at DAG level, and review with airflow dags show-xcom --exclusive-violations.
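One way to picture this check is as an allow-list lookup. The sketch below is purely illustrative: the shape of the mapping, a dictionary from (task_id, key) to the set of tasks permitted to pull it, is an assumption made for the example, and `find_unreadable_pushes` is a hypothetical helper rather than part of any Airflow API.

```python
def find_unreadable_pushes(pushed, allowed_pullers):
    """Return pushed (task_id, key) pairs that no task is permitted to pull."""
    # A pair is a violation when it has no entry, or an empty set of pullers.
    return sorted(pair for pair in pushed if not allowed_pullers.get(pair))


# Hypothetical DAG-level mapping: only 'report' may read the accuracy value.
pushed = {("train", "model_accuracy"), ("train", "return_value")}
allowed_pullers = {("train", "model_accuracy"): {"report"}}

violations = find_unreadable_pushes(pushed, allowed_pullers)
print(violations)
```

Here the implicit return value of `train` is flagged because nothing in the mapping grants any task permission to read it.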
Add reliability: