Type: Bug
Resolution: Won't Do
Priority: Critical
In various places in the code, when Debezium emits a tombstone for a key, it does so as a Kafka SourceRecord with both a null value and a null schema. See for example here:
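To make the record shapes concrete, here is a minimal plain-Java model of the two kinds of records involved. This is not the actual Kafka Connect SourceRecord API; the Record class, its fields, and the sample values are purely illustrative:

```java
// Illustrative model only: a Debezium delete event carries a value and a
// value schema, while the tombstone that follows it for the same key
// carries neither (null schema AND null value).
public class TombstoneShape {
    static final class Record {
        final String key;
        final Object valueSchema; // null means schema-less
        final Object value;
        Record(String key, Object valueSchema, Object value) {
            this.key = key;
            this.valueSchema = valueSchema;
            this.value = value;
        }
    }

    public static void main(String[] args) {
        // Delete event: schema-based value describing the deleted row (sample data).
        Record delete = new Record("pk=42", "envelope-schema", "op=d, before=...");
        // Tombstone for the same key: no value and no schema.
        Record tombstone = new Record("pk=42", null, null);

        System.out.println(delete.valueSchema != null);                      // schema-based
        System.out.println(tombstone.valueSchema == null && tombstone.value == null);
    }
}
```

Any consumer that keys its behavior on whether the value schema is present will therefore see the topic flip between schema-based and schema-less records whenever a tombstone follows a delete.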
This turns out to be problematic, particularly with the following combination:
- Confluent's S3 Sink Connector (https://github.com/confluentinc/kafka-connect-storage-cloud)
- Confluent's AvroConverter (on both the Debezium MySQL source and the Confluent S3 sink)
The problem is that the Confluent components cannot cope with a topic that mixes schema-based and schema-less values, as seen in the following Confluent code:
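The failure mode can be sketched as follows. This is a simplified, hypothetical reconstruction of the kind of check the stack trace below attributes to StorageSchemaCompatibility, not the actual Confluent source:

```java
// Simplified sketch: a sink that tracks the current value schema for a
// topic-partition rejects any record whose schema presence differs from
// the schema presence of the records seen so far.
public class SchemaSwitchCheck {
    static void validate(Object currentSchema, Object recordSchema) {
        boolean currentIsSchemaless = (currentSchema == null);
        boolean recordIsSchemaless = (recordSchema == null);
        if (currentIsSchemaless != recordIsSchemaless) {
            throw new RuntimeException(
                "Switch between schema-based and schema-less data is not supported");
        }
    }

    public static void main(String[] args) {
        validate("avro-schema", "avro-schema"); // ok: both schema-based
        validate(null, null);                   // ok: both schema-less

        boolean rejected = false;
        try {
            // A schema-less tombstone arriving after schema-based delete events.
            validate("avro-schema", null);
        } catch (RuntimeException e) {
            rejected = true;
        }
        System.out.println(rejected ? "schema switch rejected" : "accepted");
    }
}
```

Under this check, the first tombstone that follows a schema-based Debezium event is enough to kill the sink task, which matches the exception reported below.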
Because of this, when I attempt to sink Debezium topics into S3 using Confluent's connector, I get exceptions like the following:
[2017-05-03 14:31:56,348] INFO Opening record writer for: topics/my_instance.my_database.my_table/partition=1/my_instance.my_database.my_table+1+0001400000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider:66)
[2017-05-03 14:31:56,422] ERROR Task s3-my-instance-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:449)
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.connect.errors.SchemaProjectorException: Switch between schema-based and schema-less data is not supported
    at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:213)
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:163)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.SchemaProjectorException: Switch between schema-based and schema-less data is not supported
    at io.confluent.connect.storage.schema.StorageSchemaCompatibility.validateAndCheck(StorageSchemaCompatibility.java:75)
    at io.confluent.connect.storage.schema.StorageSchemaCompatibility.shouldChangeSchema(StorageSchemaCompatibility.java:91)
    at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:184)
    ... 12 more
[2017-05-03 14:31:56,422] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:450)
[2017-05-03 14:31:56,422] INFO WorkerSinkTask{id=s3-data-dev-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:272)
[2017-05-03 14:31:56,429] ERROR Task s3-data-dev-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:451)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
[2017-05-03 14:31:56,429] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)