Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions airflow-core/src/airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2046,16 +2046,7 @@ def from_sdk_dag(cls, dag: TaskSDKDag) -> DAG:
if not field.init or field.name in ["edge_info"]:
continue

value = getattr(dag, field.name)

# Handle special cases where values need conversion
if field.name == "max_consecutive_failed_dag_runs":
# SchedulerDAG requires this to be >= 0, while TaskSDKDag allows -1
if value == -1:
# If it is -1, we get the default value from the DAG
continue

kwargs[field.name] = value
kwargs[field.name] = getattr(dag, field.name)

new_dag = cls(**kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,10 +686,6 @@ def validate_deserialized_dag(self, serialized_dag: DAG, dag: DAG):
actual = getattr(serialized_dag, field)
expected = getattr(dag, field, None)

if field == "max_consecutive_failed_dag_runs":
# TaskSDK sets -1 default to max_consecutive_failed_dag_runs
assert actual in [expected, 0], f"{dag.dag_id}.{field} does not match"
continue
assert actual == expected, f"{dag.dag_id}.{field} does not match"
# _processor_dags_folder is only populated at serialization time
# it's only used when relying on serialized dag to determine a dag's relative path
Expand Down
4 changes: 1 addition & 3 deletions task-sdk/src/airflow/sdk/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,7 @@ def __rich_repr__(self):
user_defined_filters: dict | None = None
max_active_tasks: int = attrs.field(default=16, validator=attrs.validators.instance_of(int))
max_active_runs: int = attrs.field(default=16, validator=attrs.validators.instance_of(int))
max_consecutive_failed_dag_runs: int = attrs.field(
default=-1, validator=attrs.validators.instance_of(int)
)
max_consecutive_failed_dag_runs: int = attrs.field(default=0, validator=attrs.validators.instance_of(int))
dagrun_timeout: timedelta | None = attrs.field(
default=None,
validator=attrs.validators.optional(attrs.validators.instance_of(timedelta)),
Expand Down