fix: Disable Process UDFs for Python Dtypes #5085
base: main
Conversation
try:
    conn.send((_ERROR, TracebackException.from_exception(e)))
tb = "\n".join(TracebackException.from_exception(e).format())
I have no idea why this is now an issue and wasn't before, but I'm not going to investigate this too much right now since it seems to only break in Python 3.9 and 3.10, and since 3.9 should be EOL in a month, it's probably OK.
Greptile Summary
This PR implements a safety mechanism to prevent UDFs (User Defined Functions) from using process-based execution when dealing with Python-dtype columns. The core issue addressed is that Python-dtype columns contain arbitrary Python objects (like lambda functions or custom classes) that cannot be guaranteed to be serializable/picklable, which is required for inter-process communication in the UDF execution system.
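To make the failure mode concrete, here is a minimal standalone Python sketch (not Daft code) showing that a typical Python-dtype value such as a lambda cannot be pickled, which is exactly what shipping it to a worker process would require:

```python
import pickle

# A lambda is a typical value that can live in a Python-dtype column
# but cannot be pickled for inter-process communication.
unpicklable = lambda x: x + 1

try:
    pickle.dumps(unpicklable)
except (pickle.PicklingError, AttributeError) as e:
    # A process-based UDF executor would hit this when shipping the column.
    print(f"cannot serialize for another process: {e}")
```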
The changes span multiple layers of the codebase:
Rust Logic Layer: Modified `src/daft-logical-plan/src/ops/udf.rs` to add validation that prevents setting `use_process=True` when Python dtypes are detected in either input columns or output projections. This provides early error detection with clear error messages.
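The check itself is implemented in Rust, but its intent can be sketched in Python. Everything below is a hypothetical illustration; `validate_use_process` and its signature are not Daft's actual API:

```python
from daft import DataType

def validate_use_process(udf_name: str, input_dtypes: dict[str, DataType], use_process: bool) -> None:
    """Hypothetical mirror of the Rust-side validation in udf.rs."""
    if not use_process:
        return
    for col_name, dtype in input_dtypes.items():
        if dtype == DataType.python():
            # Fail fast at plan-construction time instead of mid-execution.
            raise ValueError(
                f"UDF `{udf_name}` cannot set `use_process=True` because it has "
                f"a Python-dtype input column `{col_name}`."
            )
```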
Execution Layer: Updated `src/daft-local-execution/src/intermediate_ops/udf.rs` to automatically detect non-Arrow dtypes (including Python dtypes) using the `dtype.is_arrow()` method and force thread-based execution as the default behavior. The pipeline creation code in `src/daft-local-execution/src/pipeline.rs` was updated to pass input schema information to enable this detection.
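A rough Python rendering of that decision (the real logic is in Rust; the function below is an illustrative stand-in that takes the per-column results of `dtype.is_arrow()` as plain booleans):

```python
from typing import Optional

def choose_executor(dtype_is_arrow: list[bool], use_process: Optional[bool]) -> str:
    """Illustrative stand-in: pick an executor for a UDF's inputs.

    dtype_is_arrow: per-input-column result of dtype.is_arrow().
    use_process: the user's explicit setting, or None if unset.
    """
    if use_process is None:
        # Default: only use a separate process when every column can be
        # represented in Arrow and thus shipped across the boundary.
        return "process" if all(dtype_is_arrow) else "thread"
    return "process" if use_process else "thread"
```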
Python Layer: Enhanced error handling in `daft/execution/udf_worker.py` to make exception serialization more robust with fallback mechanisms, and fixed a string formatting bug in `daft/execution/udf.py`.
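A simplified, self-contained sketch of the fallback idea (the real code lives in `daft/execution/udf_worker.py`; the `_ERROR` value here is a stand-in for the worker protocol's actual tag):

```python
import pickle
from traceback import TracebackException

_ERROR = "error"  # stand-in tag; the real sentinel is defined in udf_worker.py

def send_error(conn, e: Exception) -> None:
    """Report an exception to the parent process, falling back to a
    formatted traceback string if the exception object won't pickle."""
    tb_exc = TracebackException.from_exception(e)
    try:
        conn.send((_ERROR, tb_exc))
    except (pickle.PicklingError, TypeError, AttributeError):
        # A plain string always pickles, so the parent still gets a
        # readable traceback even for exotic exception types.
        conn.send((_ERROR, "\n".join(tb_exc.format())))
```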
Documentation: Added clarifying documentation to the `is_arrow()` method in `src/daft-schema/src/dtype.rs` to explain its role in determining datatype compatibility with the Arrow format.
Testing: Added comprehensive test coverage in `tests/expressions/test_legacy_udf.py` to verify that UDFs with Python-dtype inputs work correctly with thread-based execution but properly fail with clear error messages when forced to use process-based execution.
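A rough sketch of what such tests can look like. The `use_process` keyword placement is an assumption based on this PR's description, not a verbatim copy of `tests/expressions/test_legacy_udf.py`:

```python
import pytest
import daft
from daft import col, DataType

@daft.udf(return_dtype=DataType.int64())
def call_each(objs):
    # The column holds arbitrary Python callables (a Python dtype).
    return [f() for f in objs.to_pylist()]

def test_python_dtype_input_runs_on_threads():
    df = daft.from_pydict({"fns": [lambda: 1, lambda: 2]})
    result = df.select(call_each(col("fns"))).to_pydict()
    assert list(result.values())[0] == [1, 2]

def test_python_dtype_input_rejects_use_process():
    @daft.udf(return_dtype=DataType.int64(), use_process=True)  # kwarg assumed from the PR text
    def forced(objs):
        return [f() for f in objs.to_pylist()]

    df = daft.from_pydict({"fns": [lambda: 1]})
    with pytest.raises(Exception, match="use_process"):
        df.select(forced(col("fns"))).collect()
```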
This change maintains backward compatibility while preventing runtime serialization failures, with the UDF system automatically falling back to safer thread-based execution for Python dtypes while still allowing process-based execution for Arrow-compatible types when beneficial.
Confidence score: 4/5
- This PR addresses a real technical limitation with clear, well-implemented solutions across multiple system layers
- Score reflects the comprehensive nature of changes affecting both Rust and Python components, requiring careful coordination
- Pay close attention to the UDF execution logic changes in `src/daft-local-execution/src/intermediate_ops/udf.rs` and the validation in `src/daft-logical-plan/src/ops/udf.rs`
7 files reviewed, no comments
daft/execution/udf.py
Outdated
@@ -88,7 +88,7 @@ def __init__(self, project_expr: PyExpr, passthrough_exprs: list[PyExpr]) -> None:
     expr_projection = ExpressionsProjection(
         [Expression._from_pyexpr(expr) for expr in passthrough_exprs] + [Expression._from_pyexpr(project_expr)]
     )
-    expr_projection_bytes = daft.pickle.dumps(expr_projection)
+    expr_projection_bytes = daft.pickle.dumps((project_expr.name(), expr_projection))
I realized this is the wrong name (this is the name of the expression, not the UDF), but since we're in a crunch, I fixed it in the next UDF PR that I need to merge today, to avoid rerunning CI.
If I need to address any comments, I will modify it then.
// and that use_process != true
#[cfg(feature = "python")]
{
    if matches!(udf_properties.use_process, Some(true)) {
Let's check for actor pools as well.
@srilman I'm honestly fine with only allowing UDFs to return serializable values. With our v0.6 version bump this might be a good time to start enforcing this, instead of building a workaround when people return unserializable values. What do you think?
I agree, I think so too, but Arrow2 actually can't serialize Python arrays across processes. So if you used an actor-pool UDF with Python inputs / outputs, it wouldn't work. So I decided to just adhere to that behavior and later on build a method to serialize Python arrays. Then in v0.7 we can enforce this.
{
    return Err(Error::CreationError {
        source: DaftError::InternalError(
            format!("UDF `{}` can not set `use_process=True` because it has a Python-dtype input column `{}`. Please unset `use_process` or cast the input to a non-Python dtype if possible.", udf_properties.name, col.name)
format!("UDF `{}` can not set `use_process=True` because it has a Python-dtype input column `{}`. Please unset `use_process` or cast the input to a non-Python dtype if possible.", udf_properties.name, col.name) | |
format!("UDF `{}` cannot set `use_process=True` because it has a Python-dtype input column `{}`. Please unset `use_process` or cast the input to a non-Python dtype if possible.", udf_properties.name, col.name) |
If you don't mind, I'm going to address these two problems in the next PR (the serialize PR) to reduce CI contention on macOS runners.
reason="Ray runner will always run UDFs on separate processes", | ||
) | ||
def test_udf_python_dtype(): | ||
"""Test that running a UDF with a Python-dtype output column runs on the same process. |
This test is checking for a Python input column, not an output column. Can you add another test for the output column?
Changes Made
With Python-dtype columns, we can't always guarantee that the values in the column are serializable / picklable to send to another process. So for the time being, let's force these types to run on the same thread.
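As the error message in this PR suggests, the user-side workaround is to cast away the Python dtype when the values allow it. A hypothetical sketch:

```python
import daft
from daft import col, DataType

# Suppose "objs" arrived as a Python-dtype column holding plain integers.
df = daft.from_pydict({"objs": [1, 2, 3]})

# Casting to an Arrow-compatible dtype makes the column safe to ship
# across a process boundary again.
df = df.with_column("objs", col("objs").cast(DataType.int64()))
```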
In the future, we should consider:
Checklist
docs/mkdocs.yml navigation