Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-7554

io.confluent.connect.avro.AvroConverter fails to parse events on tables with multiple columns

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Duplicate
    • Icon: Blocker Blocker
    • None
    • 2.5.1.Final
    • cassandra-connector
    • None
    • False
    • None
    • False
    • Important

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      debezium-connector-cassandra cassandra-4 v2.5.0.Final

      What is the connector configuration?

      commit.log.relocation.dir=/cdc/commit_log_relocation
      commit.log.real.time.processing.enabled=true # also tried with false
      http.port=8001

      cassandra.config=/opt/cassandra/conf/cassandra.yaml
      cassandra.hosts=cassandra0:9042
      cassandra.port=9042

      kafka.producer.bootstrap.servers=kafka-broker:9092
      kafka.producer.retries=3
      kafka.producer.retry.backoff.ms=1000
      topic.prefix=test_connector

      key.converter=io.confluent.connect.avro.AvroConverter
      key.converter.schema.registry.url=http://schema-registry:8081
      value.converter=io.confluent.connect.avro.AvroConverter
      value.converter.schema.registry.url=http://schema-registry:8081

      offset.backing.store.dir=/cdc/offset

      snapshot.consistency=ONE

      What is the captured database version and mode of depoyment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      versions tried 4.0.9

      test deployment in a single machine through docker compose.

      What behaviour do you expect?

      I expect debezium to correctly export data for tables with multiple columns of different types in Avro Format.

      What behaviour do you see?

      Events are not being produced, debezium creates schema properly but fails to map the event to the produced schema. 

       

      Do you see the same behaviour using the latest relesead Debezium version?

      Yes

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      Log showing the issue:

       

      combooking-debezium0-1  | 13:04:57.758 [pool-4-thread-3] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"io.debezium.connector.cassandra.test_connector.test_ks.table_cdc\",\"field
      s\":[{\"name\":\"k\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.cassandra.test_connector.test_ks.table_cdc.Key\"}"} to http://schema-registry:8081/subjects/test_connector.test_ks.table_cdc-key/versions?normalize=false
      combooking-debezium0-1  | 13:04:58.101 [pool-4-thread-3] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"io.debezium.connector.cassandra.test_connector.test_ks.table_cdc\",\"
      fields\":[{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"source\",\"namespace\":\"\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\"
      :\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\
      "},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"cluster\",\"type\":\"string\"},{\"name\":\"file\",\"type\":\"string\"},{\"name\":\"pos\",\"type\":\"int\"},{\"name\":\"keyspace\
      ",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"}],\"connect.name\":\"source\"}},{\"name\":\"after\",\"type\":{\"type\":\"record\",\"name\":\"after\",\"namespace\":\"\",\"fields\":[{\"name\":\"k\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"cell_value\",\"fields\":
      [{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"deletion_ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"set\",\"type\":\"boolean\"}],\"connect.version\":1,\"connect.name\":\"cell_value\"}],\"default\":null},{\"name\":\"i\",\"type\":[\"
      null\",\"cell_value\"],\"default\":null},{\"name\":\"t\",\"type\":[\"null\",\"cell_value\"],\"default\":null},{\"name\":\"_range_start\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"_range_start\",\"fields\":[{\"name\":\"method\",\"type\":\"string\"},{\"name\":\"values\",\"type\"
      :{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"clustering_value\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"},{\"name\":\"type\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"clustering_value\"},\"connect
      .version\":1,\"connect.name\":\"clustering_values\"}}],\"connect.version\":1,\"connect.name\":\"_range_start\"}],\"default\":null},{\"name\":\"_range_end\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"_range_end\",\"fields\":[{\"name\":\"method\",\"type\":\"string\"},{\"name\":\"
      values\",\"type\":{\"type\":\"array\",\"items\":\"clustering_value\",\"connect.version\":1,\"connect.name\":\"clustering_values\"}}],\"connect.version\":1,\"connect.name\":\"_range_end\"}],\"default\":null}],\"connect.version\":1,\"connect.name\":\"after\"}}],\"connect.name\":\"io.debe
      zium.connector.cassandra.test_connector.test_ks.table_cdc.Envelope\"}"} to http://schema-registry:8081/subjects/test_connector.test_ks.table_cdc-value/versions?normalize=false
      combooking-debezium0-1  | 13:04:58.142 [pool-4-thread-3] ERROR io.debezium.connector.cassandra.QueueProcessor - Processing of event Record{source={cluster=Test Cluster, keyspace=test_ks, file=CommitLog-7-1708602190887.log, connector=cassandra, pos=251221, ts_micro=1708603487453066, ver
      sion=2.5.0-SNAPSHOT, snapshot=false, table=table_cdc}, after={k={name=k, value=1, deletionTs=null, type=PARTITION}, i={name=i, value=1, deletionTs=null, type=REGULAR}, t={name=t, value=1, deletionTs=null, type=REGULAR}}, keySchema=Schema{io.debezium.connector.cassandra.test_connector.t
      est_ks.table_cdc.Key:STRUCT}, valueSchema=Schema{io.debezium.connector.cassandra.test_connector.test_ks.table_cdc.Envelope:STRUCT}, op=i, ts=1708603497705} was errorneous: {}
      combooking-debezium0-1  | io.debezium.DebeziumException: Failed to send record Record{source={cluster=Test Cluster, keyspace=test_ks, file=CommitLog-7-1708602190887.log, connector=cassandra, pos=251221, ts_micro=1708603487453066, version=2.5.0-SNAPSHOT, snapshot=false, table=table_cdc}
      , after={k={name=k, value=1, deletionTs=null, type=PARTITION}, i={name=i, value=1, deletionTs=null, type=REGULAR}, t={name=t, value=1, deletionTs=null, type=REGULAR}}, keySchema=Schema{io.debezium.connector.cassandra.test_connector.test_ks.table_cdc.Key:STRUCT}, valueSchema=Schema{io.d
      ebezium.connector.cassandra.test_connector.test_ks.table_cdc.Envelope:STRUCT}, op=i, ts=1708603497705}
      combooking-debezium0-1  |       at io.debezium.connector.cassandra.KafkaRecordEmitter.emit(KafkaRecordEmitter.java:73)
      combooking-debezium0-1  |       at io.debezium.connector.cassandra.QueueProcessor.processEvent(QueueProcessor.java:93)
      combooking-debezium0-1  |       at io.debezium.connector.cassandra.QueueProcessor.process(QueueProcessor.java:51)
      combooking-debezium0-1  |       at io.debezium.connector.cassandra.AbstractProcessor.start(AbstractProcessor.java:63)
      combooking-debezium0-1  |       at io.debezium.connector.cassandra.CassandraConnectorTaskTemplate$ProcessorGroup.lambda$start$0(CassandraConnectorTaskTemplate.java:303)
      combooking-debezium0-1  |       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
      combooking-debezium0-1  |       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      combooking-debezium0-1  |       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
      combooking-debezium0-1  |       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      combooking-debezium0-1  |       at java.base/java.lang.Thread.run(Thread.java:840)
      combooking-debezium0-1  | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic test_connector.test_ks.table_cdc :
      combooking-debezium0-1  |       at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
      combooking-debezium0-1  |       at io.debezium.connector.cassandra.KafkaRecordEmitter.toProducerRecord(KafkaRecordEmitter.java:83)
      combooking-debezium0-1  |       at io.debezium.connector.cassandra.KafkaRecordEmitter.emit(KafkaRecordEmitter.java:64)
      combooking-debezium0-1  |       ... 9 common frames omitted
      combooking-debezium0-1  | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
      combooking-debezium0-1  |       at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:154)
      combooking-debezium0-1  |       at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
      combooking-debezium0-1  |       at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
      combooking-debezium0-1  |       ... 11 common frames omitted
      combooking-debezium0-1  | Caused by: org.apache.avro.UnresolvedUnionException: Not in union {"type":"record","name":"after","fields":[{"name":"k","type":["null",{"type":"record","name":"cell_value","fields":[{"name":"value","type":["null","string"],"default":null},{"name":"deletion_ts"
      ,"type":["null","long"],"default":null},{"name":"set","type":"boolean"}],"connect.version":1,"connect.name":"cell_value"}],"default":null},{"name":"i","type":["null","cell_value"],"default":null},{"name":"t","type":["null","cell_value"],"default":null},{"name":"_range_start","type":["n
      ull",{"type":"record","name":"_range_start","fields":[{"name":"method","type":"string"},{"name":"values","type":{"type":"array","items":{"type":"record","name":"clustering_value","fields":[{"name":"name","type":"string"},{"name":"value","type":"string"},{"name":"type","type":"string"}]
      ,"connect.version":1,"connect.name":"clustering_value"},"connect.version":1,"connect.name":"clustering_values"}}],"connect.version":1,"connect.name":"_range_start"}],"default":null},{"name":"_range_end","type":["null",{"type":"record","name":"_range_end","fields":[{"name":"method","typ
      e":"string"},{"name":"values","type":{"type":"array","items":"clustering_value","connect.version":1,"connect.name":"clustering_values"}}],"connect.version":1,"connect.name":"_range_end"}],"default":null}],"connect.version":1,"connect.name":"after"}: {"k": {"value": "1", "deletion_ts":
      null, "set": true}, "i": {"value": 1, "deletion_ts": null, "set": true}, "t": {"value": "1", "deletion_ts": null, "set": true}, "_range_start": null, "_range_end": null} (field=after)
      combooking-debezium0-1  |       at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:223)
      combooking-debezium0-1  |       at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
      combooking-debezium0-1  |       at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
      combooking-debezium0-1  |       at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
      combooking-debezium0-1  |       at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
      combooking-debezium0-1  |       at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:168)
      combooking-debezium0-1  |       at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:146)
      combooking-debezium0-1  |       ... 13 common frames omitted
      combooking-debezium0-1  |       Suppressed: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"cell_value","fields":[{"name":"value","type":["null","string"],"default":null},{"name":"deletion_ts","type":["null","long"],"default":null},{"name":"set",
      "type":"boolean"}],"connect.version":1,"connect.name":"cell_value"}]: {"value": 1, "deletion_ts": null, "set": true} (field=i)
      combooking-debezium0-1  |               at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:223)
      combooking-debezium0-1  |               at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
      combooking-debezium0-1  |               at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
      combooking-debezium0-1  |               at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83)
      combooking-debezium0-1  |               at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:221)
      combooking-debezium0-1  |               ... 19 common frames omitted
      combooking-debezium0-1  |               Suppressed: org.apache.avro.UnresolvedUnionException: Not in union ["null","string"]: 1 (field=value)
      combooking-debezium0-1  |                       at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:223)
      combooking-debezium0-1  |                       at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210)
      combooking-debezium0-1  |                       at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131) 

       

       

      How to reproduce the issue using our tutorial deployment?

       - Set value converter to io.confluent.connect.avro.AvroConverter

       - Create a table with multiple types of columns example:

      CREATE TABLE IF NOT EXISTS test_ks.table_cdc (k text PRIMARY KEY, t text, i int) WITH cdc=true; 
      • produce some events

              Unassigned Unassigned
              schampilomatis Stavros Champilomatis (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              1 Start watching this issue

                Created:
                Updated:
                Resolved: