Debezium / DBZ-6655

Log an appropriate error when the JDBC connector receives a SchemaChange record


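      The JDBC connector is not meant to consume the schema change topic. When it receives a schema change record, it fails with a generic ConnectException ("Failed to process a sink record") that does not say the record type is unsupported: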

       

      2023-07-11 15:22:51,558 TRACE  ||  Received SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='aurora', kafkaPartition=0, key=Struct{databaseName=}, keySchema=Schema{io.debezium.connector.mysql.SchemaChangeKey:STRUCT}, value=Struct{source=Struct{version=2.4.0-SNAPSHOT,connector=mysql,name=aurora,ts_ms=1689088762718,snapshot=true,db=,server_id=0,file=mysql-bin.000003,pos=157,row=0},ts_ms=1689088762949,databaseName=,ddl=SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci,tableChanges=[]}, valueSchema=Schema{io.debezium.connector.mysql.SchemaChangeValue:STRUCT}, timestamp=1689088763614, headers=ConnectHeaders(headers=)}   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-07-11 15:22:51,558 TRACE  ||  Schema type 'STRING' resolved by name from registry to type 'ConnectStringType'   [io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect]
      2023-07-11 15:22:51,558 TRACE  ||  Field [databaseName] with schema [STRING]   [io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor]
      2023-07-11 15:22:51,558 TRACE  ||      Type      : io.debezium.connector.jdbc.type.connect.ConnectStringType   [io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor]
      2023-07-11 15:22:51,558 TRACE  ||      Optional  : false   [io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor]
      2023-07-11 15:22:51,558 DEBUG  ||  Rewinding topic aurora offset to 0.   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      2023-07-11 15:22:51,558 ERROR  ||  Failed to process record: Failed to process a sink record   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
              at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:81)
              at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:89)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
              at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
              at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
              at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
              at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to resolve column type for schema: STRUCT (io.debezium.connector.mysql.Source)
              at io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect.getSchemaType(GeneralDatabaseDialect.java:433)
              at io.debezium.connector.jdbc.SinkRecordDescriptor$FieldDescriptor.<init>(SinkRecordDescriptor.java:175)
              at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.applyNonKeyFields(SinkRecordDescriptor.java:428)
              at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordNonKeyData(SinkRecordDescriptor.java:410)
              at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:277)
              at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:69)
              ... 13 more
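
A possible way to keep such records away from the sink is to subscribe only to table topics. In the trace the schema change topic is `aurora`, the same value as the source `name`, while Debezium MySQL table topics have the form `<prefix>.<database>.<table>`. The sketch below checks a candidate value for the sink's `topics.regex` option against two topic names. The `inventory` database and `customers` table are made up for illustration, and the class is not part of the connector.

```java
import java.util.List;
import java.util.regex.Pattern;

final class TopicFilterSketch {
    // Candidate `topics.regex` value: the prefix, a literal dot, then at least
    // one more character. The bare prefix `aurora` (the schema change topic
    // in the trace) contains no dot and is therefore excluded.
    static final Pattern TABLE_TOPICS = Pattern.compile("aurora\\..+");

    // Assumes the whole topic name must match the pattern.
    static boolean isTableTopic(String topic) {
        return TABLE_TOPICS.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // `aurora.inventory.customers` is a hypothetical table topic.
        for (String topic : List.of("aurora", "aurora.inventory.customers")) {
            System.out.println(topic + " -> " + isTableTopic(topic));
        }
    }
}
```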
      

       The connector should log a more specific error, and the documentation should state that schema change records are not supported.
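
As a rough illustration of the requested behavior, the sketch below rejects a record early with a message that names the actual problem. It is not the connector's implementation: the class and method names are hypothetical, detection by the `SchemaChangeValue` schema-name suffix is an assumption based on the value schema shown in the trace, and `IllegalArgumentException` stands in for Kafka Connect's `ConnectException` so the example has no dependencies.

```java
final class SchemaChangeGuardSketch {
    // Assumption: schema change events can be recognized by the value schema
    // name seen in the trace (io.debezium.connector.mysql.SchemaChangeValue).
    private static final String SCHEMA_CHANGE_VALUE_SUFFIX = ".SchemaChangeValue";

    static boolean isSchemaChange(String valueSchemaName) {
        return valueSchemaName != null
                && valueSchemaName.endsWith(SCHEMA_CHANGE_VALUE_SUFFIX);
    }

    // Fails fast with a specific message instead of letting the record reach
    // column type resolution, where it fails on the nested source STRUCT.
    static void requireDataChange(String topic, String valueSchemaName) {
        if (isSchemaChange(valueSchemaName)) {
            throw new IllegalArgumentException(
                    "Schema change records are not supported by the JDBC sink connector. "
                            + "Remove schema change topic '" + topic
                            + "' from the sink's topics (value schema: "
                            + valueSchemaName + ")");
        }
    }

    public static void main(String[] args) {
        try {
            requireDataChange("aurora", "io.debezium.connector.mysql.SchemaChangeValue");
        } catch (IllegalArgumentException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

Checking the schema name before building the record descriptor would surface the misconfiguration directly, rather than through the "Failed to resolve column type" cause.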

              rh-ee-mvitale Mario Fiore Vitale