Skip to content
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
ef96b9f
initial PoC idea running
davidsbatista Aug 15, 2025
f00919b
removing test code
davidsbatista Aug 15, 2025
443eb39
cleaning up
davidsbatista Aug 15, 2025
9ed9e09
wip
davidsbatista Aug 18, 2025
cdb2718
Merge branch 'main' into poc/resume-state-for-crashes
davidsbatista Aug 19, 2025
f1c01fc
cleaning up demos
davidsbatista Aug 19, 2025
f249e22
adding more pipelines to test persistence saving
davidsbatista Aug 20, 2025
85dd0f1
wip
davidsbatista Aug 20, 2025
f3abfc7
wip
davidsbatista Aug 20, 2025
e17166b
working example for logging components inputs in run time
davidsbatista Aug 21, 2025
87cbb00
reverting to a simpler solution for intermediate results
davidsbatista Aug 21, 2025
db7fca5
cleaning up
davidsbatista Aug 21, 2025
b0caafc
testing that in a crash components outputs/inputs up to the crash poi…
davidsbatista Aug 21, 2025
30176c5
adding tests for state persistance in a RAG pipeline
davidsbatista Aug 21, 2025
b93b221
updataing tests for state persistance in a RAG pipeline
davidsbatista Aug 21, 2025
f79a9cd
Merge branch 'main' into poc/resume-state-for-crashes
davidsbatista Aug 25, 2025
7b75803
removing use cases of agent tests
davidsbatista Aug 25, 2025
d741d55
adding LICENSE header
davidsbatista Aug 25, 2025
b1bdda5
adding LICENSE header
davidsbatista Aug 25, 2025
89aa58f
adding release notes
davidsbatista Aug 25, 2025
a276bc4
updating tests for mocked components only
davidsbatista Aug 25, 2025
2f0ed63
Merge branch 'main' into poc/resume-state-for-crashes
davidsbatista Aug 25, 2025
44d2faf
updating release notes
davidsbatista Aug 25, 2025
d6a0919
adapting PipelineRuntimeError
davidsbatista Aug 25, 2025
98b8b3c
cleaning up tests
davidsbatista Aug 25, 2025
f0165b3
fixing test pipeline crash components inputs/outputs are saved
davidsbatista Aug 25, 2025
2f3db39
fixing tests for state persistance
davidsbatista Aug 25, 2025
91f9a33
removing code
davidsbatista Aug 26, 2025
19c6680
removing code
davidsbatista Aug 26, 2025
a43f253
removing code
davidsbatista Aug 26, 2025
064983d
updating release notes
davidsbatista Aug 26, 2025
d118e7f
validating parameters
davidsbatista Aug 26, 2025
d04899a
Merge branch 'main' into feat/pipeline-checkpoints-crash-resume
davidsbatista Aug 27, 2025
820e442
cleaning
davidsbatista Aug 27, 2025
ed731b5
Merge branch 'main' into feat/pipeline-checkpoints-crash-resume
davidsbatista Aug 28, 2025
fc52b41
wip: debugging
davidsbatista Aug 28, 2025
aa4e820
removing persistance tests
davidsbatista Aug 28, 2025
148b7d3
formatting
davidsbatista Aug 28, 2025
357aea5
formatting
davidsbatista Aug 28, 2025
e0a1eeb
cleaning up code
davidsbatista Aug 28, 2025
b764ea8
updating release notes
davidsbatista Aug 28, 2025
f2d21bb
adding missing docstrings
davidsbatista Aug 28, 2025
32a4ccc
typo in release notes
davidsbatista Aug 28, 2025
5940c37
Update haystack/core/pipeline/pipeline.py
davidsbatista Aug 29, 2025
4c8e648
PR comments
davidsbatista Aug 29, 2025
1dd6fb2
handling potential issues with saving the snapshot file
davidsbatista Aug 29, 2025
d2651f9
updating tests
davidsbatista Aug 29, 2025
e32990e
Merge branch 'main' into feat/pipeline-checkpoints-crash-resume
davidsbatista Aug 29, 2025
04b9615
Update haystack/core/pipeline/pipeline.py
davidsbatista Aug 29, 2025
3133b74
Update haystack/core/pipeline/pipeline.py
davidsbatista Aug 29, 2025
ff7f969
some more improvements
davidsbatista Aug 29, 2025
fd16e51
fixing exxception
davidsbatista Aug 29, 2025
59de130
fixing exception error name conflict
davidsbatista Aug 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions haystack/core/pipeline/breakpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ def load_pipeline_snapshot(file_path: Union[str, Path]) -> PipelineSnapshot:


def _save_pipeline_snapshot_to_file(
*, pipeline_snapshot: PipelineSnapshot, snapshot_file_path: Union[str, Path], dt: datetime
*,
pipeline_snapshot: PipelineSnapshot,
snapshot_file_path: Union[str, Path],
dt: datetime,
file_name: Optional[str] = None,
) -> None:
"""
Save the pipeline snapshot dictionary to a JSON file.
Expand All @@ -158,17 +162,18 @@ def _save_pipeline_snapshot_to_file(

snapshot_file_path.mkdir(exist_ok=True)

# Generate filename
# We check if the agent_name is provided to differentiate between agent and non-agent breakpoints
if isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
agent_name = pipeline_snapshot.break_point.agent_name
component_name = pipeline_snapshot.break_point.break_point.component_name
visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
file_name = f"{agent_name}_{component_name}_{visit_nr}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
else:
component_name = pipeline_snapshot.break_point.component_name
visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
file_name = f"{component_name}_{visit_nr}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
if not file_name:
# Generate filename
# We check if the agent_name is provided to differentiate between agent and non-agent breakpoints
if isinstance(pipeline_snapshot.break_point, AgentBreakpoint):
agent_name = pipeline_snapshot.break_point.agent_name
component_name = pipeline_snapshot.break_point.break_point.component_name
visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
file_name = f"{agent_name}_{component_name}_{visit_nr}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"
else:
component_name = pipeline_snapshot.break_point.component_name
visit_nr = pipeline_snapshot.pipeline_state.component_visits.get(component_name, 0)
file_name = f"{component_name}_{visit_nr}_{dt.strftime('%Y_%m_%d_%H_%M_%S')}.json"

try:
with open(snapshot_file_path / file_name, "w") as f_out:
Expand Down Expand Up @@ -219,7 +224,7 @@ def _create_pipeline_snapshot(
return pipeline_snapshot


def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot) -> PipelineSnapshot:
def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot, f_name: Optional[str] = None) -> PipelineSnapshot:
"""
Save the pipeline snapshot to a file.

Expand All @@ -244,7 +249,7 @@ def _save_pipeline_snapshot(pipeline_snapshot: PipelineSnapshot) -> PipelineSnap
if snapshot_file_path is not None:
dt = pipeline_snapshot.timestamp or datetime.now()
_save_pipeline_snapshot_to_file(
pipeline_snapshot=pipeline_snapshot, snapshot_file_path=snapshot_file_path, dt=dt
pipeline_snapshot=pipeline_snapshot, snapshot_file_path=snapshot_file_path, dt=dt, file_name=f_name
)

return pipeline_snapshot
Expand Down
67 changes: 67 additions & 0 deletions haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# SPDX-License-Identifier: Apache-2.0

from copy import deepcopy
from datetime import datetime
from typing import Any, Mapping, Optional, Union

from haystack import logging, tracing
Expand All @@ -17,6 +18,7 @@
)
from haystack.core.pipeline.breakpoint import (
_create_pipeline_snapshot,
_save_pipeline_snapshot,
_trigger_break_point,
_validate_break_point_against_pipeline,
_validate_pipeline_snapshot_against_pipeline,
Expand Down Expand Up @@ -175,6 +177,9 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
:param pipeline_snapshot:
A dictionary containing a snapshot of a previously saved pipeline execution.

:param state_persistence_path:
The path where the pipeline state should be saved if `state_persistence` is `True`.

:returns:
A dictionary where each entry corresponds to a component name
and its output. If `include_outputs_from` is `None`, this dictionary
Expand Down Expand Up @@ -390,6 +395,24 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
except PipelineRuntimeError as error:
# Attach partial pipeline outputs to the error before re-raising
error.pipeline_outputs = pipeline_outputs
# Create a snapshot of the last good state of the pipeline before the error occurred.
last_good_state_snapshot = self.create_last_good_state_snapshot(
component_inputs,
component_name,
component_visits,
data,
include_outputs_from,
inputs,
ordered_component_names,
pipeline_outputs,
)
f_name = f"last_good_state_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}.json"
_save_pipeline_snapshot(pipeline_snapshot=last_good_state_snapshot, f_name=f_name)
logger.info(
"A snapshot of the last good state of the pipeline has been saved to '{f_name}'. "
"You can inspect this snapshot to fix the error and resume the pipeline from this point.",
f_name=f_name,
)
raise error

# Updates global input state with component outputs and returns outputs that should go to
Expand All @@ -416,3 +439,47 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
)

return pipeline_outputs

@staticmethod
def create_last_good_state_snapshot(  # pylint: disable=too-many-positional-arguments
    component_inputs: dict[str, Any],
    component_name: str,
    component_visits: dict[str, int],
    data: dict[str, dict[str, Any]],
    include_outputs_from: set[str],
    inputs: dict[str, Any],
    ordered_component_names: list[str],
    pipeline_outputs: dict[str, Any],
    *,
    snapshot_file_path: str = "debug",
) -> PipelineSnapshot:
    """
    Build a snapshot of the last good state of the pipeline before an error occurred.

    The snapshot can be inspected to understand the state of the pipeline at the time of the failure. It can also
    be edited and passed to the `pipeline_snapshot` parameter of `Pipeline.run()` to resume execution from this
    point.

    This method only builds the snapshot object; it does not write anything to disk.

    :param component_inputs: Inputs to the component that was being executed when the error occurred.
    :param component_name: Name of the component that was being executed when the error occurred.
    :param component_visits: Current state of component visits.
    :param data: Original input data provided to the pipeline.
    :param include_outputs_from: Set of component names whose outputs are included in the pipeline output.
    :param inputs: Current state of all inputs in the pipeline. It is deep-copied, never mutated.
    :param ordered_component_names: List of component names in the order they were added to the pipeline.
    :param pipeline_outputs: Current state of the pipeline outputs.
    :param snapshot_file_path: Value stored as `snapshot_file_path` on the placeholder breakpoint attached to the
        snapshot. Defaults to `"debug"`, the location that was previously hard-coded.
        NOTE(review): presumably `_save_pipeline_snapshot` reads this to pick the output directory - confirm.

    :return: A PipelineSnapshot representing the last good state of the pipeline.
    """
    # Deep-copy so that recording the failing component's inputs never touches the caller's live state.
    snapshot_inputs = deepcopy(inputs)
    snapshot_inputs[component_name] = deepcopy(component_inputs)

    # No real breakpoint was hit: this placeholder only carries the failing component's name and the
    # snapshot location alongside the snapshot.
    placeholder_break_point = Breakpoint(
        component_name=component_name, visit_count=0, snapshot_file_path=snapshot_file_path
    )

    return _create_pipeline_snapshot(
        inputs=snapshot_inputs,
        break_point=placeholder_break_point,
        component_visits=component_visits,
        original_input_data=data,
        ordered_component_names=ordered_component_names,
        include_outputs_from=include_outputs_from,
        pipeline_outputs=pipeline_outputs,
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
features:
- |
A snapshot of the last successful step is now saved when an error occurs during a Pipeline run. This allows you
to inspect the snapshot, potentially identify and fix the error, and later resume the pipeline from that point
onwards, avoiding the need to re-run the entire pipeline from the start.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0

import os
from unittest.mock import MagicMock, patch

import numpy as np
Expand Down Expand Up @@ -232,3 +233,7 @@ async def test_async_hybrid_rag_pipeline_crash_on_embedding_retriever(
assert "prompt_builder" not in pipeline_outputs, "Prompt builder should not have run due to crash"
assert "llm" not in pipeline_outputs, "LLM should not have run due to crash"
assert "answer_builder" not in pipeline_outputs, "Answer builder should not have run due to crash"

# check that a pipeline snapshot file was created in the "debug" directory
snapshot_files = os.listdir("debug")
assert any(f.endswith(".json") for f in snapshot_files), "No pipeline snapshot file found in debug directory"
Loading