diff --git a/proto/meta.proto b/proto/meta.proto index c538ccb8d284b..4c13706bbb4f2 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -844,6 +844,7 @@ message EventLog { string table_name = 2; string cdc_table_id = 3; string upstream_ddl = 4; + string fail_info = 5; } message EventSinkFail { uint32 sink_id = 1; @@ -912,6 +913,7 @@ message AddEventLogRequest { oneof event { EventLog.EventWorkerNodePanic worker_node_panic = 1; EventLog.EventSinkFail sink_fail = 2; + EventLog.EventAutoSchemaChangeFail auto_schema_change_fail = 3; } } diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index 9d8bf30214227..9be026f3a85a5 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -36,6 +36,10 @@ pub enum AccessError { #[error("Unsupported data type `{ty}`")] UnsupportedType { ty: String }, + /// CDC auto schema change specific error that may include table context + #[error("CDC auto schema change error: unsupported data type `{ty}` in table `{table_name}`")] + CdcAutoSchemaChangeError { ty: String, table_name: String }, + #[error("Unsupported additional column `{name}`")] UnsupportedAdditionalColumn { name: String }, diff --git a/src/connector/src/parser/debezium/schema_change.rs b/src/connector/src/parser/debezium/schema_change.rs index b101e61d7ee89..4457f058dd75d 100644 --- a/src/connector/src/parser/debezium/schema_change.rs +++ b/src/connector/src/parser/debezium/schema_change.rs @@ -65,10 +65,10 @@ impl From<&str> for TableChangeType { #[derive(Debug)] pub struct TableSchemaChange { - pub(crate) cdc_table_id: String, + pub cdc_table_id: String, pub(crate) columns: Vec, pub(crate) change_type: TableChangeType, - pub(crate) upstream_ddl: String, + pub upstream_ddl: String, } impl SchemaChangeEnvelope { diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index b34ee55fa5723..22000800293c0 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -13,6 +13,7 @@ // limitations under the License. use risingwave_common::bail; +use thiserror_ext::AsReport; use super::unified::json::{TimeHandling, TimestampHandling, TimestamptzHandling}; use super::unified::kv_event::KvEvent; @@ -133,10 +134,52 @@ impl PlainParser { return match parse_schema_change( &accessor, self.source_ctx.source_id.into(), + &self.source_ctx.source_name, &self.source_ctx.connector_props, ) { Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)), - Err(err) => Err(err)?, + Err(err) => { + // Report CDC auto schema change fail event + let (fail_info, table_name, cdc_table_id) = match &err { + crate::parser::AccessError::CdcAutoSchemaChangeError { + ty, + table_name, + .. + } => { + // Parse table_name format: "schema"."table" -> schema.table + let clean_table_name = + table_name.trim_matches('"').replace("\".\"", "."); + let fail_info = format!( + "Unsupported data type '{}' in source '{}' table '{}'", + ty, self.source_ctx.source_name, clean_table_name + ); + // Build cdc_table_id: source_name.schema.table_name + let cdc_table_id = format!( + "{}.{}", + self.source_ctx.source_name, clean_table_name + ); + + (fail_info, clean_table_name, cdc_table_id) + } + _ => { + let fail_info = format!( + "Failed to parse schema change: {:?}, source: {}", + err.as_report(), + self.source_ctx.source_name + ); + (fail_info, "".to_owned(), "".to_owned()) + } + }; + self.source_ctx.on_cdc_auto_schema_change_failure( + self.source_ctx.source_id.table_id, + table_name, + cdc_table_id, + "".to_owned(), // upstream_ddl is not available in this context + fail_info, + ); + + Err(err)? + } }; } CdcMessageType::Unspecified => { diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index e61b1886a9f8d..ae925247976ab 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -191,6 +191,7 @@ macro_rules! jsonb_access_field { pub fn parse_schema_change( accessor: &impl Access, source_id: u32, + source_name: &str, connector_props: &ConnectorProperties, ) -> AccessResult { let mut schema_changes = vec![]; @@ -211,8 +212,10 @@ pub fn parse_schema_change( Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb, _ => unreachable!(""), }; - let id = jsonb_access_field!(jsonb, "id", string); + let id: String = jsonb_access_field!(jsonb, "id", string); let ty = jsonb_access_field!(jsonb, "type", string); + + let table_name = id.trim_matches('"').to_owned(); let ddl_type: TableChangeType = ty.as_str().into(); if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) { tracing::debug!("skip table schema change for create/drop command"); @@ -237,14 +240,24 @@ pub fn parse_schema_change( DataType::Varchar } else { match ty { - Some(ty) => pg_type_to_rw_type(&ty).map_err(|err| { - tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message"); - AccessError::UnsupportedType { - ty: type_name.clone(), + Some(ty) => match pg_type_to_rw_type(&ty) { + Ok(data_type) => data_type, + Err(err) => { + tracing::warn!(error=%err.as_report(), "unsupported postgres type in schema change message"); + return Err(AccessError::CdcAutoSchemaChangeError { + ty: type_name, + table_name: format!( + "{}.{}", + source_name, table_name + ), + }); } - })?, + }, None => { - Err(AccessError::UnsupportedType { ty: type_name.clone() })? + return Err(AccessError::CdcAutoSchemaChangeError { + ty: type_name, + table_name: format!("{}.{}", source_name, table_name), + }); } } } @@ -252,14 +265,21 @@ pub fn parse_schema_change( ConnectorProperties::MysqlCdc(_) => { let ty = type_name_to_mysql_type(type_name.as_str()); match ty { - Some(ty) => mysql_type_to_rw_type(&ty).map_err(|err| { - tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message"); - AccessError::UnsupportedType { - ty: type_name.clone(), + Some(ty) => match mysql_type_to_rw_type(&ty) { + Ok(data_type) => data_type, + Err(err) => { + tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message"); + return Err(AccessError::CdcAutoSchemaChangeError { + ty: type_name, + table_name: format!("{}.{}", source_name, table_name), + }); } - })?, + }, None => { - Err(AccessError::UnsupportedType { ty: type_name })? + return Err(AccessError::CdcAutoSchemaChangeError { + ty: type_name, + table_name: format!("{}.{}", source_name, table_name), + }); } } } diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs index 275baaa599ee2..4786366cef734 100644 --- a/src/connector/src/source/base.rs +++ b/src/connector/src/source/base.rs @@ -65,6 +65,38 @@ pub const UPSTREAM_SOURCE_KEY: &str = "connector"; pub const WEBHOOK_CONNECTOR: &str = "webhook"; +/// Callback wrapper for reporting CDC auto schema change fail events +/// Parameters: (`table_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`) +#[derive(Clone)] +pub struct CdcAutoSchemaChangeFailCallback( + Arc, +); + +impl CdcAutoSchemaChangeFailCallback { + pub fn new(f: F) -> Self + where + F: Fn(u32, String, String, String, String) + Send + Sync + 'static, + { + Self(Arc::new(f)) + } + + pub fn call( + &self, + table_id: u32, + table_name: String, + cdc_table_id: String, + upstream_ddl: String, + fail_info: String, + ) { + self.0(table_id, table_name, cdc_table_id, upstream_ddl, fail_info); + } +} + +impl std::fmt::Debug for CdcAutoSchemaChangeFailCallback { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("CdcAutoSchemaChangeFailCallback") + } +} pub trait TryFromBTreeMap: Sized + UnknownFields { /// Used to initialize the source properties from the raw untyped `WITH` options. fn try_from_btreemap( @@ -279,7 +311,7 @@ pub struct SourceEnumeratorInfo { pub source_id: u32, } -#[derive(Debug, Clone)] +#[derive(Clone, Debug)] pub struct SourceContext { pub actor_id: u32, pub source_id: TableId, @@ -291,6 +323,8 @@ pub struct SourceContext { // source parser put schema change event into this channel pub schema_change_tx: Option)>>, + // callback function to report CDC auto schema change fail events + pub on_cdc_auto_schema_change_failure: Option, } impl SourceContext { @@ -305,6 +339,32 @@ impl SourceContext { schema_change_channel: Option< mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>, >, + ) -> Self { + Self::new_with_auto_schema_change_callback( + actor_id, + source_id, + fragment_id, + source_name, + metrics, + source_ctrl_opts, + connector_props, + schema_change_channel, + None, + ) + } + + pub fn new_with_auto_schema_change_callback( + actor_id: u32, + source_id: TableId, + fragment_id: u32, + source_name: String, + metrics: Arc, + source_ctrl_opts: SourceCtrlOpts, + connector_props: ConnectorProperties, + schema_change_channel: Option< + mpsc::Sender<(SchemaChangeEnvelope, tokio::sync::oneshot::Sender<()>)>, + >, + on_cdc_auto_schema_change_failure: Option, ) -> Self { Self { actor_id, @@ -315,6 +375,7 @@ impl SourceContext { source_ctrl_opts, connector_props, schema_change_tx: schema_change_channel, + on_cdc_auto_schema_change_failure, } } @@ -335,6 +396,29 @@ impl SourceContext { None, ) } + + /// Report CDC auto schema change fail event + /// Parameters: (`table_id`, `table_name`, `cdc_table_id`, `upstream_ddl`, `fail_info`) + pub fn on_cdc_auto_schema_change_failure( + &self, + table_id: u32, + table_name: String, + cdc_table_id: String, + upstream_ddl: String, + fail_info: String, + ) { + if let Some(ref cdc_auto_schema_change_fail_callback) = + self.on_cdc_auto_schema_change_failure + { + cdc_auto_schema_change_fail_callback.call( + table_id, + table_name, + cdc_table_id, + upstream_ddl, + fail_info, + ); + } + } } #[derive(Clone, Copy, Debug, PartialEq, Eq, Default)] diff --git a/src/meta/service/src/ddl_service.rs b/src/meta/service/src/ddl_service.rs index ef6d705d63973..df42966f67708 100644 --- a/src/meta/service/src/ddl_service.rs +++ b/src/meta/service/src/ddl_service.rs @@ -1037,6 +1037,18 @@ impl DdlService for DdlServiceImpl { original_columns = ?original_columns, new_columns = ?new_columns, "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported"); + + let fail_info = "New columns should be a subset or superset of the original columns (including hidden columns), since only `ADD COLUMN` and `DROP COLUMN` is supported".to_owned(); + add_auto_schema_change_fail_event_log( + &self.meta_metrics, + table.id, + table.name.clone(), + table_change.cdc_table_id.clone(), + table_change.upstream_ddl.clone(), + &self.env.event_log_manager_ref(), + fail_info, + ); + return Err(Status::invalid_argument( "New columns should be a subset or superset of the original columns (including hidden columns)", )); @@ -1114,6 +1126,8 @@ impl DdlService for DdlServiceImpl { upstraem_ddl = table_change.upstream_ddl, "failed to replace the table", ); + let fail_info = + format!("failed to replace the table: {}", e.as_report()); add_auto_schema_change_fail_event_log( &self.meta_metrics, table.id, @@ -1121,6 +1135,7 @@ impl DdlService for DdlServiceImpl { table_change.cdc_table_id.clone(), table_change.upstream_ddl.clone(), &self.env.event_log_manager_ref(), + fail_info, ); } }; @@ -1134,6 +1149,8 @@ impl DdlService for DdlServiceImpl { cdc_table_id = table.cdc_table_id, "failed to get replace table plan", ); + let fail_info = + format!("failed to get replace table plan: {}", e.as_report()); add_auto_schema_change_fail_event_log( &self.meta_metrics, table.id, @@ -1141,6 +1158,7 @@ impl DdlService for DdlServiceImpl { table_change.cdc_table_id.clone(), table_change.upstream_ddl.clone(), &self.env.event_log_manager_ref(), + fail_info, ); } }; @@ -1267,6 +1285,7 @@ fn add_auto_schema_change_fail_event_log( cdc_table_id: String, upstream_ddl: String, event_log_manager: &EventLogManagerRef, + fail_info: String, ) { meta_metrics .auto_schema_change_failure_cnt @@ -1277,6 +1296,7 @@ fn add_auto_schema_change_fail_event_log( table_name, cdc_table_id, upstream_ddl, + fail_info, }; event_log_manager.add_event_logs(vec![event_log::Event::AutoSchemaChangeFail(event)]); } diff --git a/src/meta/service/src/event_log_service.rs b/src/meta/service/src/event_log_service.rs index 2726cd0f5538c..2883690be78ae 100644 --- a/src/meta/service/src/event_log_service.rs +++ b/src/meta/service/src/event_log_service.rs @@ -53,6 +53,9 @@ impl EventLogService for EventLogServiceImpl { risingwave_pb::meta::add_event_log_request::Event::SinkFail(e) => { risingwave_pb::meta::event_log::Event::SinkFail(e) } + risingwave_pb::meta::add_event_log_request::Event::AutoSchemaChangeFail(e) => { + risingwave_pb::meta::event_log::Event::AutoSchemaChangeFail(e) + } }; self.event_log_manager.add_event_logs(vec![e]); Ok(Response::new(AddEventLogResponse {})) diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 92d4ab5dd5e72..2fe0bb6cc1e36 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -1674,6 +1674,28 @@ impl MetaClient { Ok(()) } + pub async fn add_cdc_auto_schema_change_fail_event( + &self, + table_id: u32, + table_name: String, + cdc_table_id: String, + upstream_ddl: String, + fail_info: String, + ) -> Result<()> { + let event = event_log::EventAutoSchemaChangeFail { + table_id, + table_name, + cdc_table_id, + upstream_ddl, + fail_info, + }; + let req = AddEventLogRequest { + event: Some(add_event_log_request::Event::AutoSchemaChangeFail(event)), + }; + self.inner.add_event_log(req).await?; + Ok(()) + } + pub async fn cancel_compact_task(&self, task_id: u64, task_status: TaskStatus) -> Result { let req = CancelCompactTaskRequest { task_id, diff --git a/src/stream/src/executor/source/reader_stream.rs b/src/stream/src/executor/source/reader_stream.rs index 2f2548e0dd624..0222746e9f277 100644 --- a/src/stream/src/executor/source/reader_stream.rs +++ b/src/stream/src/executor/source/reader_stream.rs @@ -20,8 +20,8 @@ use risingwave_common::catalog::{ColumnId, TableId}; use risingwave_connector::parser::schema_change::SchemaChangeEnvelope; use risingwave_connector::source::reader::desc::SourceDesc; use risingwave_connector::source::{ - BoxSourceChunkStream, ConnectorState, CreateSplitReaderResult, SourceContext, SourceCtrlOpts, - SplitMetaData, StreamChunkWithState, + BoxSourceChunkStream, CdcAutoSchemaChangeFailCallback, ConnectorState, CreateSplitReaderResult, + SourceContext, SourceCtrlOpts, SplitMetaData, StreamChunkWithState, }; use thiserror_ext::AsReport; use tokio::sync::{mpsc, oneshot}; @@ -30,6 +30,11 @@ use super::{apply_rate_limit, get_split_offset_col_idx}; use crate::common::rate_limit::limited_chunk_size; use crate::executor::prelude::*; +type AutoSchemaChangeSetup = ( + Option)>>, + Option, +); + pub(crate) struct StreamReaderBuilder { pub source_desc: SourceDesc, pub rate_limit: Option, @@ -43,17 +48,10 @@ pub(crate) struct StreamReaderBuilder { } impl StreamReaderBuilder { - fn prepare_source_stream_build(&self) -> (Vec, SourceContext) { - let column_ids = self - .source_desc - .columns - .iter() - .map(|column_desc| column_desc.column_id) - .collect_vec(); - - let (schema_change_tx, mut schema_change_rx) = - mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16); - let schema_change_tx = if self.is_auto_schema_change_enable { + fn setup_auto_schema_change(&self) -> AutoSchemaChangeSetup { + if self.is_auto_schema_change_enable { + let (schema_change_tx, mut schema_change_rx) = + mpsc::channel::<(SchemaChangeEnvelope, oneshot::Sender<()>)>(16); let meta_client = self.actor_ctx.meta_client.clone(); // spawn a task to handle schema change event from source parser let _join_handle = tokio::task::spawn(async move { @@ -78,19 +76,70 @@ impl StreamReaderBuilder { tracing::error!( target: "auto_schema_change", error = %e.as_report(), "schema change error"); + finish_tx.send(()).unwrap(); } } } } }); - Some(schema_change_tx) + + // Create callback function for reporting CDC auto schema change fail events + let on_cdc_auto_schema_change_failure = if let Some(ref meta_client) = + self.actor_ctx.meta_client + { + let meta_client = meta_client.clone(); + let source_id = self.source_id; + Some(CdcAutoSchemaChangeFailCallback::new( + move |table_id: u32, + table_name: String, + cdc_table_id: String, + upstream_ddl: String, + fail_info: String| { + let meta_client = meta_client.clone(); + let source_id = source_id; + tokio::spawn(async move { + if let Err(e) = meta_client + .add_cdc_auto_schema_change_fail_event( + table_id, + table_name, + cdc_table_id, + upstream_ddl, + fail_info, + ) + .await + { + tracing::warn!( + error = %e.as_report(), + source_id = source_id.table_id, + "Failed to add CDC auto schema change fail event to event log." + ); + } + }); + }, + )) + } else { + None + }; + + (Some(schema_change_tx), on_cdc_auto_schema_change_failure) } else { info!("auto schema change is disabled in config"); - None - }; + (None, None) + } + } + + fn prepare_source_stream_build(&self) -> (Vec, SourceContext) { + let column_ids = self + .source_desc + .columns + .iter() + .map(|column_desc| column_desc.column_id) + .collect_vec(); + + let (schema_change_tx, on_cdc_auto_schema_change_failure) = self.setup_auto_schema_change(); - let source_ctx = SourceContext::new( + let source_ctx = SourceContext::new_with_auto_schema_change_callback( self.actor_ctx.id, self.source_id, self.actor_ctx.fragment_id, @@ -102,6 +151,7 @@ impl StreamReaderBuilder { }, self.source_desc.source.config.clone(), schema_change_tx, + on_cdc_auto_schema_change_failure, ); (column_ids, source_ctx) diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 8415d4d9b61cc..bbdc415a5b62d 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -182,7 +182,6 @@ impl SourceExecutor { info!("auto schema change is disabled in config"); None }; - let source_ctx = SourceContext::new( self.actor_ctx.id, self.stream_source_core.source_id,