Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
ef96b9f
initial PoC idea running
davidsbatista Aug 15, 2025
f00919b
removing test code
davidsbatista Aug 15, 2025
443eb39
cleaning up
davidsbatista Aug 15, 2025
9ed9e09
wip
davidsbatista Aug 18, 2025
cdb2718
Merge branch 'main' into poc/resume-state-for-crashes
davidsbatista Aug 19, 2025
f1c01fc
cleaning up demos
davidsbatista Aug 19, 2025
f249e22
adding more pipelines to test persistence saving
davidsbatista Aug 20, 2025
85dd0f1
wip
davidsbatista Aug 20, 2025
f3abfc7
wip
davidsbatista Aug 20, 2025
e17166b
working example for logging components inputs in run time
davidsbatista Aug 21, 2025
87cbb00
reverting to a simpler solution for intermediate results
davidsbatista Aug 21, 2025
db7fca5
cleaning up
davidsbatista Aug 21, 2025
b0caafc
testing that in a crash components outputs/inputs up to the crash poi…
davidsbatista Aug 21, 2025
30176c5
adding tests for state persistance in a RAG pipeline
davidsbatista Aug 21, 2025
b93b221
updataing tests for state persistance in a RAG pipeline
davidsbatista Aug 21, 2025
f79a9cd
Merge branch 'main' into poc/resume-state-for-crashes
davidsbatista Aug 25, 2025
7b75803
removing use cases of agent tests
davidsbatista Aug 25, 2025
d741d55
adding LICENSE header
davidsbatista Aug 25, 2025
b1bdda5
adding LICENSE header
davidsbatista Aug 25, 2025
89aa58f
adding release notes
davidsbatista Aug 25, 2025
a276bc4
updating tests for mocked components only
davidsbatista Aug 25, 2025
2f0ed63
Merge branch 'main' into poc/resume-state-for-crashes
davidsbatista Aug 25, 2025
44d2faf
updating release notes
davidsbatista Aug 25, 2025
d6a0919
adapting PipelineRuntimeError
davidsbatista Aug 25, 2025
98b8b3c
cleaning up tests
davidsbatista Aug 25, 2025
f0165b3
fixing test pipeline crash components inputs/outputs are saved
davidsbatista Aug 25, 2025
2f3db39
fixing tests for state persistance
davidsbatista Aug 25, 2025
91f9a33
removing code
davidsbatista Aug 26, 2025
19c6680
removing code
davidsbatista Aug 26, 2025
a43f253
removing code
davidsbatista Aug 26, 2025
064983d
updating release notes
davidsbatista Aug 26, 2025
d118e7f
validating parameters
davidsbatista Aug 26, 2025
d04899a
Merge branch 'main' into feat/pipeline-checkpoints-crash-resume
davidsbatista Aug 27, 2025
820e442
cleaning
davidsbatista Aug 27, 2025
ed731b5
Merge branch 'main' into feat/pipeline-checkpoints-crash-resume
davidsbatista Aug 28, 2025
fc52b41
wip: debugging
davidsbatista Aug 28, 2025
aa4e820
removing persistance tests
davidsbatista Aug 28, 2025
148b7d3
formatting
davidsbatista Aug 28, 2025
357aea5
formatting
davidsbatista Aug 28, 2025
e0a1eeb
cleaning up code
davidsbatista Aug 28, 2025
b764ea8
updating release notes
davidsbatista Aug 28, 2025
f2d21bb
adding missing docstrings
davidsbatista Aug 28, 2025
32a4ccc
typo in release notes
davidsbatista Aug 28, 2025
5940c37
Update haystack/core/pipeline/pipeline.py
davidsbatista Aug 29, 2025
4c8e648
PR comments
davidsbatista Aug 29, 2025
1dd6fb2
handling potential issues with saving the snapshot file
davidsbatista Aug 29, 2025
d2651f9
updating tests
davidsbatista Aug 29, 2025
e32990e
Merge branch 'main' into feat/pipeline-checkpoints-crash-resume
davidsbatista Aug 29, 2025
04b9615
Update haystack/core/pipeline/pipeline.py
davidsbatista Aug 29, 2025
3133b74
Update haystack/core/pipeline/pipeline.py
davidsbatista Aug 29, 2025
ff7f969
some more improvements
davidsbatista Aug 29, 2025
fd16e51
fixing exxception
davidsbatista Aug 29, 2025
59de130
fixing exception error name conflict
davidsbatista Aug 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions haystack/core/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
)
from haystack.core.pipeline.breakpoint import (
_create_pipeline_snapshot,
_save_pipeline_snapshot,
_trigger_break_point,
_validate_break_point_against_pipeline,
_validate_pipeline_snapshot_against_pipeline,
Expand All @@ -25,6 +26,7 @@
from haystack.dataclasses.breakpoints import AgentBreakpoint, Breakpoint, PipelineSnapshot
from haystack.telemetry import pipeline_running
from haystack.utils import _deserialize_value_with_schema
from haystack.utils.misc import _get_output_dir

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -390,6 +392,36 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
except PipelineRuntimeError as error:
# Attach partial pipeline outputs to the error before re-raising
error.pipeline_outputs = pipeline_outputs

# Create a snapshot of the last good state of the pipeline before the error occurred.
pipeline_snapshot_inputs_serialised = deepcopy(inputs)
pipeline_snapshot_inputs_serialised[component_name] = deepcopy(component_inputs)
out_dir = _get_output_dir("pipeline_snapshot")
break_point = Breakpoint(
component_name=component_name,
visit_count=component_visits[component_name],
snapshot_file_path=out_dir,
)
last_good_state_snapshot = _create_pipeline_snapshot(
inputs=pipeline_snapshot_inputs_serialised,
break_point=break_point,
component_visits=component_visits,
original_input_data=data,
ordered_component_names=ordered_component_names,
include_outputs_from=include_outputs_from,
pipeline_outputs=pipeline_outputs,
)
try:
_save_pipeline_snapshot(pipeline_snapshot=last_good_state_snapshot)
logger.info(
"Saved a snapshot of the pipeline's last valid state to '{out_path}'. "
"Review this snapshot to debug the error and resume the pipeline from here.",
out_path=out_dir,
)
except Exception as save_error:
logger.error(
"Failed to save a snapshot of the pipeline's last valid state with error: {e}", e=save_error
)
raise error

# Updates global input state with component outputs and returns outputs that should go to
Expand Down
34 changes: 34 additions & 0 deletions haystack/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# SPDX-License-Identifier: Apache-2.0

import mimetypes
import tempfile
from pathlib import Path
from typing import Any, Optional, Union, overload

Expand Down Expand Up @@ -81,3 +82,36 @@ def _guess_mime_type(path: Path) -> Optional[str]:
mime_type = mimetypes.guess_type(path.as_posix())[0]
# lookup custom mappings if the mime type is not found
return CUSTOM_MIMETYPES.get(extension, mime_type)


def _get_output_dir(out_dir: str) -> str:
    """
    Find or create a writable directory for saving status files.

    Tries in the following order:

    1. ~/.haystack/{out_dir}
    2. {tempdir}/haystack/{out_dir}
    3. ./.haystack/{out_dir}

    A location is only accepted if the directory can be created (or already exists) and a temporary file can
    actually be written inside it. Locations whose base path cannot even be resolved (for example, when the home
    directory cannot be determined) are skipped and the next one is tried.

    :param out_dir:
        Name of the sub-directory to create inside the selected base location.
    :raises RuntimeError: If no directory could be created.
    :returns:
        The path to the created directory.
    """

    # The base paths are resolved lazily, inside the `try` below: `Path.home()` raises a RuntimeError when the home
    # directory cannot be determined and `Path.cwd()` raises an OSError when the working directory no longer
    # exists. Resolving them eagerly would abort the lookup before any fallback location is tried.
    candidates = (
        ("~/.haystack", lambda: Path.home() / ".haystack" / out_dir),
        ("{tempdir}/haystack", lambda: Path(tempfile.gettempdir()) / "haystack" / out_dir),
        ("./.haystack", lambda: Path.cwd() / ".haystack" / out_dir),
    )

    failures = []
    for label, resolve in candidates:
        try:
            candidate = resolve()
            candidate.mkdir(parents=True, exist_ok=True)
            # `mkdir(exist_ok=True)` also succeeds for an existing read-only directory, so probe that a file can
            # really be created there. The probe file is removed as soon as the context manager exits.
            with tempfile.TemporaryFile(dir=str(candidate)):
                pass
        except (OSError, RuntimeError, ValueError) as error:
            # Remember why this location was rejected so the final error is actionable.
            failures.append(f"{label}/{out_dir} ({type(error).__name__}: {error})")
            continue
        return str(candidate)

    raise RuntimeError(
        "Could not create a writable directory for output files in any of the following locations: "
        + "; ".join(failures)
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
features:
- |
A snapshot of the last successful step is now saved when an error occurs during a `Pipeline` run. This allows you
to inspect the snapshot, potentially identify and fix the error, and later resume the pipeline from that point
onwards, avoiding the need to re-run the entire pipeline from the start. This is currently only available in
`Pipeline` and not yet in `AsyncPipeline`.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#
# SPDX-License-Identifier: Apache-2.0

import os
from unittest.mock import MagicMock, patch

import numpy as np
Expand All @@ -21,6 +22,7 @@
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils.auth import Secret
from haystack.utils.misc import _get_output_dir


def setup_document_store():
Expand Down Expand Up @@ -232,3 +234,7 @@ async def test_async_hybrid_rag_pipeline_crash_on_embedding_retriever(
assert "prompt_builder" not in pipeline_outputs, "Prompt builder should not have run due to crash"
assert "llm" not in pipeline_outputs, "LLM should not have run due to crash"
assert "answer_builder" not in pipeline_outputs, "Answer builder should not have run due to crash"

# check that a pipeline snapshot file was created in the "pipeline_snapshot" directory
snapshot_files = os.listdir(_get_output_dir("pipeline_snapshot"))
assert any(f.endswith(".json") for f in snapshot_files), "No pipeline snapshot file found in debug directory"
Loading