Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5807

Cassandra decimal values are not deserialized using Debezium Cassandra Connector

XMLWordPrintable

      What Debezium connector do you use and what version?

      debezium-connector-cassandra-2.0.0.Final

      What is the connector configuration?

      debezium-connector-cassandra.properties

       

      connector.name=cassandra-connector
      commit.log.relocation.dir=/bitnami/cassandra/data/debezium-connector-cassandra/relocation/
      http.port=8000
      cassandra.config=/app/conf/cassandra.yaml
      cassandra.driver.config.file=/app/conf/datastax-driver.conf
      kafka.producer.bootstrap.servers=kafka:9092
      kafka.producer.retries=3
      kafka.producer.retry.backoff.ms=1000
      topic.prefix=debezium.json
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      offset.backing.store.dir=/bitnami/cassandra/data/debezium-connector-cassandra/offsets/
      snapshot.consistency=ALL
      snapshot.mode=INITIAL
      latest.commit.log.only=true
      commit.log.real.time.processing.enabled=true
      

       

      datastax-driver.conf

       

      datastax-java-driver {
          basic.contact-points = [ "cassandra:9042"]
          basic.load-balancing-policy {
              local-datacenter = datacenter1
          }
          advanced.auth-provider {
              class = PlainTextAuthProvider
              username = cassandra
              password = cassandra
          }
      }
      

       

      What is the captured database version and mode of depoyment?

      Apache Cassandra 4.0.7 deployed using docker image: bitnami/cassandra:4.0.7

      What behaviour do you expect?

      Cassandra columns containing decimal values should be deserialized and captured via CDC.

      What behaviour do you see?

      No rows with Cassandra decimal values are captured. The exception is pasted below.

      Do you see the same behaviour using the latest released Debezium version?

      Yes. Compiling the debezium-cassandra-connector from the tip of master shows the same behaviour.

      Do you have the connector logs, ideally from start till finish?

      [pool-4-thread-1] ERROR io.debezium.connector.cassandra.QueueProcessor - Processing of event Record{source={cluster=My Cluster, keyspace=mykeyspace, file=CommitLog-7-1667834128263.log, connector=cassandra, pos=1475342, ts_micro=1667834592183744, version=2.0.0.Final, snapshot=false, table=mytable}, after={k={name=k, value=a, deletionTs=null, type=PARTITION}, v={name=v, value=1, deletionTs=null, type=REGULAR}}, keySchema=Schema{io.debezium.connector.cassandra.debezium.json.mykeyspace.mytable.Key:STRUCT}, valueSchema=Schema{io.debezium.connector.cassandra.debezium.json.mykeyspace.mytable.Envelope:STRUCT}, op=i, ts=1667834608588} was errorneous: {}
      io.debezium.DebeziumException: Failed to send record Record{source={cluster=My Cluster, keyspace=mykeyspace, file=CommitLog-7-1667834128263.log, connector=cassandra, pos=1475342, ts_micro=1667834592183744, version=2.0.0.Final, snapshot=false, table=mytable}, after={k={name=k, value=a, deletionTs=null, type=PARTITION}, v={name=v, value=1, deletionTs=null, type=REGULAR}}, keySchema=Schema{io.debezium.connector.cassandra.debezium.json.mykeyspace.mytable.Key:STRUCT}, valueSchema=Schema{io.debezium.connector.cassandra.debezium.json.mykeyspace.mytable.Envelope:STRUCT}, op=i, ts=1667834608588}
        at io.debezium.connector.cassandra.KafkaRecordEmitter.emit(KafkaRecordEmitter.java:72)
        at io.debezium.connector.cassandra.QueueProcessor.processEvent(QueueProcessor.java:113)
        at io.debezium.connector.cassandra.QueueProcessor.process(QueueProcessor.java:71)
        at io.debezium.connector.cassandra.AbstractProcessor.start(AbstractProcessor.java:63)
        at io.debezium.connector.cassandra.CassandraConnectorTaskTemplate$ProcessorGroup.lambda$start$0(CassandraConnectorTaskTemplate.java:238)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
      Caused by: io.debezium.DebeziumException: Failed to record Cell. Name: v, Schema: Schema{v:STRUCT}, Value: 1
        at io.debezium.connector.cassandra.CellData.record(CellData.java:74)
        at io.debezium.connector.cassandra.RowData.record(RowData.java:85)
        at io.debezium.connector.cassandra.Record.buildValue(Record.java:86)
        at io.debezium.connector.cassandra.KafkaRecordEmitter.toProducerRecord(KafkaRecordEmitter.java:82)
        at io.debezium.connector.cassandra.KafkaRecordEmitter.emit(KafkaRecordEmitter.java:63)
        ... 9 common frames omitted
      Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type FLOAT64: class java.math.BigDecimal for field: "value"
        at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
        at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
        at org.apache.kafka.connect.data.Struct.put(Struct.java:203)
        at io.debezium.connector.cassandra.CellData.record(CellData.java:71)
        ... 13 common frames omitted 

      How to reproduce the issue using our tutorial deployment?

      create docker network

      docker network create debezium
      

      use docker-compose.yml file to assemble services

      version: '3.2'
      
      services:
      
        zookeeper:
          image: bitnami/zookeeper:3.8.0
          container_name: "zookeeper" 
          volumes:
            - type: "volume"
              source: zookeeper-data
              target: /bitnami/zookeeper
          networks:
            - debezium
          environment:
            ZOO_SERVER_ID: 1
            ALLOW_ANONYMOUS_LOGIN: "yes"
          ports:
            - 2181:2181
      
        kafka:
          image: bitnami/kafka:3.3.1
          container_name: "kafka" 
          depends_on:
            - zookeeper
          volumes:
            - type: volume
              source: kafka-data
              target: /bitnami/kafka
          networks:
            - debezium
          ports:
            - 9092:9092
          environment:
            KAFKA_BROKER_ID: 1
            KAFKA_CFG_ZOOKEEPER_CONNECT: "zookeeper:2181"
            ALLOW_PLAINTEXT_LISTENER: "yes"
      
        cassandra:
          image: bitnami/cassandra:4.0.7
          container_name: "cassandra" 
          volumes:
            - type: bind
              source: /path/to/cassandra/cassandra.yaml
              target: /bitnami/cassandra/conf/cassandra.yaml
            - type: volume
              source: cassandra-data
              target: /bitnami
          networks:
            - debezium
          ports:
            - 9042:9042
          environment:
            CASSANDRA_HOST: "cassandra"
      
        cassandra-debezium-connector:
          image: bitnami/java:17.0.5-8
          container_name: "cassandra-debezium-connector"
          restart: on-failure
          volumes:
            - type: bind
              source: /path/to/connector.properties
              target: /app/conf/connector.properties
            - type: bind
              source: /path/to/datastax-driver.conf
              target: /app/conf/datastax-driver.conf
            - type: bind
              source: /path/to/cassandra.yaml
              target: /app/conf/cassandra.yaml
            - type: bind
              source: /path/to/debezium-connector-cassandra-4-2.0.0.Final-jar-with-dependencies.jar
              target: /app/debezium-connector-cassandra.jar
            - type: volume
              source: cassandra-data
              target: /bitnami
          networks:
            - debezium
          command: ["java", "--add-exports", "java.base/jdk.internal.misc=ALL-UNNAMED", "--add-exports", "java.base/jdk.internal.ref=ALL-UNNAMED", "--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED", "--add-exports", "java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED", "--add-exports", "java.rmi/sun.rmi.registry=ALL-UNNAMED", "--add-exports", "java.rmi/sun.rmi.server=ALL-UNNAMED", "--add-exports", "java.sql/java.sql=ALL-UNNAMED", "--add-opens", "java.base/java.lang.module=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.loader=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.ref=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.reflect=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.math=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.module=ALL-UNNAMED", "--add-opens", "java.base/jdk.internal.util.jar=ALL-UNNAMED", "--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED", "--add-opens", "jdk.management/com.sun.management.internal=ALL-UNNAMED", "--add-opens", "java.base/java.io=ALL-UNNAMED", "-jar", "/app/debezium-connector-cassandra.jar", "/app/conf/connector.properties"]
      
      networks:
        arm:
          external: true
        
      volumes:
        zookeeper-data:
        kafka-data:
        cassandra-data:
      
      

      run cqlsh to create table & populate records

      CREATE KEYSPACE mykeyspace with replication =
      {'class':'SimpleStrategy','replication_factor':1}
      ;
      CREATE TABLE mykeyspace.mytable ( k text, v decimal, PRIMARY KEY (k) ) WITH cdc=true;
      INSERT INTO mykeyspace.mytable ( k, v ) VALUES ( 'a', 1 );
      

       

              Unassigned Unassigned
              keriharris Keri Harris (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue

                Created:
                Updated:
                Resolved: