Debezium / DBZ-235

Tombstones are emitted as schema-less values


Details

    • Type: Bug
    • Resolution: Won't Do
    • Priority: Critical

    Description

      In various places in the code, when Debezium emits a tombstone for a key, it does so as a Kafka SourceRecord with a null value and a null value schema. See, for example, here:

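      The pattern in question can be sketched without the Kafka Connect dependency. `ConnectRecord` below is a hypothetical minimal stand-in for org.apache.kafka.connect.source.SourceRecord (not the real API); it models only the fields relevant to this issue:

```java
// Minimal stand-in for org.apache.kafka.connect.source.SourceRecord:
// only the fields relevant to this issue are modeled (hypothetical class,
// not the real Kafka Connect API).
record ConnectRecord(String topic, Object keySchema, Object key,
                     Object valueSchema, Object value) {}

public class TombstoneSketch {
    public static void main(String[] args) {
        Object keySchema = "Schema{my_table.Key}"; // stands in for a Connect Schema

        // How the tombstone is emitted today: null value AND null value schema.
        ConnectRecord schemaLess =
                new ConnectRecord("my_topic", keySchema, "id=42", null, null);

        // What a schema-aware sink could consume safely: value schema kept, value null.
        ConnectRecord withSchema = new ConnectRecord(
                "my_topic", keySchema, "id=42", "Schema{my_table.Envelope}", null);

        System.out.println(schemaLess.valueSchema() == null); // prints true
        System.out.println(withSchema.valueSchema() != null); // prints true
    }
}
```

      The first form is what makes the downstream topic a mix of schema-based and schema-less values; the second keeps every record in the topic schema-based.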
      This turns out to be problematic, particularly with the following combination:

      1. Confluent's S3 sink connector (https://github.com/confluentinc/kafka-connect-storage-cloud)
      2. Confluent's AvroConverter (on both the Debezium MySQL source and the Confluent S3 sink)

      The problem is that the Confluent components are not able to cope with a topic that contains a mix of schema-based and schema-less values, as seen in the following Confluent code:

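      The failing check can be sketched independently of the Confluent source. This is a simplified model of the validation, not the actual StorageSchemaCompatibility implementation: the sink refuses to let consecutive records flip between having a value schema and not having one.

```java
public class SchemaSwitchSketch {
    // Simplified stand-in for the validation in Confluent's
    // StorageSchemaCompatibility: once the sink has been writing
    // schema-based records, a schema-less record (or vice versa)
    // cannot be projected into the same output.
    static void validate(Object currentSchema, Object incomingSchema) {
        boolean currentHasSchema = currentSchema != null;
        boolean incomingHasSchema = incomingSchema != null;
        if (currentHasSchema != incomingHasSchema) {
            throw new IllegalStateException(
                "Switch between schema-based and schema-less data is not supported");
        }
    }

    public static void main(String[] args) {
        Object envelopeSchema = "Schema{my_table.Envelope}"; // stands in for a Connect Schema

        validate(envelopeSchema, envelopeSchema); // fine: schema-based after schema-based

        try {
            // A Debezium tombstone arrives with a null value schema right after
            // a schema-based change event: this is the reported failure mode.
            validate(envelopeSchema, null);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

      With a Debezium topic, every delete produces exactly this sequence: a schema-based change event followed by a schema-less tombstone, so the sink fails deterministically.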
      Because of this, when I attempt to sink Debezium topics into S3 using Confluent's connector, I get exceptions like the following:

      [2017-05-03 14:31:56,348] INFO Opening record writer for: topics/my_instance.my_database.my_table/partition=1/my_instance.my_database.my_table+1+0001400000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)
      [2017-05-03 14:31:56,422] ERROR Task s3-my-instance-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
      org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.SchemaProjectorException: Switch between schema-based and schema-less data is not supported
      at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:213)
      at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:163)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.connect.errors.SchemaProjectorException: Switch between schema-based and schema-less data is not supported
      at io.confluent.connect.storage.schema.StorageSchemaCompatibility.validateAndCheck(StorageSchemaCompatibility.java:75)
      at io.confluent.connect.storage.schema.StorageSchemaCompatibility.shouldChangeSchema(StorageSchemaCompatibility.java:91)
      at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:184)
      ... 12 more
      [2017-05-03 14:31:56,422] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:450)
      [2017-05-03 14:31:56,422] INFO WorkerSinkTask{id=s3-data-dev-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
      [2017-05-03 14:31:56,429] ERROR Task s3-data-dev-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
      org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:451)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)
      [2017-05-03 14:31:56,429] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
      
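      For completeness: one way to keep such a topic uniformly schema-based is to stop emitting tombstones at the source. Debezium exposes a connector option for this, though whether it is available depends on the Debezium version in use:

```properties
# Debezium source connector config (availability depends on version):
# suppress the schema-less tombstone that normally follows a delete event.
tombstones.on.delete=false
```

      Note that this also disables log compaction of deleted keys on the topic, so it is a trade-off rather than a fix.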

          People

            Assignee: Unassigned
            Reporter: Luis Casillas (lcasillas-oportun; Inactive)
