Skip to content

Commit f48789f

Browse files
feat: raise last good snapshot in PipelineRuntimeError + tests updates (#9758)
* raise last good snapshot in PipelineRuntimeError + test updates * add release notes * rename test file * address PR comments + fix tests
1 parent 68f4cc7 commit f48789f

File tree

5 files changed

+13
-82
lines changed

5 files changed

+13
-82
lines changed

haystack/core/errors.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
from typing import Any, Optional
66

7+
from haystack.dataclasses.breakpoints import PipelineSnapshot
8+
79

810
class PipelineError(Exception):
911
pass
@@ -15,11 +17,11 @@ def __init__(
1517
component_name: Optional[str],
1618
component_type: Optional[type],
1719
message: str,
18-
pipeline_outputs: Optional[Any] = None,
20+
pipeline_snapshot: Optional[PipelineSnapshot] = None,
1921
) -> None:
2022
self.component_name = component_name
2123
self.component_type = component_type
22-
self.pipeline_outputs = pipeline_outputs
24+
self.pipeline_snapshot = pipeline_snapshot
2325
super().__init__(message)
2426

2527
@classmethod

haystack/core/pipeline/async_pipeline.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,6 @@ async def _run_highest_in_isolation(component_name: str) -> AsyncIterator[dict[s
271271
parent_span=parent_span,
272272
)
273273
except PipelineRuntimeError as error:
274-
# Attach partial pipeline outputs to the error before re-raising
275-
error.pipeline_outputs = pipeline_outputs
276274
raise error
277275

278276
# Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`
@@ -321,8 +319,6 @@ async def _runner():
321319
parent_span=parent_span,
322320
)
323321
except PipelineRuntimeError as error:
324-
# Attach partial pipeline outputs to the error before re-raising
325-
error.pipeline_outputs = pipeline_outputs
326322
raise error
327323

328324
# Distribute outputs to downstream inputs; also prune outputs based on `include_outputs_from`

haystack/core/pipeline/pipeline.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,9 +390,6 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
390390
parent_span=span,
391391
)
392392
except PipelineRuntimeError as error:
393-
# Attach partial pipeline outputs to the error before re-raising
394-
error.pipeline_outputs = pipeline_outputs
395-
396393
# Create a snapshot of the last good state of the pipeline before the error occurred.
397394
pipeline_snapshot_inputs_serialised = deepcopy(inputs)
398395
pipeline_snapshot_inputs_serialised[component_name] = deepcopy(component_inputs)
@@ -411,6 +408,8 @@ def run( # noqa: PLR0915, PLR0912, C901, pylint: disable=too-many-branches
411408
include_outputs_from=include_outputs_from,
412409
pipeline_outputs=pipeline_outputs,
413410
)
411+
# Attach the last good state snapshot to the error before re-raising and saving to disk
412+
error.pipeline_snapshot = last_good_state_snapshot
414413
try:
415414
_save_pipeline_snapshot(pipeline_snapshot=last_good_state_snapshot)
416415
logger.info(
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
features:
3+
- |
4+
A snapshot of the last successful step is now also raised when an error occurs during a `Pipeline` run, allowing the caller to catch it, inspect the possible reason
5+
for the crash, and use it to resume the pipeline execution from that point onwards.

test/core/pipeline/test_pipeline_crash_regular_pipeline_outputs_raised.py renamed to test/core/pipeline/test_pipeline_crash_regular_pipeline_snapshot_is_raised.py

Lines changed: 2 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -152,75 +152,8 @@ def test_hybrid_rag_pipeline_crash_on_embedding_retriever(
152152
},
153153
)
154154

155-
pipeline_outputs = exc_info.value.pipeline_outputs
156-
157-
assert pipeline_outputs is not None, "Pipeline outputs should be captured in the exception"
158-
159-
# verify that bm25_retriever and text_embedder ran successfully before the crash
160-
assert "bm25_retriever" in pipeline_outputs, "BM25 retriever output not captured"
161-
assert "documents" in pipeline_outputs["bm25_retriever"], "BM25 retriever should have produced documents"
162-
assert "text_embedder" in pipeline_outputs, "Text embedder output not captured"
163-
assert "embedding" in pipeline_outputs["text_embedder"], "Text embedder should have produced embeddings"
164-
165-
# components after the crash point are not in the outputs
166-
assert "document_joiner" not in pipeline_outputs, "Document joiner should not have run due to crash"
167-
assert "prompt_builder" not in pipeline_outputs, "Prompt builder should not have run due to crash"
168-
assert "llm" not in pipeline_outputs, "LLM should not have run due to crash"
169-
assert "answer_builder" not in pipeline_outputs, "Answer builder should not have run due to crash"
170-
171-
@pytest.mark.asyncio
172-
async def test_async_hybrid_rag_pipeline_crash_on_embedding_retriever(
173-
self, mock_sentence_transformers_text_embedder, monkeypatch
174-
):
175-
monkeypatch.setenv("OPENAI_API_KEY", "test-api-key")
176-
177-
document_store = setup_document_store()
178-
text_embedder = mock_sentence_transformers_text_embedder
179-
invalid_embedding_retriever = InvalidOutputEmbeddingRetriever()
180-
bm25_retriever = InMemoryBM25Retriever(document_store)
181-
document_joiner = DocumentJoiner(join_mode="concatenate")
182-
183-
pipeline = AsyncPipeline()
184-
pipeline.add_component("text_embedder", text_embedder)
185-
pipeline.add_component("embedding_retriever", invalid_embedding_retriever)
186-
pipeline.add_component("bm25_retriever", bm25_retriever)
187-
pipeline.add_component("document_joiner", document_joiner)
188-
pipeline.add_component(
189-
"prompt_builder", ChatPromptBuilder(template=template, required_variables=["question", "documents"])
190-
)
191-
pipeline.add_component("llm", OpenAIChatGenerator(api_key=Secret.from_env_var("OPENAI_API_KEY")))
192-
pipeline.add_component("answer_builder", AnswerBuilder())
193-
194-
pipeline.connect("text_embedder", "embedding_retriever")
195-
pipeline.connect("bm25_retriever", "document_joiner")
196-
pipeline.connect("embedding_retriever", "document_joiner")
197-
pipeline.connect("document_joiner.documents", "prompt_builder.documents")
198-
pipeline.connect("prompt_builder", "llm")
199-
pipeline.connect("llm.replies", "answer_builder.replies")
200-
201-
question = "Where does Mark live?"
202-
test_data = {
203-
"text_embedder": {"text": question},
204-
"bm25_retriever": {"query": question},
205-
"prompt_builder": {"question": question},
206-
"answer_builder": {"query": question},
207-
}
208-
209-
with pytest.raises(PipelineRuntimeError) as exc_info:
210-
await pipeline.run_async(
211-
data=test_data,
212-
include_outputs_from={
213-
"text_embedder",
214-
"embedding_retriever",
215-
"bm25_retriever",
216-
"document_joiner",
217-
"prompt_builder",
218-
"llm",
219-
"answer_builder",
220-
},
221-
)
222-
223-
pipeline_outputs = exc_info.value.pipeline_outputs
155+
pipeline_snapshot = exc_info.value.pipeline_snapshot
156+
pipeline_outputs = pipeline_snapshot.pipeline_state.pipeline_outputs
224157
assert pipeline_outputs is not None, "Pipeline outputs should be captured in the exception"
225158

226159
# verify that bm25_retriever and text_embedder ran successfully before the crash
@@ -234,7 +167,3 @@ async def test_async_hybrid_rag_pipeline_crash_on_embedding_retriever(
234167
assert "prompt_builder" not in pipeline_outputs, "Prompt builder should not have run due to crash"
235168
assert "llm" not in pipeline_outputs, "LLM should not have run due to crash"
236169
assert "answer_builder" not in pipeline_outputs, "Answer builder should not have run due to crash"
237-
238-
# check that a pipeline snapshot file was created in the "pipeline_snapshot" directory
239-
snapshot_files = os.listdir(_get_output_dir("pipeline_snapshot"))
240-
assert any(f.endswith(".json") for f in snapshot_files), "No pipeline snapshot file found in debug directory"

0 commit comments

Comments
 (0)