-
Enhancement
-
Resolution: Done
-
Major
-
None
-
None
JDBC connector is not meant to be used on schema change topic. In case of a schema change record it logs a ConnectException
2023-07-11 15:22:51,558 TRACE || Received SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='aurora', kafkaPartition=0, key=Struct{databaseName=}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=2.4.0-SNAPSHOT,connector=mysql,name=aurora,ts_ms=1689088762718,snapshot=true,db=,server_id=0,file=mysql-bin.000003,pos=157,row=0},ts_ms=1689088762949,databaseName=,ddl=SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci,tableChanges=[]}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=1689088763614, headers=ConnectHeaders(headers=)} [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2023-07-11 15:22:51,558 TRACE || Schema type 'STRING' resolved by name from registry to type 'ConnectStringType' [io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect]
2023-07-11 15:22:51,558 TRACE || Field [databaseName] with schema [STRING] [io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor]
2023-07-11 15:22:51,558 TRACE || Type : io.debezium.connector.jdbc.type.connect.ConnectStringType [io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor]
2023-07-11 15:22:51,558 TRACE || Optional : false [io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor]
2023-07-11 15:22:51,558 DEBUG || Rewinding topic aurora offset to 0. [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2023-07-11 15:22:51,558 ERROR || Failed to process record: Failed to process a sink record [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:81)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:89)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to resolve column type for schema: STRUCT (io.debezium.connector.mysql.Source)
at io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect.getSchemaType(GeneralDatabaseDialect.java:433)
at io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor.<init>(SinkRecordDescriptor.java:175)
at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.applyNonKeyFields(SinkRecordDescriptor.java:428)
at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordNonKeyData(SinkRecordDescriptor.java:410)
at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:277)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:69)
... 13 more
Log a more specific error and also document that schema change records are not supported
- links to
-
RHEA-2023:120698
Red Hat build of Debezium 2.3.4 release