-
Bug
-
Resolution: Duplicate
-
Blocker
-
None
-
2.5.1.Final
-
None
-
False
-
None
-
False
-
Important
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
debezium-connector-cassandra cassandra-4 v2.5.0.Final
What is the connector configuration?
commit.log.relocation.dir=/cdc/commit_log_relocation
commit.log.real.time.processing.enabled=true # also tried with false
http.port=8001
cassandra.config=/opt/cassandra/conf/cassandra.yaml
cassandra.hosts=cassandra0:9042
cassandra.port=9042
kafka.producer.bootstrap.servers=kafka-broker:9092
kafka.producer.retries=3
kafka.producer.retry.backoff.ms=1000
topic.prefix=test_connector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
offset.backing.store.dir=/cdc/offset
snapshot.consistency=ONE
What is the captured database version and mode of depoyment?
(E.g. on-premises, with a specific cloud provider, etc.)
versions tried 4.0.9
test deployment in a single machine through docker compose.
What behaviour do you expect?
I expect debezium to correctly export data for tables with multiple columns of different types in Avro Format.
What behaviour do you see?
Events are not being produced, debezium creates schema properly but fails to map the event to the produced schema.
Do you see the same behaviour using the latest relesead Debezium version?
Yes
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
Log showing the issue:
combooking-debezium0-1 | 13:04:57.758 [pool-4-thread-3] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"io.debezium.connector.cassandra.test_connector.test_ks.table_cdc\",\"field s\":[{\"name\":\"k\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.cassandra.test_connector.test_ks.table_cdc.Key\"}"} to http://schema-registry:8081/subjects/test_connector.test_ks.table_cdc-key/versions?normalize=false combooking-debezium0-1 | 13:04:58.101 [pool-4-thread-3] DEBUG io.confluent.kafka.schemaregistry.client.rest.RestService - Sending POST with input {"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"io.debezium.connector.cassandra.test_connector.test_ks.table_cdc\",\" fields\":[{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"source\",\"namespace\":\"\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\" :\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false,incremental\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\ "},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"sequence\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"cluster\",\"type\":\"string\"},{\"name\":\"file\",\"type\":\"string\"},{\"name\":\"pos\",\"type\":\"int\"},{\"name\":\"keyspace\ ",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"}],\"connect.name\":\"source\"}},{\"name\":\"after\",\"type\":{\"type\":\"record\",\"name\":\"after\",\"namespace\":\"\",\"fields\":[{\"name\":\"k\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"cell_value\",\"fields\": [{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"deletion_ts\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"set\",\"type\":\"boolean\"}],\"connect.version\":1,\"connect.name\":\"cell_value\"}],\"default\":null},{\"name\":\"i\",\"type\":[\" null\",\"cell_value\"],\"default\":null},{\"name\":\"t\",\"type\":[\"null\",\"cell_value\"],\"default\":null},{\"name\":\"_range_start\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"_range_start\",\"fields\":[{\"name\":\"method\",\"type\":\"string\"},{\"name\":\"values\",\"type\" :{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"clustering_value\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"string\"},{\"name\":\"type\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"clustering_value\"},\"connect .version\":1,\"connect.name\":\"clustering_values\"}}],\"connect.version\":1,\"connect.name\":\"_range_start\"}],\"default\":null},{\"name\":\"_range_end\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"_range_end\",\"fields\":[{\"name\":\"method\",\"type\":\"string\"},{\"name\":\" values\",\"type\":{\"type\":\"array\",\"items\":\"clustering_value\",\"connect.version\":1,\"connect.name\":\"clustering_values\"}}],\"connect.version\":1,\"connect.name\":\"_range_end\"}],\"default\":null}],\"connect.version\":1,\"connect.name\":\"after\"}}],\"connect.name\":\"io.debe zium.connector.cassandra.test_connector.test_ks.table_cdc.Envelope\"}"} to http://schema-registry:8081/subjects/test_connector.test_ks.table_cdc-value/versions?normalize=false combooking-debezium0-1 | 13:04:58.142 [pool-4-thread-3] ERROR io.debezium.connector.cassandra.QueueProcessor - Processing of event Record{source={cluster=Test Cluster, keyspace=test_ks, file=CommitLog-7-1708602190887.log, connector=cassandra, pos=251221, ts_micro=1708603487453066, ver sion=2.5.0-SNAPSHOT, snapshot=false, table=table_cdc}, after={k={name=k, value=1, deletionTs=null, type=PARTITION}, i={name=i, value=1, deletionTs=null, type=REGULAR}, t={name=t, value=1, deletionTs=null, type=REGULAR}}, keySchema=Schema{io.debezium.connector.cassandra.test_connector.t est_ks.table_cdc.Key:STRUCT}, valueSchema=Schema{io.debezium.connector.cassandra.test_connector.test_ks.table_cdc.Envelope:STRUCT}, op=i, ts=1708603497705} was errorneous: {} combooking-debezium0-1 | io.debezium.DebeziumException: Failed to send record Record{source={cluster=Test Cluster, keyspace=test_ks, file=CommitLog-7-1708602190887.log, connector=cassandra, pos=251221, ts_micro=1708603487453066, version=2.5.0-SNAPSHOT, snapshot=false, table=table_cdc} , after={k={name=k, value=1, deletionTs=null, type=PARTITION}, i={name=i, value=1, deletionTs=null, type=REGULAR}, t={name=t, value=1, deletionTs=null, type=REGULAR}}, keySchema=Schema{io.debezium.connector.cassandra.test_connector.test_ks.table_cdc.Key:STRUCT}, valueSchema=Schema{io.d ebezium.connector.cassandra.test_connector.test_ks.table_cdc.Envelope:STRUCT}, op=i, ts=1708603497705} combooking-debezium0-1 | at io.debezium.connector.cassandra.KafkaRecordEmitter.emit(KafkaRecordEmitter.java:73) combooking-debezium0-1 | at io.debezium.connector.cassandra.QueueProcessor.processEvent(QueueProcessor.java:93) combooking-debezium0-1 | at io.debezium.connector.cassandra.QueueProcessor.process(QueueProcessor.java:51) combooking-debezium0-1 | at io.debezium.connector.cassandra.AbstractProcessor.start(AbstractProcessor.java:63) combooking-debezium0-1 | at io.debezium.connector.cassandra.CassandraConnectorTaskTemplate$ProcessorGroup.lambda$start$0(CassandraConnectorTaskTemplate.java:303) combooking-debezium0-1 | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) combooking-debezium0-1 | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) combooking-debezium0-1 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) combooking-debezium0-1 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) combooking-debezium0-1 | at java.base/java.lang.Thread.run(Thread.java:840) combooking-debezium0-1 | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic test_connector.test_ks.table_cdc : combooking-debezium0-1 | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93) combooking-debezium0-1 | at io.debezium.connector.cassandra.KafkaRecordEmitter.toProducerRecord(KafkaRecordEmitter.java:83) combooking-debezium0-1 | at io.debezium.connector.cassandra.KafkaRecordEmitter.emit(KafkaRecordEmitter.java:64) combooking-debezium0-1 | ... 9 common frames omitted combooking-debezium0-1 | Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message combooking-debezium0-1 | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:154) combooking-debezium0-1 | at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153) combooking-debezium0-1 | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86) combooking-debezium0-1 | ... 11 common frames omitted combooking-debezium0-1 | Caused by: org.apache.avro.UnresolvedUnionException: Not in union {"type":"record","name":"after","fields":[{"name":"k","type":["null",{"type":"record","name":"cell_value","fields":[{"name":"value","type":["null","string"],"default":null},{"name":"deletion_ts" ,"type":["null","long"],"default":null},{"name":"set","type":"boolean"}],"connect.version":1,"connect.name":"cell_value"}],"default":null},{"name":"i","type":["null","cell_value"],"default":null},{"name":"t","type":["null","cell_value"],"default":null},{"name":"_range_start","type":["n ull",{"type":"record","name":"_range_start","fields":[{"name":"method","type":"string"},{"name":"values","type":{"type":"array","items":{"type":"record","name":"clustering_value","fields":[{"name":"name","type":"string"},{"name":"value","type":"string"},{"name":"type","type":"string"}] ,"connect.version":1,"connect.name":"clustering_value"},"connect.version":1,"connect.name":"clustering_values"}}],"connect.version":1,"connect.name":"_range_start"}],"default":null},{"name":"_range_end","type":["null",{"type":"record","name":"_range_end","fields":[{"name":"method","typ e":"string"},{"name":"values","type":{"type":"array","items":"clustering_value","connect.version":1,"connect.name":"clustering_values"}}],"connect.version":1,"connect.name":"_range_end"}],"default":null}],"connect.version":1,"connect.name":"after"}: {"k": {"value": "1", "deletion_ts": null, "set": true}, "i": {"value": 1, "deletion_ts": null, "set": true}, "t": {"value": "1", "deletion_ts": null, "set": true}, "_range_start": null, "_range_end": null} (field=after) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:223) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73) combooking-debezium0-1 | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:168) combooking-debezium0-1 | at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:146) combooking-debezium0-1 | ... 13 common frames omitted combooking-debezium0-1 | Suppressed: org.apache.avro.UnresolvedUnionException: Not in union ["null",{"type":"record","name":"cell_value","fields":[{"name":"value","type":["null","string"],"default":null},{"name":"deletion_ts","type":["null","long"],"default":null},{"name":"set", "type":"boolean"}],"connect.version":1,"connect.name":"cell_value"}]: {"value": 1, "deletion_ts": null, "set": true} (field=i) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:223) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:221) combooking-debezium0-1 | ... 19 common frames omitted combooking-debezium0-1 | Suppressed: org.apache.avro.UnresolvedUnionException: Not in union ["null","string"]: 1 (field=value) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:223) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:210) combooking-debezium0-1 | at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
How to reproduce the issue using our tutorial deployment?
- Set value converter to io.confluent.connect.avro.AvroConverter
- Create a table with multiple types of columns example:
CREATE TABLE IF NOT EXISTS test_ks.table_cdc (k text PRIMARY KEY, t text, i int) WITH cdc=true;
- produce some events
- duplicates
-
DBZ-7484 Cassandra-4/3: Data publish with AvroConverter fails with incorrect schema
- Pull Request Sent