Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5826

fix names of range fields in schema to comply with Avro standard

XMLWordPrintable

      version v2.1.0.Alpha1

      A tester hit this problem:

      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]: 22:14:21.100 [pool-4-thread-2] ERROR io.debezium.connector.cassandra.QueueProcessor - Processing of event Record{source={cluster=russdCassandraNOSSL, keyspace=indtest, file=, connector=cassandra, pos=-1, ts_micro=1668118460063000, version=2.0.0.Final, snapshot=true, table=mytable172}, after={col1={name=col1, value=C7FK28PJNODLR86, deletionTs=null, type=PARTITION}, col2={name=col2, value=GG50TTQZEU, deletionTs=null, type=REGULAR}, col3={name=col3, value=42, deletionTs=null, type=REGULAR}, time={name=time, value=1657105070427, deletionTs=null, type=REGULAR}}, keySchema=Schema{io.debezium.connector.cassandra.IND20.indtest.mytable172.Key:STRUCT}, valueSchema=Schema{io.debezium.connector.cassandra.IND20.indtest.mytable172.Envelope:STRUCT}, op=i, ts=1668118460068} was errorneous: {}
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]: io.debezium.DebeziumException: Failed to send record Record{source={cluster=russdCassandraNOSSL, keyspace=indtest, file=, connector=cassandra, pos=-1, ts_micro=1668118460063000, version=2.0.0.Final, snapshot=true, table=mytable172}, after={col1={name=col1, value=C7FK28PJNODLR86, deletionTs=null, type=PARTITION}, col2={name=col2, value=GG50TTQZEU, deletionTs=null, type=REGULAR}, col3={name=col3, value=42, deletionTs=null, type=REGULAR}, time={name=time, value=1657105070427, deletionTs=null, type=REGULAR}}, keySchema=Schema{io.debezium.connector.cassandra.IND20.indtest.mytable172.Key:STRUCT}, valueSchema=Schema{io.debezium.connector.cassandra.IND20.indtest.mytable172.Envelope:STRUCT}, op=i, ts=1668118460068}
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.debezium.connector.cassandra.KafkaRecordEmitter.emit(KafkaRecordEmitter.java:72)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.debezium.connector.cassandra.QueueProcessor.processEvent(QueueProcessor.java:113)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.debezium.connector.cassandra.QueueProcessor.process(QueueProcessor.java:71)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.debezium.connector.cassandra.AbstractProcessor.start(AbstractProcessor.java:63)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.debezium.connector.cassandra.CassandraConnectorTaskTemplate$ProcessorGroup.lambda$start$0(CassandraConnectorTaskTemplate.java:238)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at java.base/java.lang.Thread.run(Thread.java:829)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]: Caused by: org.apache.avro.SchemaParseException: Illegal initial character: .range_start
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at org.apache.avro.Schema.validateName(Schema.java:1557)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at org.apache.avro.Schema.access$400(Schema.java:87)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at org.apache.avro.Schema$Field.<init>(Schema.java:541)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at org.apache.avro.Schema$Field.<init>(Schema.java:580)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1155)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:941)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:1154)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:941)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:751)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:745)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:85)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.debezium.connector.cassandra.KafkaRecordEmitter.toProducerRecord(KafkaRecordEmitter.java:82)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         at io.debezium.connector.cassandra.KafkaRecordEmitter.emit(KafkaRecordEmitter.java:63)
      Nov 10 22:14:21 ip-10-1-2-44 debezium[1868562]:         ... 9 common frames omitted
      

      Looking into Avro's validate method, I see this:

      private static String validateName(String name) {
          if (!(Boolean)validateNames.get()) {
              return name;
          } else {
              int length = name.length();
              if (length == 0) {
                  throw new SchemaParseException("Empty name");
              } else {
                  char first = name.charAt(0);
                  if (!Character.isLetter(first) && first != '_') {
                      throw new SchemaParseException("Illegal initial character: " + name);
                  } else {
                      for(int i = 1; i < length; ++i) {
                          char c = name.charAt(i);
                          if (!Character.isLetterOrDigit(c) && c != '_') {
                              throw new SchemaParseException("Illegal character in: " + name);
                          }
                      }
      
                      return name;
                  }
              }
          }
      }
      

      Based on that, I think we should rename .range_start and .range_end to _range_start and _range_end

              Unassigned Unassigned
              smiklosovic Stefan Miklosovic (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

                Created:
                Updated:
                Resolved: