-
Bug
-
Resolution: Done
-
Major
-
None
-
None
-
False
-
None
-
False
What Debezium connector do you use and what version?
debezium-connector-cassandra-2.0.0.Final
What is the connector configuration?
debezium-connector-cassandra.properties
connector.name=cassandra-connector commit.log.relocation.dir=/bitnami/cassandra/data/debezium-connector-cassandra/relocation/ http.port=8000 cassandra.config=/app/conf/cassandra.yaml cassandra.driver.config.file=/app/conf/datastax-driver.conf kafka.producer.bootstrap.servers=kafka:9092 kafka.producer.retries=3 kafka.producer.retry.backoff.ms=1000 topic.prefix=debezium.json key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter offset.backing.store.dir=/bitnami/cassandra/data/debezium-connector-cassandra/offsets/ snapshot.consistency=ALL snapshot.mode=INITIAL latest.commit.log.only=true commit.log.real.time.processing.enabled=true
datastax-driver.conf
datastax-java-driver {
basic.contact-points = [ "cassandra:9042"]
basic.load-balancing-policy {
local-datacenter = datacenter1
}
advanced.auth-provider {
class = PlainTextAuthProvider
username = cassandra
password = cassandra
}
}
What is the captured database version and mode of depoyment?
Apache Cassandra 4.0.7 deployed using docker image: bitnami/cassandra:4.0.7
What behaviour do you expect?
Cassandra columns containing decimal values should be deserialized and captured via CDC.
What behaviour do you see?
No rows with Cassandra decimal values are captured. The exception is pasted below.
Do you see the same behaviour using the latest released Debezium version?
Yes. Compiling the debezium-cassandra-connector from the tip of master shows the same behaviour.
Do you have the connector logs, ideally from start till finish?
[pool-4-thread-1] ERROR io.debezium.connector.cassandra.QueueProcessor - Processing of event Record{source={cluster=My Cluster, keyspace=mykeyspace, file=CommitLog-7-1667834128263.log, connector=cassandra, pos=1475342, ts_micro=1667834592183744, version=2.0.0.Final, snapshot=false, table=mytable}, after={k={name=k, value=a, deletionTs=null, type=PARTITION}, v={name=v, value=1, deletionTs=null, type=REGULAR}}, keySchema=Schema{io.debezium.connector.cassandra.debezium.json.mykeyspace.mytable.Key:STRUCT}, valueSchema=Schema{io.debezium.connector.cassandra.debezium.json.mykeyspace.mytable.Envelope:STRUCT}, op=i, ts=1667834608588} was errorneous: {} io.debezium.DebeziumException: Failed to send record Record{source={cluster=My Cluster, keyspace=mykeyspace, file=CommitLog-7-1667834128263.log, connector=cassandra, pos=1475342, ts_micro=1667834592183744, version=2.0.0.Final, snapshot=false, table=mytable}, after={k={name=k, value=a, deletionTs=null, type=PARTITION}, v={name=v, value=1, deletionTs=null, type=REGULAR}}, keySchema=Schema{io.debezium.connector.cassandra.debezium.json.mykeyspace.mytable.Key:STRUCT}, valueSchema=Schema{io.debezium.connector.cassandra.debezium.json.mykeyspace.mytable.Envelope:STRUCT}, op=i, ts=1667834608588} at io.debezium.connector.cassandra.KafkaRecordEmitter.emit(KafkaRecordEmitter.java:72) at io.debezium.connector.cassandra.QueueProcessor.processEvent(QueueProcessor.java:113) at io.debezium.connector.cassandra.QueueProcessor.process(QueueProcessor.java:71) at io.debezium.connector.cassandra.AbstractProcessor.start(AbstractProcessor.java:63) at io.debezium.connector.cassandra.CassandraConnectorTaskTemplate$ProcessorGroup.lambda$start$0(CassandraConnectorTaskTemplate.java:238) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) at java.base/java.lang.Thread.run(Thread.java:833) Caused by: io.debezium.DebeziumException: Failed to record Cell. Name: v, Schema: Schema{v:STRUCT}, Value: 1 at io.debezium.connector.cassandra.CellData.record(CellData.java:74) at io.debezium.connector.cassandra.RowData.record(RowData.java:85) at io.debezium.connector.cassandra.Record.buildValue(Record.java:86) at io.debezium.connector.cassandra.KafkaRecordEmitter.toProducerRecord(KafkaRecordEmitter.java:82) at io.debezium.connector.cassandra.KafkaRecordEmitter.emit(KafkaRecordEmitter.java:63) ... 9 common frames omitted Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type FLOAT64: class java.math.BigDecimal for field: "value" at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242) at org.apache.kafka.connect.data.Struct.put(Struct.java:216) at org.apache.kafka.connect.data.Struct.put(Struct.java:203) at io.debezium.connector.cassandra.CellData.record(CellData.java:71) ... 13 common frames omitted
How to reproduce the issue using our tutorial deployment?
create docker network
docker network create debezium
use docker-compose.yml file to assemble services
version: '3.2' services: zookeeper: image: bitnami/zookeeper:3.8.0 container_name: "zookeeper" volumes: - type: "volume" source: zookeeper-data target: /bitnami/zookeeper networks: - debezium environment: ZOO_SERVER_ID: 1 ALLOW_ANONYMOUS_LOGIN: "yes" ports: - 2181:2181 kafka: image: bitnami/kafka:3.3.1 container_name: "kafka" depends_on: - zookeeper volumes: - type: volume source: kafka-data target: /bitnami/kafka networks: - debezium ports: - 9092:9092 environment: KAFKA_BROKER_ID: 1 KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181" ALLOW_PLAINTEXT_LISTENER: "yes" cassandra: image: bitnami/cassandra:4.0.7 container_name: "cassandra" volumes: - type: bind source: /path/to/cassandra/cassandra.yaml target: /bitnami/cassandra/conf/cassandra.yaml - type: volume source: cassandra-data target: /bitnami networks: - debezium ports: - 9042:9042 environment: CASSANDRA_HOST: "cassandra" cassandra-debezium-connector: image: bitnami/java:17.0.5-8 container_name: "cassandra-debezium-connector" restart: on-failure volumes: - type: bind source: /path/to/connector.properties target: /app/conf/connector.properties - type: bind source: /path/to/datastax-driver.conf target: /app/conf/datastax-driver.conf - type: bind source: /path/to/cassandra.yaml target: /app/conf/cassandra.yaml - type: bind source: /path/to/debezium-connector-cassandra-4-2.0.0.Final-jar-with-dependencies.jar target: /app/debezium-connector-cassandra.jar - type: volume source: cassandra-data target: /bitnami networks: - debezium command: ["java", "--add-exports", "java.base/jdk.internal.misc=ALL-UNNAMED", "--add-exports", "java.base/jdk.internal.ref=ALL-UNNAMED", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED", "--add-exports", "java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED", "--add-exports", "java.rmi/sun.rmi.registry=ALL-UNNAMED", "--add-exports", "java.rmi/sun.rmi.server=ALL-UNNAMED", "--add-exports", "java.sql/java.sql=ALL-UNNAMED", "--add-opens", "java.base/java.lang.module=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.loader=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.ref=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.reflect=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.math=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.module=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.util.jar=ALL-UNNAMED", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED", "--add-opens", "jdk.management/com.sun.management.internal=ALL-UNNAMED", "--add-opens", "java.base/java.io=ALL-UNNAMED", "-jar", "/app/debezium-connector-cassandra.jar", "/app/conf/connector.properties"] networks: arm: external: true volumes: zookeeper-data: kafka-data: cassandra-data:
run cqlsh to create table & populate records
CREATE KEYSPACE mykeyspace with replication = {'class':'SimpleStrategy','replication_factor':1} ; CREATE TABLE mykeyspace.mytable ( k text, v decimal, PRIMARY KEY (k) ) WITH cdc=true; INSERT INTO mykeyspace.mytable ( k, v ) VALUES ( 'a', 1 );