Debezium / DBZ-7065

JDBC sink connector not working with CloudEvent


    • Type: Bug
    • Resolution: Done
    • Priority: Major
    • Fix Version: 2.5.0.Alpha2
    • Affects Version: 2.4.0.Final
    • Component: jdbc-connector

      Bug report

      Debezium version: 2.4.0.Final

      What is the connector configuration?

      The cloud event is produced by the following source connector (works with no errors):

      {
        "name": "book-db-source-connector",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "plugin.name": "pgoutput",
          "database.hostname": "book-db",
          "database.port": "5432",
          "database.user": "postgres",
          "database.password": "password",
          "database.dbname": "book",
          "database.server.name": "book",
          "topic.prefix": "book",
          "topic.creation.default.replication.factor": 1,
          "topic.creation.default.partitions": 6,
          "table.include.list": "public.outbox",
          "heartbeat.interval.ms": "5000",
          "slot.name": "book_debezium",
          "publication.name": "book_publication",
          "tombstones.on.delete": false,
          "transforms": "addMetadataHeaders,outbox",
          "transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
          "transforms.addMetadataHeaders.fields": "source,op,transaction",
          "transforms.addMetadataHeaders.headers": "source,op,transaction",
          "transforms.addMetadataHeaders.operation": "copy",
          "transforms.addMetadataHeaders.predicate": "isHeartbeat",
          "transforms.addMetadataHeaders.negate": true,
          "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
          "transforms.outbox.table.field.event.key": "event_key",
          "transforms.outbox.table.expand.json.payload": true,
          "transforms.outbox.table.fields.additional.placement": "event_type:header:type",
          "transforms.outbox.route.by.field": "aggregate_type",
          "transforms.outbox.route.topic.replacement": "outbox.event.library",
          "predicates": "isHeartbeat",
          "predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
          "predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter.schemas.enable": false,
          "value.converter": "io.debezium.converters.CloudEventsConverter",
          "value.converter.serializer.type": "avro",
          "value.converter.data.serializer.type": "avro",
          "value.converter.avro.apicurio.registry.url": "http://schema-registry:8080",
          "value.converter.avro.apicurio.registry.auto-register": true,
          "value.converter.avro.apicurio.registry.find-latest": true,
          "value.converter.metadata.source": "header",
          "value.converter.extension.attributes.enable": false,
          "header.converter": "org.apache.kafka.connect.json.JsonConverter",
          "header.converter.schemas.enable": true
        }
      } 

      (The schema registry is Apicurio.)

      and is consumed by:

      {
        "name": "user-db-sink-connector",
        "config": {
          "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
          "tasks.max": "1",
          "topics": "outbox.event.library",
          "connection.url": "jdbc:postgresql://user-db:5432/user",
          "connection.username": "postgres",
          "connection.password": "password",
          "insert.mode": "upsert",
          "primary.key.mode": "record_value",
          "primary.key.fields": "id",
          "table.name.format": "inbox",
          "max.retries": 1,
          "key.converter": "org.apache.kafka.connect.json.JsonConverter",
          "key.converter.schemas.enable": false,
          "value.converter": "io.debezium.converters.CloudEventsConverter",
          "value.converter.serializer.type": "avro",
          "value.converter.data.serializer.type": "avro",
          "value.converter.avro.apicurio.registry.url": "http://schema-registry:8080",
          "header.converter": "org.apache.kafka.connect.json.JsonConverter",
          "header.converter.schemas.enable": true
        }
      } 

      What is the captured database version and mode of deployment?

      Postgres 16

      Do you have the connector logs, ideally from start till finish?

      kafka-connect         | 2023-10-23 15:59:50,916 ERROR  ||  Failed to process record: Failed to process a sink record   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      kafka-connect         | org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      kafka-connect         |         at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:87)
      kafka-connect         |         at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:94)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
      kafka-connect         |         at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
      kafka-connect         |         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      kafka-connect         |         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      kafka-connect         |         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      kafka-connect         |         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      kafka-connect         |         at java.base/java.lang.Thread.run(Thread.java:829)
      kafka-connect         | Caused by: org.apache.kafka.connect.errors.DataException: after is not a valid field name
      kafka-connect         |         at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
      kafka-connect         |         at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
      kafka-connect         |         at org.apache.kafka.connect.data.Struct.getStruct(Struct.java:191)
      kafka-connect         |         at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.applyRecordValueAsPrimaryKey(SinkRecordDescriptor.java:382)
      kafka-connect         |         at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordKeyData(SinkRecordDescriptor.java:315)
      kafka-connect         |         at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:285)
      kafka-connect         |         at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:70)
      kafka-connect         |         ... 13 more
      

      I think the message as shown in Kafka UI won't be useful because of the Avro format, but this is its structure if I change both serialization formats in the source connector to "json":

      {
          "id": "dbea9071-fadb-4ef3-8959-d9753eb6674d",
          "source": "/debezium/postgresql/book",
          "specversion": "1.0",
          "type": "BookCreated",
          "time": "2023-10-23T16:09:36.018Z",
          "datacontenttype": "application/json",
          "data": {
              "schema": {
                  "type": "struct",
                  "fields": [
                      {
                          "type": "int32",
                          "optional": true,
                          "field": "id"
                      },
                      {
                          "type": "string",
                          "optional": true,
                          "field": "name"
                      },
                      {
                          "type": "int32",
                          "optional": true,
                          "field": "publicationYear"
                      }
                  ],
                  "optional": true,
                  "name": "payload"
              },
              "payload": {
                  "id": 4,
                  "name": "name",
                  "publicationYear": 1824
              }
          }
      } 

      JSON use case

      With JSON serialization, the sink connector throws a NullPointerException because the record has no value schema:

      kafka-connect         | 2023-10-23 16:10:36,582 ERROR  ||  WorkerSinkTask{id=user-db-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure   [org.apache.kafka.connect.runtime.WorkerSinkTask]
      kafka-connect         | org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
      kafka-connect         |         at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:83)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
      kafka-connect         |         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
      kafka-connect         |         at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
      kafka-connect         |         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      kafka-connect         |         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      kafka-connect         |         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      kafka-connect         |         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      kafka-connect         |         at java.base/java.lang.Thread.run(Thread.java:829)
      kafka-connect         | Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      kafka-connect         |         at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:87)
      kafka-connect         |         at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:94)
      kafka-connect         |         ... 12 more
      kafka-connect         | Caused by: java.lang.NullPointerException
      kafka-connect         |         at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.isFlattened(SinkRecordDescriptor.java:292)
      kafka-connect         |         at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:284)
      kafka-connect         |         at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:70)
      kafka-connect         |         ... 13 more
      

      The NPE is thrown by:

              private boolean isFlattened(SinkRecord record) {
                  // record.valueSchema() is null for a schemaless record, so the name() call throws the NPE.
                  return record.valueSchema().name() == null || !record.valueSchema().name().contains("Envelope");
              }
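
      A null-safe version of this check could look roughly like the sketch below. It is only an illustration, not the fix that was applied to the connector: ValueSchema is a hypothetical stand-in for the Kafka Connect schema (only its name is modeled), and treating a schemaless record as flattened is an assumption about the desired behaviour.

```java
public class FlattenedCheckSketch {

    // Hypothetical stand-in for the Kafka Connect value schema; only the name is modeled.
    public static final class ValueSchema {
        private final String name;

        public ValueSchema(String name) {
            this.name = name;
        }

        public String name() {
            return name;
        }
    }

    // Same name-based check as the quoted method, but a missing schema or a
    // missing schema name is treated as "flattened" instead of throwing an NPE.
    public static boolean isFlattened(ValueSchema valueSchema) {
        if (valueSchema == null || valueSchema.name() == null) {
            return true;
        }
        return !valueSchema.name().contains("Envelope");
    }

    public static void main(String[] args) {
        System.out.println(isFlattened(null));                                           // true (schemaless record)
        System.out.println(isFlattened(new ValueSchema(null)));                          // true (unnamed schema)
        System.out.println(isFlattened(new ValueSchema("payload")));                     // true
        System.out.println(isFlattened(new ValueSchema("book.public.outbox.Envelope"))); // false
    }
}
```

      Note that a check like this only avoids the NPE. The sink would still have to derive column types without a schema, which this sketch does not address.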

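      The schema is absent because the converter deliberately produces schemaless data when deserializing JSON CloudEvents, as this comment in the converter source states: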

      // JSON Cloud Events converter always disables schema.
      // The conversion back thus must be schemaless.
      // If data are in schema/payload envelope they are extracted 
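
      To illustrate what that comment describes, here is a minimal sketch using plain Java maps. It is not the converter's actual code: extractPayload is a hypothetical helper, and the maps mimic the "data" attribute of the JSON message shown earlier. The point is that only the payload survives and no schema object is kept, which is presumably why record.valueSchema() is null on the sink side.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemalessDataSketch {

    // If the value is a {"schema": ..., "payload": ...} envelope, return only the
    // payload and discard the schema part; otherwise return the value unchanged.
    public static Object extractPayload(Object data) {
        if (data instanceof Map) {
            Map<?, ?> map = (Map<?, ?>) data;
            if (map.size() == 2 && map.containsKey("schema") && map.containsKey("payload")) {
                return map.get("payload");
            }
        }
        return data;
    }

    public static void main(String[] args) {
        // Payload values copied from the JSON message in this report.
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("id", 4);
        payload.put("name", "name");
        payload.put("publicationYear", 1824);

        // Simplified schema part; the field list is omitted for brevity.
        Map<String, Object> data = new LinkedHashMap<>();
        data.put("schema", Map.of("type", "struct", "name", "payload"));
        data.put("payload", payload);

        // prints {id=4, name=name, publicationYear=1824}
        System.out.println(extractPayload(data));
    }
}
```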

            Assignee: Unassigned
            Reporter: Roman Kudryashov (romankudryashov)