Details
- Type: Bug
- Resolution: Not a Bug
- Priority: Major
- Affects Version: 1.2.0.Final
Description
While trying to "unwrap" records generated by the Debezium MongoDB connector using the New Record State Extraction SMT, we are receiving the following exception on a subset of the records:
Executing stage 'TRANSFORMATION' with class 'io.debezium.connector.mongodb.transforms.ExtractNewDocumentState'. (org.apache.kafka.connect.runtime.errors.LogReporter:62)
org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{STRING}
    at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:210)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1654)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:163)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:212)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1654)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:163)
    at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:51)
    at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.newRecord(ExtractNewDocumentState.java:320)
    at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.apply(ExtractNewDocumentState.java:260)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
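For context, the SMT is applied on the sink side (consistent with the WorkerSinkTask frames in the trace above), along these lines. This is a minimal sketch: the connector name, connector class, and topic are placeholders, and only the transform class is taken from the trace:

```
{
  "name": "example-sink",
  "config": {
    "connector.class": "org.example.ExampleSinkConnector",
    "topics": "example.topic",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState"
  }
}
```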
I've inspected a number of "good" and "bad" records, and the only difference I can find is that the "good" records all have the `extraDetails` field (an array of nested documents) populated, while the "bad" records all have an empty array instead.
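To illustrate, here are hypothetical minimal versions of the two document shapes; every field name and value other than `extraDetails` is made up. A record that transforms successfully:

```
{
  "_id": 1,
  "extraDetails": [
    { "label": "example", "count": 1 }
  ]
}
```

And a record that triggers the DataException, differing only in the empty array:

```
{
  "_id": 2,
  "extraDetails": []
}
```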