2 changes: 2 additions & 0 deletions proto/meta.proto
@@ -844,6 +844,7 @@ message EventLog {
string table_name = 2;
string cdc_table_id = 3;
string upstream_ddl = 4;
string fail_info = 5;
}
message EventSinkFail {
uint32 sink_id = 1;
@@ -912,6 +913,7 @@ message AddEventLogRequest {
oneof event {
EventLog.EventWorkerNodePanic worker_node_panic = 1;
EventLog.EventSinkFail sink_fail = 2;
EventLog.EventAutoSchemaChangeFail auto_schema_change_fail = 3;
}
}

1 change: 1 addition & 0 deletions src/batch/executors/src/executor/source.rs
@@ -154,6 +154,7 @@ impl SourceExecutor {
},
ConnectorProperties::default(),
None,
None, // No callback for batch executor
));
let (stream, _) = self
.source
5 changes: 4 additions & 1 deletion src/connector/codec/src/decoder/mod.rs
@@ -34,7 +34,10 @@ pub enum AccessError {
value: String,
},
#[error("Unsupported data type `{ty}`")]
UnsupportedType { ty: String },
UnsupportedType {
ty: String,
table_name: Option<String>,
},

#[error("Unsupported additional column `{name}`")]
UnsupportedAdditionalColumn { name: String },
1 change: 1 addition & 0 deletions src/connector/src/parser/csv_parser.rs
@@ -101,6 +101,7 @@ impl CsvParser {
_ => {
return Err(AccessError::UnsupportedType {
ty: dtype.to_string(),
table_name: None,
});
}
};
4 changes: 2 additions & 2 deletions src/connector/src/parser/debezium/schema_change.rs
@@ -65,10 +65,10 @@ impl From<&str> for TableChangeType {

#[derive(Debug)]
pub struct TableSchemaChange {
pub(crate) cdc_table_id: String,
pub cdc_table_id: String,
pub(crate) columns: Vec<ColumnCatalog>,
pub(crate) change_type: TableChangeType,
pub(crate) upstream_ddl: String,
pub upstream_ddl: String,
}

impl SchemaChangeEnvelope {
49 changes: 48 additions & 1 deletion src/connector/src/parser/plain_parser.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use risingwave_common::bail;
use thiserror_ext::AsReport;

use super::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling};
use super::unified::kv_event::KvEvent;
@@ -136,7 +137,53 @@ impl PlainParser {
&self.source_ctx.connector_props,
) {
Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)),
Err(err) => Err(err)?,
Err(err) => {
// Report CDC auto schema change fail event
let (fail_info, table_name, cdc_table_id) = match &err {
crate::parser::AccessError::UnsupportedType {
ty,
table_name,
..
} => {
if let Some(table_name) = table_name {
// Parse table_name format: "schema"."table" -> schema.table
let clean_table_name =
table_name.trim_matches('"').replace("\".\"", ".");
let fail_info = format!(
"Unsupported postgres type '{}' in table '{}'",
ty, clean_table_name
);
// Build cdc_table_id: source_name.schema.table_name
let cdc_table_id = format!(
"{}.{}",
self.source_ctx.source_name, clean_table_name
);

(fail_info, clean_table_name, cdc_table_id)
} else {
let fail_info =
format!("Unsupported postgres type: {:?}", ty);
(fail_info, "".to_owned(), "".to_owned())
}
}
_ => {
let fail_info = format!(
"Failed to parse schema change: {:?}",
err.as_report()
);
(fail_info, "".to_owned(), "".to_owned())
}
};
self.source_ctx.report_cdc_auto_schema_change_fail(
self.source_ctx.source_id.table_id,
table_name,
cdc_table_id,
"".to_owned(), // upstream_ddl is not available in this context
fail_info,
);

Err(err)?
}
};
}
CdcMessageType::Unspecified => {
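The identifier handling in the hunk above is compact, so here is a minimal, standalone sketch of the same normalization (assuming a Postgres-style quoted identifier and a hypothetical source named `my_source`):

/// Standalone sketch of the normalization performed above: turn a quoted
/// identifier such as `"public"."orders"` into `public.orders`, then prefix
/// the source name to build the `cdc_table_id`.
fn clean_table_name(raw: &str) -> String {
    raw.trim_matches('"').replace("\".\"", ".")
}

fn main() {
    let raw = r#""public"."orders""#;
    let clean = clean_table_name(raw);
    assert_eq!(clean, "public.orders");
    // `my_source` stands in for `source_ctx.source_name`.
    let cdc_table_id = format!("{}.{}", "my_source", clean);
    assert_eq!(cdc_table_id, "my_source.public.orders");
}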
16 changes: 14 additions & 2 deletions src/connector/src/parser/unified/debezium.rs
@@ -213,6 +213,10 @@ pub fn parse_schema_change(
};
let id = jsonb_access_field!(jsonb, "id", string);
let ty = jsonb_access_field!(jsonb, "type", string);
// Try to extract table name from the JSON data
let table_name = jsonb_access_field!(jsonb, "id", string)
.trim_matches('"')
.to_owned();
let ddl_type: TableChangeType = ty.as_str().into();
if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) {
tracing::debug!("skip table schema change for create/drop command");
@@ -241,10 +245,14 @@
tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message");
AccessError::UnsupportedType {
ty: type_name.clone(),
table_name: Some(table_name.clone()),
}
})?,
None => {
Err(AccessError::UnsupportedType { ty: type_name.clone() })?
return Err(AccessError::UnsupportedType {
ty: type_name,
table_name: Some(table_name.clone()),
});
}
}
}
@@ -256,10 +264,14 @@
tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message");
AccessError::UnsupportedType {
ty: type_name.clone(),
table_name: Some(table_name.clone()),
}
})?,
None => {
Err(AccessError::UnsupportedType { ty: type_name })?
Err(AccessError::UnsupportedType {
ty: type_name,
table_name: Some(table_name.clone()),
})?
}
}
}
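For orientation, the `id` and `type` values read above come out of Debezium's schema-change payload; one table-change entry plausibly looks like the sketch below (field names follow Debezium's format, the values are invented, and the column list is elided):

// Illustrative only: a Debezium table-change entry of the assumed shape.
// The `id` field carries a quoted identifier, which is why it is trimmed
// before being attached to `AccessError::UnsupportedType` as `table_name`.
let entry = serde_json::json!({
    "type": "ALTER",
    "id": "\"public\".\"orders\"",
    "table": { "columns": [] }
});
assert_eq!(entry["type"].as_str().unwrap(), "ALTER");
assert_eq!(entry["id"].as_str().unwrap().trim_matches('"'), r#"public"."orders"#);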
62 changes: 61 additions & 1 deletion src/connector/src/source/base.rs
@@ -65,6 +65,38 @@ pub const UPSTREAM_SOURCE_KEY: &str = "connector";

pub const WEBHOOK_CONNECTOR: &str = "webhook";

/// Callback wrapper for reporting CDC auto schema change fail events
/// Parameters: (`table_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`)
#[derive(Clone)]
pub struct CdcAutoSchemaChangeFailCallback(
Arc<dyn Fn(u32, String, String, String, String) + Send + Sync>,
);

impl CdcAutoSchemaChangeFailCallback {
pub fn new<F>(f: F) -> Self
where
F: Fn(u32, String, String, String, String) + Send + Sync + 'static,
{
Self(Arc::new(f))
}

pub fn call(
&self,
table_id: u32,
table_name: String,
cdc_table_id: String,
upstream_ddl: String,
fail_info: String,
) {
self.0(table_id, table_name, cdc_table_id, upstream_ddl, fail_info);
}
}

impl std::fmt::Debug for CdcAutoSchemaChangeFailCallback {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("CdcAutoSchemaChangeFailCallback")
}
}
pub trait TryFromBTreeMap: Sized + UnknownFields {
/// Used to initialize the source properties from the raw untyped `WITH` options.
fn try_from_btreemap(
@@ -279,7 +311,7 @@ pub struct SourceEnumeratorInfo {
pub source_id: u32,
}

#[derive(Debug, Clone)]
#[derive(Clone, Debug)]
pub struct SourceContext {
pub actor_id: u32,
pub source_id: TableId,
@@ -291,6 +323,8 @@ pub struct SourceContext {
// source parser put schema change event into this channel
pub schema_change_tx:
Option<mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>>,
// callback function to report CDC auto schema change fail events
pub report_cdc_auto_schema_change_fail: Option<CdcAutoSchemaChangeFailCallback>,
}

impl SourceContext {
@@ -305,6 +339,7 @@ impl SourceContext {
schema_change_channel: Option<
mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>,
>,
report_cdc_auto_schema_change_fail: Option<CdcAutoSchemaChangeFailCallback>,
) -> Self {
Self {
actor_id,
Expand All @@ -315,6 +350,7 @@ impl SourceContext {
source_ctrl_opts,
connector_props,
schema_change_tx: schema_change_channel,
report_cdc_auto_schema_change_fail,
}
}

@@ -333,8 +369,32 @@ impl SourceContext {
},
ConnectorProperties::default(),
None,
None,
)
}

/// Report CDC auto schema change fail event
/// Parameters: (`table_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`)
pub fn report_cdc_auto_schema_change_fail(
&self,
table_id: u32,
table_name: String,
cdc_table_id: String,
upstream_ddl: String,
fail_info: String,
) {
if let Some(ref cdc_auto_schema_change_fail_callback) =
self.report_cdc_auto_schema_change_fail
{
cdc_auto_schema_change_fail_callback.call(
table_id,
table_name,
cdc_table_id,
upstream_ddl,
fail_info,
);
}
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
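A minimal usage sketch for the new callback type (the logging body and literal values are assumptions for illustration; in the real wiring the callback would presumably forward to the meta client RPC added later in this diff):

// Minimal sketch, not taken from the diff: a callback that only logs.
let callback = CdcAutoSchemaChangeFailCallback::new(
    |table_id, table_name, cdc_table_id, upstream_ddl, fail_info| {
        tracing::warn!(
            table_id,
            %table_name,
            %cdc_table_id,
            %upstream_ddl,
            %fail_info,
            "auto schema change failed"
        );
    },
);
// Parsers go through `SourceContext::report_cdc_auto_schema_change_fail`,
// which is a no-op when the callback is `None`, so call sites never need to
// check for its presence. Invoking the callback directly looks like this:
callback.call(
    1,
    "public.orders".to_owned(),
    "my_source.public.orders".to_owned(),
    String::new(), // upstream DDL may be unavailable, as in plain_parser.rs
    "Unsupported postgres type 'xml' in table 'public.orders'".to_owned(),
);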
20 changes: 20 additions & 0 deletions src/meta/service/src/ddl_service.rs
@@ -1063,6 +1063,18 @@ impl DdlService for DdlServiceImpl {
original_columns = ?original_columns,
new_columns = ?new_columns,
"New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported");

let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported".to_owned();
add_auto_schema_change_fail_event_log(
&self.meta_metrics,
table.id,
table.name.clone(),
table_change.cdc_table_id.clone(),
table_change.upstream_ddl.clone(),
&self.env.event_log_manager_ref(),
fail_info,
);

return Err(Status::invalid_argument(
"New columns should be a subset or superset of the original columns (including hidden columns)",
));
@@ -1140,13 +1152,16 @@ impl DdlService for DdlServiceImpl {
upstraem_ddl = table_change.upstream_ddl,
"failed to replace the table",
);
let fail_info =
format!("failed to replace the table: {}", e.as_report());
add_auto_schema_change_fail_event_log(
&self.meta_metrics,
table.id,
table.name.clone(),
table_change.cdc_table_id.clone(),
table_change.upstream_ddl.clone(),
&self.env.event_log_manager_ref(),
fail_info,
);
}
};
@@ -1160,13 +1175,16 @@
cdc_table_id = table.cdc_table_id,
"failed to get replace table plan",
);
let fail_info =
format!("failed to get replace table plan: {}", e.as_report());
add_auto_schema_change_fail_event_log(
&self.meta_metrics,
table.id,
table.name.clone(),
table_change.cdc_table_id.clone(),
table_change.upstream_ddl.clone(),
&self.env.event_log_manager_ref(),
fail_info,
);
}
};
@@ -1293,6 +1311,7 @@ fn add_auto_schema_change_fail_event_log(
cdc_table_id: String,
upstream_ddl: String,
event_log_manager: &EventLogManagerRef,
fail_info: String,
) {
meta_metrics
.auto_schema_change_failure_cnt
@@ -1303,6 +1322,7 @@
table_name,
cdc_table_id,
upstream_ddl,
fail_info,
};
event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]);
}
3 changes: 3 additions & 0 deletions src/meta/service/src/event_log_service.rs
@@ -53,6 +53,9 @@ impl EventLogService for EventLogServiceImpl {
risingwave_pb::meta::add_event_log_request::Event::SinkFail(e) => {
risingwave_pb::meta::event_log::Event::SinkFail(e)
}
risingwave_pb::meta::add_event_log_request::Event::AutoSchemaChangeFail(e) => {
risingwave_pb::meta::event_log::Event::AutoSchemaChangeFail(e)
}
};
self.event_log_manager.add_event_logs(vec![e]);
Ok(Response::new(AddEventLogResponse {}))
22 changes: 22 additions & 0 deletions src/rpc_client/src/meta_client.rs
@@ -1685,6 +1685,28 @@ impl MetaClient {
Ok(())
}

pub async fn add_cdc_auto_schema_change_fail_event(
&self,
table_id: u32,
table_name: String,
cdc_table_id: String,
upstream_ddl: String,
fail_info: String,
) -> Result<()> {
let event = event_log::EventAutoSchemaChangeFail {
table_id,
table_name,
cdc_table_id,
upstream_ddl,
fail_info,
};
let req = AddEventLogRequest {
event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)),
};
self.inner.add_event_log(req).await?;
Ok(())
}

pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result<bool> {
let req = CancelCompactTaskRequest {
task_id,
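One plausible way to connect the connector-side callback to this new RPC (purely illustrative glue that is not part of this PR; `meta_client` and the spawn-per-event pattern are assumptions):

// Hypothetical bridge: forward each callback invocation to the meta client
// on a background task so the parser's hot path is not blocked by the RPC.
let client = meta_client.clone();
let callback = CdcAutoSchemaChangeFailCallback::new(
    move |table_id, table_name, cdc_table_id, upstream_ddl, fail_info| {
        let client = client.clone();
        tokio::spawn(async move {
            if let Err(e) = client
                .add_cdc_auto_schema_change_fail_event(
                    table_id,
                    table_name,
                    cdc_table_id,
                    upstream_ddl,
                    fail_info,
                )
                .await
            {
                tracing::warn!(error = ?e, "failed to add auto schema change fail event");
            }
        });
    },
);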
1 change: 1 addition & 0 deletions src/stream/src/executor/source/fs_fetch_executor.rs
@@ -209,6 +209,7 @@ impl<S: StateStore, Src: OpendalSource> FsFetchExecutor<S, Src> {
},
source_desc.source.config.clone(),
None,
None, // No callback for fs fetch executor
)
}

1 change: 1 addition & 0 deletions src/stream/src/executor/source/iceberg_fetch_executor.rs
@@ -408,6 +408,7 @@ impl<S: StateStore> IcebergFetchExecutor<S> {
},
source_desc.source.config.clone(),
None,
None, // No callback for iceberg fetch executor
)
}
