Debezium / DBZ-5808

Cassandra varint type is currently not supported

    Description

      Attempting to use the debezium-cassandra-connector against a Cassandra table with a varint column results in failure to handle a SCHEMA_CHANGE event:

      18:58:50.810 [s0-admin-0] DEBUG com.datastax.oss.driver.internal.core.context.EventBus - [s0] Firing an instance of class com.datastax.oss.driver.internal.core.metadata.schema.events.TableChangeEvent: TableChangeEvent(CREATED test5)
      18:58:50.810 [s0-admin-0] DEBUG com.datastax.oss.driver.internal.core.context.EventBus - [s0] Notifying com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule$$Lambda$455/0x000000080112e2a8@538a2f0e of TableChangeEvent(CREATED test5)
      18:58:50.810 [s0-admin-0] WARN com.datastax.oss.driver.internal.core.control.ControlConnection - [s0] Unexpected error while refreshing schema for a SCHEMA_CHANGE event, keeping previous version
      java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "io.debezium.connector.cassandra.transforms.type.converter.TypeConverter.convert(com.datastax.oss.driver.api.core.type.DataType)" because "typeConverter" is null
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
        at com.datastax.oss.driver.shaded.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at com.datastax.oss.driver.shaded.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at com.datastax.oss.driver.shaded.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)
      Caused by: java.lang.NullPointerException: Cannot invoke "io.debezium.connector.cassandra.transforms.type.converter.TypeConverter.convert(com.datastax.oss.driver.api.core.type.DataType)" because "typeConverter" is null
        at io.debezium.connector.cassandra.transforms.CassandraTypeConverter.convert(CassandraTypeConverter.java:78)
        at io.debezium.connector.cassandra.RowData.rowSchema(RowData.java:123)
        at io.debezium.connector.cassandra.RowData.rowSchema(RowData.java:116)
        at io.debezium.connector.cassandra.AbstractSchemaChangeListener.getKeyValueSchema(AbstractSchemaChangeListener.java:72)
        at io.debezium.connector.cassandra.Cassandra4SchemaChangeListener.onTableCreated(Cassandra4SchemaChangeListener.java:119)
        at com.datastax.oss.driver.internal.core.session.SchemaListenerNotifier.onTableChangeEvent(SchemaListenerNotifier.java:127)
        at com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule.lambda$on$1(RunOrSchedule.java:64)
        at com.datastax.oss.driver.internal.core.context.EventBus.fire(EventBus.java:95)
        at com.datastax.oss.driver.internal.core.metadata.MetadataManager.apply(MetadataManager.java:518)
        at com.datastax.oss.driver.internal.core.metadata.MetadataManager$SingleThreaded.parseAndApplySchemaRows(MetadataManager.java:490)
        at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
        ... 6 common frames omitted

      The VARINT Cassandra protocol type is notably absent from CassandraTypeConverter, and the corresponding IntegerType Cassandra marshalling class is absent from the deserializing code. (I believe VARINT maps to IntegerType, as it extends NumberType<BigInteger>.)
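
The NullPointerException in the stack trace above is consistent with a plain map lookup returning null for a type code that was never registered. The following is a minimal, self-contained sketch of that failure mode and of a guarded lookup. `TypeRegistrySketch`, its `Converter` interface, and both lookup methods are hypothetical stand-ins, not Debezium classes; only the two numeric type codes come from the Cassandra native protocol.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a protocol-code -> converter registry; not Debezium's API.
final class TypeRegistrySketch {

    // Cassandra native protocol type codes for two CQL types.
    static final int INT = 0x0009;
    static final int VARINT = 0x000E;

    interface Converter {
        String marshalTypeName();
    }

    private final Map<Integer, Converter> typeMap = new HashMap<>();

    TypeRegistrySketch() {
        // Only INT is registered, mirroring a registry that lacks a VARINT entry.
        typeMap.put(INT, () -> "Int32Type");
    }

    void register(int protocolCode, Converter converter) {
        typeMap.put(protocolCode, converter);
    }

    // Unguarded lookup: throws NullPointerException for an unregistered code.
    String convertUnguarded(int protocolCode) {
        Converter converter = typeMap.get(protocolCode);
        return converter.marshalTypeName();
    }

    // Guarded lookup: fails with a message that names the unsupported type code.
    String convertGuarded(int protocolCode) {
        Converter converter = typeMap.get(protocolCode);
        if (converter == null) {
            throw new IllegalArgumentException(
                    String.format("No converter registered for protocol type code 0x%04X", protocolCode));
        }
        return converter.marshalTypeName();
    }
}
```

A guarded lookup of this kind would not add varint support by itself, but it would turn the opaque NullPointerException into an error that identifies the missing type.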

      I've been using the following patch to deserialize varint columns as BigInteger values formatted as decimal strings:

      diff --git a/core/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java b/core/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java
      index e56cf70..9775416 100644
      --- a/core/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java
      +++ b/core/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeConverter.java
      @@ -20,6 +20,7 @@ import org.apache.cassandra.db.marshal.DurationType;
       import org.apache.cassandra.db.marshal.FloatType;
       import org.apache.cassandra.db.marshal.InetAddressType;
       import org.apache.cassandra.db.marshal.Int32Type;
      +import org.apache.cassandra.db.marshal.IntegerType;
       import org.apache.cassandra.db.marshal.LongType;
       import org.apache.cassandra.db.marshal.ShortType;
       import org.apache.cassandra.db.marshal.SimpleDateType;
      @@ -71,6 +72,7 @@ public final class CassandraTypeConverter {
               typeMap.put(ProtocolConstants.DataType.TUPLE, new TupleTypeConverter());
               typeMap.put(ProtocolConstants.DataType.UDT, new UserTypeConverter());
               typeMap.put(ProtocolConstants.DataType.UUID, new BasicTypeConverter<>(UUIDType.instance));
      +        typeMap.put(ProtocolConstants.DataType.VARINT, new BasicTypeConverter<>(IntegerType.instance));
           }
      
           public static AbstractType<?> convert(com.datastax.oss.driver.api.core.type.DataType type) { 
      diff --git a/core/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java b/core/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java
      index c4234b1..b891667 100644
      --- a/core/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java
      +++ b/core/src/main/java/io/debezium/connector/cassandra/transforms/CassandraTypeDeserializer.java
      @@ -34,6 +34,7 @@ import org.apache.cassandra.db.marshal.DurationType;
       import org.apache.cassandra.db.marshal.FloatType;
       import org.apache.cassandra.db.marshal.InetAddressType;
       import org.apache.cassandra.db.marshal.Int32Type;
      +import org.apache.cassandra.db.marshal.IntegerType;
       import org.apache.cassandra.db.marshal.ListType;
       import org.apache.cassandra.db.marshal.LongType;
       import org.apache.cassandra.db.marshal.MapType;
      @@ -59,6 +60,7 @@ import io.debezium.connector.cassandra.transforms.type.deserializer.BasicTypeDes
       import io.debezium.connector.cassandra.transforms.type.deserializer.CollectionTypeDeserializer;
       import io.debezium.connector.cassandra.transforms.type.deserializer.DurationTypeDeserializer;
       import io.debezium.connector.cassandra.transforms.type.deserializer.InetAddressDeserializer;
      +import io.debezium.connector.cassandra.transforms.type.deserializer.IntegerTypeDeserializer;
       import io.debezium.connector.cassandra.transforms.type.deserializer.ListTypeDeserializer;
       import io.debezium.connector.cassandra.transforms.type.deserializer.MapTypeDeserializer;
       import io.debezium.connector.cassandra.transforms.type.deserializer.SetTypeDeserializer;
      @@ -124,6 +126,7 @@ public final class CassandraTypeDeserializer {
               tmp.put(DurationType.class, new DurationTypeDeserializer(deserializer));
               tmp.put(UUIDType.class, new UUIDTypeDeserializer(deserializer));
               tmp.put(TimeUUIDType.class, new TimeUUIDTypeDeserializer(deserializer));
      +        tmp.put(IntegerType.class, new IntegerTypeDeserializer(deserializer));
               // Collection Types
               tmp.put(ListType.class, new ListTypeDeserializer(deserializer));
               tmp.put(SetType.class, new SetTypeDeserializer(deserializer));
      diff --git a/core/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/IntegerTypeDeserializer.java b/core/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/IntegerTypeDeserializer.java
      new file mode 100644
      index 0000000..f240cee
      --- /dev/null
      +++ b/core/src/main/java/io/debezium/connector/cassandra/transforms/type/deserializer/IntegerTypeDeserializer.java
      @@ -0,0 +1,42 @@
      +/*
      + * Copyright Debezium Authors.
      + *
      + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
      + */
      +package io.debezium.connector.cassandra.transforms.type.deserializer;
      +
      +import io.debezium.connector.cassandra.transforms.CassandraTypeKafkaSchemaBuilders;
      +
      +import java.math.BigInteger;
      +import java.nio.ByteBuffer;
      +
      +import org.apache.cassandra.db.marshal.AbstractType;
      +import org.apache.kafka.connect.data.SchemaBuilder;
      +
      +import io.debezium.connector.cassandra.transforms.DebeziumTypeDeserializer;
      +
      +public class IntegerTypeDeserializer extends LogicalTypeDeserializer {
      +
      +    private final DebeziumTypeDeserializer deserializer;
      +
      +    public IntegerTypeDeserializer(DebeziumTypeDeserializer deserializer) {
      +        this.deserializer = deserializer;
      +    }
      +
      +    @Override
      +    public Object deserialize(AbstractType<?> abstractType, ByteBuffer bb) {
      +        Object value = deserializer.deserialize(abstractType, bb);
      +        return formatDeserializedValue(abstractType, value);
      +    }
      +
      +    @Override
      +    public SchemaBuilder getSchemaBuilder(AbstractType<?> abstractType) {
      +        return CassandraTypeKafkaSchemaBuilders.STRING_TYPE;
      +    }
      +
      +    @Override
      +    public Object formatDeserializedValue(AbstractType<?> abstractType, Object value) {
      +        BigInteger bigint = (BigInteger) value;
      +        return bigint.toString();
      +    }
      +}
      

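For reference, Cassandra serializes a varint as the big-endian two's-complement bytes of a BigInteger, so the string conversion performed by the patch can be reproduced without Cassandra on the classpath. This is a sketch, assuming the raw cell value arrives as a ByteBuffer; `VarintDecodeSketch` and `decodeToString` are illustrative names only and are not part of the patch.

```java
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Illustrative helper; not part of Debezium or of the patch above.
final class VarintDecodeSketch {

    // Decodes big-endian two's-complement bytes into a decimal string.
    // An empty buffer makes the BigInteger constructor throw NumberFormatException.
    static String decodeToString(ByteBuffer buffer) {
        // duplicate() leaves the caller's buffer position untouched.
        ByteBuffer copy = buffer.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new BigInteger(bytes).toString();
    }
}
```
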
      One open question is how we want to format deserialized varint values. Options include the java.math.BigInteger value in binary form, a formatted decimal string (as above), or a long. Note that a long cannot hold every varint value, since varint is arbitrary-precision.
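
To make the trade-off concrete, here is a small sketch of the three candidate representations using only java.math.BigInteger. The class and method names are hypothetical and do not propose a connector API.

```java
import java.math.BigInteger;

// Hypothetical comparison of output formats for a varint value.
final class VarintFormatSketch {

    // Option 1: big-endian two's-complement bytes, lossless for any magnitude.
    static byte[] asBytes(BigInteger value) {
        return value.toByteArray();
    }

    // Option 2: decimal string, lossless and human-readable.
    static String asString(BigInteger value) {
        return value.toString();
    }

    // Option 3: long; throws ArithmeticException when the value does not fit in 64 bits.
    static long asLong(BigInteger value) {
        return value.longValueExact();
    }
}
```

The byte and string forms preserve every value; the long form is the most convenient for consumers but fails (or, with `longValue()`, silently truncates) once a value exceeds the 64-bit range.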

          People

            Assignee: Unassigned
            Reporter: Keri Harris (Inactive)