-
Enhancement
-
Resolution: Done
-
Major
The JDBC connector is not meant to be used on a schema change topic. When it receives a schema change record, it fails with a generic ConnectException:
2023-07-11 15:22:51,558 TRACE || Received SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='aurora', kafkaPartition=0, key=Struct{databaseName=}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=2.4.0-SNAPSHOT,connector=mysql,name=aurora,ts_ms=1689088762718,snapshot=true,db=,server_id=0,file=mysql-bin.000003,pos=157,row=0},ts_ms=1689088762949,databaseName=,ddl=SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci,tableChanges=[]}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=1689088763614, headers=ConnectHeaders(headers=)} [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2023-07-11 15:22:51,558 TRACE || Schema type 'STRING' resolved by name from registry to type 'ConnectStringType' [io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect]
2023-07-11 15:22:51,558 TRACE || Field [databaseName] with schema [STRING] [io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor]
2023-07-11 15:22:51,558 TRACE || Type : io.debezium.connector.jdbc.type.connect.ConnectStringType [io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor]
2023-07-11 15:22:51,558 TRACE || Optional : false [io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor]
2023-07-11 15:22:51,558 DEBUG || Rewinding topic aurora offset to 0. [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2023-07-11 15:22:51,558 ERROR || Failed to process record: Failed to process a sink record [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
    at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:81)
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:89)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to resolve column type for schema: STRUCT (io.debezium.connector.mysql.Source)
    at io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect.getSchemaType(GeneralDatabaseDialect.java:433)
    at io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor.<init>(SinkRecordDescriptor.java:175)
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.applyNonKeyFields(SinkRecordDescriptor.java:428)
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordNonKeyData(SinkRecordDescriptor.java:410)
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:277)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:69)
    ... 13 more
The connector should log a more specific error in this case, and the documentation should state that schema change records are not supported.
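One possible shape for such a check is sketched below. It is not the change that was actually made in the connector. It relies only on the schema names visible in the log above (io.debezium.connector.mysql.SchemaChangeKey and io.debezium.connector.mysql.SchemaChangeValue); the class SchemaChangeGuard, its methods, and the message wording are hypothetical, and the assumption that other source connectors use the same name suffixes is untested. Plain strings stand in for the Kafka Connect schema objects so the sketch has no dependencies.

```java
import java.util.Optional;

// Hypothetical helper, not part of Debezium: recognises schema change records
// by schema name so that a specific error can be reported for them.
final class SchemaChangeGuard {

    // Suffixes copied from the schema names in the log above. Treating them as
    // a marker for every Debezium source connector is an assumption.
    private static final String KEY_SUFFIX = ".SchemaChangeKey";
    private static final String VALUE_SUFFIX = ".SchemaChangeValue";

    private SchemaChangeGuard() {
    }

    // True if either schema name identifies a schema change record.
    static boolean isSchemaChange(String keySchemaName, String valueSchemaName) {
        return endsWith(keySchemaName, KEY_SUFFIX) || endsWith(valueSchemaName, VALUE_SUFFIX);
    }

    // Returns a specific error message for schema change records,
    // or an empty Optional for any other record.
    static Optional<String> rejectionMessage(String topic, String keySchemaName, String valueSchemaName) {
        if (!isSchemaChange(keySchemaName, valueSchemaName)) {
            return Optional.empty();
        }
        return Optional.of("Schema change records are not supported by the JDBC sink connector; "
                + "remove topic '" + topic + "' from the sink configuration");
    }

    // Null-safe suffix check; records without a named schema never match.
    private static boolean endsWith(String name, String suffix) {
        return name != null && name.endsWith(suffix);
    }

    public static void main(String[] args) {
        // Values taken from the failing record in the log above.
        rejectionMessage("aurora",
                "io.debezium.connector.mysql.SchemaChangeKey",
                "io.debezium.connector.mysql.SchemaChangeValue")
                .ifPresent(System.err::println);
    }
}
```

In a real sink task the two names would presumably come from the record's key and value schemas, and the message would be logged or wrapped in an exception before any column type resolution is attempted.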
- links to
-
RHEA-2023:120698 Red Hat build of Debezium 2.3.4 release