Skip to content

Commit 7f9a8c0

Browse files
davispalambtimsaucer
authored
Add TableProvider::insert_into into FFI Bindings (#14391)
* Wrap TableProvider::insert_into This method was missing from the FFI bindings for use in datafusion-python extensions. * Switch from passing the runtime around to it's handle, add the handle of the current async function to the foreign execution plan on the insert_into operation --------- Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: Tim Saucer <[email protected]>
1 parent a0d42ed commit 7f9a8c0

File tree

6 files changed

+207
-18
lines changed

6 files changed

+207
-18
lines changed

datafusion/ffi/src/execution_plan.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use datafusion::{
2727
execution::{SendableRecordBatchStream, TaskContext},
2828
physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
2929
};
30-
use tokio::runtime::Runtime;
30+
use tokio::runtime::Handle;
3131

3232
use crate::{
3333
plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
@@ -72,7 +72,7 @@ unsafe impl Sync for FFI_ExecutionPlan {}
7272
pub struct ExecutionPlanPrivateData {
7373
pub plan: Arc<dyn ExecutionPlan>,
7474
pub context: Arc<TaskContext>,
75-
pub runtime: Option<Arc<Runtime>>,
75+
pub runtime: Option<Handle>,
7676
}
7777

7878
unsafe extern "C" fn properties_fn_wrapper(
@@ -110,7 +110,7 @@ unsafe extern "C" fn execute_fn_wrapper(
110110
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
111111
let plan = &(*private_data).plan;
112112
let ctx = &(*private_data).context;
113-
let runtime = (*private_data).runtime.as_ref().map(Arc::clone);
113+
let runtime = (*private_data).runtime.clone();
114114

115115
match plan.execute(partition, Arc::clone(ctx)) {
116116
Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs, runtime)),
@@ -153,7 +153,7 @@ impl FFI_ExecutionPlan {
153153
pub fn new(
154154
plan: Arc<dyn ExecutionPlan>,
155155
context: Arc<TaskContext>,
156-
runtime: Option<Arc<Runtime>>,
156+
runtime: Option<Handle>,
157157
) -> Self {
158158
let private_data = Box::new(ExecutionPlanPrivateData {
159159
plan,

datafusion/ffi/src/insert_op.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use abi_stable::StableAbi;
19+
use datafusion::logical_expr::logical_plan::dml::InsertOp;
20+
21+
/// FFI safe version of [`InsertOp`].
22+
#[repr(C)]
23+
#[derive(StableAbi)]
24+
#[allow(non_camel_case_types)]
25+
pub enum FFI_InsertOp {
26+
Append,
27+
Overwrite,
28+
Replace,
29+
}
30+
31+
impl From<FFI_InsertOp> for InsertOp {
32+
fn from(value: FFI_InsertOp) -> Self {
33+
match value {
34+
FFI_InsertOp::Append => InsertOp::Append,
35+
FFI_InsertOp::Overwrite => InsertOp::Overwrite,
36+
FFI_InsertOp::Replace => InsertOp::Replace,
37+
}
38+
}
39+
}
40+
41+
impl From<InsertOp> for FFI_InsertOp {
42+
fn from(value: InsertOp) -> Self {
43+
match value {
44+
InsertOp::Append => FFI_InsertOp::Append,
45+
InsertOp::Overwrite => FFI_InsertOp::Overwrite,
46+
InsertOp::Replace => FFI_InsertOp::Replace,
47+
}
48+
}
49+
}

datafusion/ffi/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
pub mod arrow_wrappers;
2222
pub mod execution_plan;
23+
pub mod insert_op;
2324
pub mod plan_properties;
2425
pub mod record_batch_stream;
2526
pub mod session_config;

datafusion/ffi/src/record_batch_stream.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{ffi::c_void, sync::Arc, task::Poll};
18+
use std::{ffi::c_void, task::Poll};
1919

2020
use abi_stable::{
2121
std_types::{ROption, RResult, RString},
@@ -33,7 +33,7 @@ use datafusion::{
3333
execution::{RecordBatchStream, SendableRecordBatchStream},
3434
};
3535
use futures::{Stream, TryStreamExt};
36-
use tokio::runtime::Runtime;
36+
use tokio::runtime::Handle;
3737

3838
use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
3939

@@ -61,7 +61,7 @@ pub struct FFI_RecordBatchStream {
6161

6262
pub struct RecordBatchStreamPrivateData {
6363
pub rbs: SendableRecordBatchStream,
64-
pub runtime: Option<Arc<Runtime>>,
64+
pub runtime: Option<Handle>,
6565
}
6666

6767
impl From<SendableRecordBatchStream> for FFI_RecordBatchStream {
@@ -71,7 +71,7 @@ impl From<SendableRecordBatchStream> for FFI_RecordBatchStream {
7171
}
7272

7373
impl FFI_RecordBatchStream {
74-
pub fn new(stream: SendableRecordBatchStream, runtime: Option<Arc<Runtime>>) -> Self {
74+
pub fn new(stream: SendableRecordBatchStream, runtime: Option<Handle>) -> Self {
7575
let private_data = Box::into_raw(Box::new(RecordBatchStreamPrivateData {
7676
rbs: stream,
7777
runtime,

datafusion/ffi/src/table_provider.rs

Lines changed: 145 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ use datafusion::{
2828
catalog::{Session, TableProvider},
2929
datasource::TableType,
3030
error::DataFusionError,
31-
execution::session_state::SessionStateBuilder,
32-
logical_expr::TableProviderFilterPushDown,
31+
execution::{session_state::SessionStateBuilder, TaskContext},
32+
logical_expr::{logical_plan::dml::InsertOp, TableProviderFilterPushDown},
3333
physical_plan::ExecutionPlan,
3434
prelude::{Expr, SessionContext},
3535
};
@@ -40,7 +40,7 @@ use datafusion_proto::{
4040
protobuf::LogicalExprList,
4141
};
4242
use prost::Message;
43-
use tokio::runtime::Runtime;
43+
use tokio::runtime::Handle;
4444

4545
use crate::{
4646
arrow_wrappers::WrappedSchema,
@@ -50,6 +50,7 @@ use crate::{
5050

5151
use super::{
5252
execution_plan::{FFI_ExecutionPlan, ForeignExecutionPlan},
53+
insert_op::FFI_InsertOp,
5354
session_config::FFI_SessionConfig,
5455
};
5556
use datafusion::error::Result;
@@ -133,6 +134,14 @@ pub struct FFI_TableProvider {
133134
-> RResult<RVec<FFI_TableProviderFilterPushDown>, RString>,
134135
>,
135136

137+
pub insert_into:
138+
unsafe extern "C" fn(
139+
provider: &Self,
140+
session_config: &FFI_SessionConfig,
141+
input: &FFI_ExecutionPlan,
142+
insert_op: FFI_InsertOp,
143+
) -> FfiFuture<RResult<FFI_ExecutionPlan, RString>>,
144+
136145
/// Used to create a clone on the provider of the execution plan. This should
137146
/// only need to be called by the receiver of the plan.
138147
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
@@ -153,7 +162,7 @@ unsafe impl Sync for FFI_TableProvider {}
153162

154163
struct ProviderPrivateData {
155164
provider: Arc<dyn TableProvider + Send>,
156-
runtime: Option<Arc<Runtime>>,
165+
runtime: Option<Handle>,
157166
}
158167

159168
unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema {
@@ -276,6 +285,53 @@ unsafe extern "C" fn scan_fn_wrapper(
276285
.into_ffi()
277286
}
278287

288+
unsafe extern "C" fn insert_into_fn_wrapper(
289+
provider: &FFI_TableProvider,
290+
session_config: &FFI_SessionConfig,
291+
input: &FFI_ExecutionPlan,
292+
insert_op: FFI_InsertOp,
293+
) -> FfiFuture<RResult<FFI_ExecutionPlan, RString>> {
294+
let private_data = provider.private_data as *mut ProviderPrivateData;
295+
let internal_provider = &(*private_data).provider;
296+
let session_config = session_config.clone();
297+
let input = input.clone();
298+
let runtime = &(*private_data).runtime;
299+
300+
async move {
301+
let config = match ForeignSessionConfig::try_from(&session_config) {
302+
Ok(c) => c,
303+
Err(e) => return RResult::RErr(e.to_string().into()),
304+
};
305+
let session = SessionStateBuilder::new()
306+
.with_default_features()
307+
.with_config(config.0)
308+
.build();
309+
let ctx = SessionContext::new_with_state(session);
310+
311+
let input = match ForeignExecutionPlan::try_from(&input) {
312+
Ok(input) => Arc::new(input),
313+
Err(e) => return RResult::RErr(e.to_string().into()),
314+
};
315+
316+
let insert_op = InsertOp::from(insert_op);
317+
318+
let plan = match internal_provider
319+
.insert_into(&ctx.state(), input, insert_op)
320+
.await
321+
{
322+
Ok(p) => p,
323+
Err(e) => return RResult::RErr(e.to_string().into()),
324+
};
325+
326+
RResult::ROk(FFI_ExecutionPlan::new(
327+
plan,
328+
ctx.task_ctx(),
329+
runtime.clone(),
330+
))
331+
}
332+
.into_ffi()
333+
}
334+
279335
unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) {
280336
let private_data = Box::from_raw(provider.private_data as *mut ProviderPrivateData);
281337
drop(private_data);
@@ -295,6 +351,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_Table
295351
scan: scan_fn_wrapper,
296352
table_type: table_type_fn_wrapper,
297353
supports_filters_pushdown: provider.supports_filters_pushdown,
354+
insert_into: provider.insert_into,
298355
clone: clone_fn_wrapper,
299356
release: release_fn_wrapper,
300357
version: super::version,
@@ -313,7 +370,7 @@ impl FFI_TableProvider {
313370
pub fn new(
314371
provider: Arc<dyn TableProvider + Send>,
315372
can_support_pushdown_filters: bool,
316-
runtime: Option<Arc<Runtime>>,
373+
runtime: Option<Handle>,
317374
) -> Self {
318375
let private_data = Box::new(ProviderPrivateData { provider, runtime });
319376

@@ -325,6 +382,7 @@ impl FFI_TableProvider {
325382
true => Some(supports_filters_pushdown_fn_wrapper),
326383
false => None,
327384
},
385+
insert_into: insert_into_fn_wrapper,
328386
clone: clone_fn_wrapper,
329387
release: release_fn_wrapper,
330388
version: super::version,
@@ -443,6 +501,37 @@ impl TableProvider for ForeignTableProvider {
443501
}
444502
}
445503
}
504+
505+
async fn insert_into(
506+
&self,
507+
session: &dyn Session,
508+
input: Arc<dyn ExecutionPlan>,
509+
insert_op: InsertOp,
510+
) -> Result<Arc<dyn ExecutionPlan>> {
511+
let session_config: FFI_SessionConfig = session.config().into();
512+
513+
let rc = Handle::try_current().ok();
514+
let input =
515+
FFI_ExecutionPlan::new(input, Arc::new(TaskContext::from(session)), rc);
516+
let insert_op: FFI_InsertOp = insert_op.into();
517+
518+
let plan = unsafe {
519+
let maybe_plan =
520+
(self.0.insert_into)(&self.0, &session_config, &input, insert_op).await;
521+
522+
match maybe_plan {
523+
RResult::ROk(p) => ForeignExecutionPlan::try_from(&p)?,
524+
RResult::RErr(e) => {
525+
return Err(DataFusionError::Internal(format!(
526+
"Unable to perform insert_into via FFI: {}",
527+
e
528+
)))
529+
}
530+
}
531+
};
532+
533+
Ok(Arc::new(plan))
534+
}
446535
}
447536

448537
#[cfg(test)]
@@ -453,7 +542,7 @@ mod tests {
453542
use super::*;
454543

455544
#[tokio::test]
456-
async fn test_round_trip_ffi_table_provider() -> Result<()> {
545+
async fn test_round_trip_ffi_table_provider_scan() -> Result<()> {
457546
use arrow::datatypes::Field;
458547
use datafusion::arrow::{
459548
array::Float32Array, datatypes::DataType, record_batch::RecordBatch,
@@ -493,4 +582,54 @@ mod tests {
493582

494583
Ok(())
495584
}
585+
586+
#[tokio::test]
587+
async fn test_round_trip_ffi_table_provider_insert_into() -> Result<()> {
588+
use arrow::datatypes::Field;
589+
use datafusion::arrow::{
590+
array::Float32Array, datatypes::DataType, record_batch::RecordBatch,
591+
};
592+
use datafusion::datasource::MemTable;
593+
594+
let schema =
595+
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
596+
597+
// define data in two partitions
598+
let batch1 = RecordBatch::try_new(
599+
Arc::clone(&schema),
600+
vec![Arc::new(Float32Array::from(vec![2.0, 4.0, 8.0]))],
601+
)?;
602+
let batch2 = RecordBatch::try_new(
603+
Arc::clone(&schema),
604+
vec![Arc::new(Float32Array::from(vec![64.0]))],
605+
)?;
606+
607+
let ctx = SessionContext::new();
608+
609+
let provider =
610+
Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?);
611+
612+
let ffi_provider = FFI_TableProvider::new(provider, true, None);
613+
614+
let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into();
615+
616+
ctx.register_table("t", Arc::new(foreign_table_provider))?;
617+
618+
let result = ctx
619+
.sql("INSERT INTO t VALUES (128.0);")
620+
.await?
621+
.collect()
622+
.await?;
623+
624+
assert!(result.len() == 1 && result[0].num_rows() == 1);
625+
626+
ctx.table("t")
627+
.await?
628+
.select(vec![col("a")])?
629+
.filter(col("a").gt(lit(3.0)))?
630+
.show()
631+
.await?;
632+
633+
Ok(())
634+
}
496635
}

datafusion/ffi/src/tests/async_provider.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use datafusion::{
4141
};
4242
use futures::Stream;
4343
use tokio::{
44-
runtime::Runtime,
44+
runtime::Handle,
4545
sync::{broadcast, mpsc},
4646
};
4747

@@ -59,7 +59,7 @@ fn async_table_provider_thread(
5959
mut shutdown: mpsc::Receiver<bool>,
6060
mut batch_request: mpsc::Receiver<bool>,
6161
batch_sender: broadcast::Sender<Option<RecordBatch>>,
62-
tokio_rt: mpsc::Sender<Arc<Runtime>>,
62+
tokio_rt: mpsc::Sender<Handle>,
6363
) {
6464
let runtime = Arc::new(
6565
tokio::runtime::Builder::new_current_thread()
@@ -68,7 +68,7 @@ fn async_table_provider_thread(
6868
);
6969
let _runtime_guard = runtime.enter();
7070
tokio_rt
71-
.blocking_send(Arc::clone(&runtime))
71+
.blocking_send(runtime.handle().clone())
7272
.expect("Unable to send tokio runtime back to main thread");
7373

7474
runtime.block_on(async move {
@@ -91,7 +91,7 @@ fn async_table_provider_thread(
9191
let _ = shutdown.blocking_recv();
9292
}
9393

94-
pub fn start_async_provider() -> (AsyncTableProvider, Arc<Runtime>) {
94+
pub fn start_async_provider() -> (AsyncTableProvider, Handle) {
9595
let (batch_request_tx, batch_request_rx) = mpsc::channel(10);
9696
let (record_batch_tx, record_batch_rx) = broadcast::channel(10);
9797
let (tokio_rt_tx, mut tokio_rt_rx) = mpsc::channel(10);

0 commit comments

Comments
 (0)