Type: Bug
Resolution: Unresolved
Priority: Major
Bug report
We are using the MongoDB source connector and the NATS JetStream sink connector, deploying Debezium Server on our k8s cluster for 6 replica sets. They worked fine initially when we didn't have any transforms defined. But we came across MongoDB New Document State Extraction and wanted to unwrap the document instead of having the before and after fields as JSON strings and having to extract values downstream.
We expect io.debezium.connector.mongodb.transforms.ExtractNewDocumentState to handle schema changes as well, instead of throwing an exception whenever there is a new field it hasn't seen before. This configuration works fine (for now) for some of the replica sets and collections, but fails with the following exception for others.
org.apache.kafka.connect.errors.DataException: <field_name> is not a valid field name
We would like to retain all the information we can get from the MongoDB change stream and write it into our data lake, so ignoring such fields is not an option either.
We might be missing something or have wrong assumptions; please point us to the right configs if something is missing in our configuration.
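For illustration only (the real collection and field names are redacted), here is a purely hypothetical document shape consistent with the nested-array frames in the stack trace further down, where one array element carries a key that its sibling elements do not and one sub-document is empty:

{
  "_id": 1,
  "status": "active",
  "items": [
    { "sku": "a-1", "attributes": { "color": "red" } },
    { "sku": "a-2", "attributes": { "color": "blue", "size": "xl" } },
    { "sku": "a-3", "attributes": {} }
  ]
}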
What Debezium connector do you use and what version?
debezium/server:3.0
version: 3.0.8.Final
What is the connector configuration?
application.properties
# Sink configuration
debezium.sink.type=nats-jetstream
debezium.sink.nats-jetstream.url=nats://<our_nats_url>:4222
debezium.sink.nats-jetstream.subjects=DebeziumStream.>
debezium.sink.nats-jetstream.create-stream=false

# Source configuration
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.tasks.max=1
debezium.source.topic.prefix=DebeziumStream
debezium.source.mongodb.connection.string=<MONGO_RS_URI>
debezium.source.filters.match.mode=literal
debezium.source.database.include.list=backend
debezium.source.collection.include.list=backend.table
debezium.source.snapshot.mode=no_data
debezium.source.capture.mode=change_streams_update_full

# Format configuration
debezium.format.value=json
debezium.format.value.schemas.enable=false

# Storage configuration
debezium.source.offset.storage.file.filename=/offset/offsets.dat
debezium.source.offset.flush.interval.ms=0

# Transform configuration
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
debezium.transforms.unwrap.delete.tombstone.handling.mode=rewrite
debezium.transforms.unwrap.delete.tombstone.handling.mode.rewrite-with-id=true
debezium.transforms.unwrap.add.fields=op,source.ts_ms:ts_ms,collection,name,version
What is the captured database version and mode of deployment?
MongoDB 6.0.7, self-hosted on our k8s cluster
What behavior do you expect?
Debezium Server handles the document schema dynamically and unwraps the message instead of raising an exception.
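As a rough, hypothetical example (document values are illustrative only, the added metadata fields from add.fields are trimmed, and their double-underscore prefix reflects our understanding of the SMT's default naming), a create event for { "_id": 1, "status": "active" } should reach the sink as the flattened document rather than as an envelope with after as an extended-JSON string:

{
  "_id": 1,
  "status": "active",
  "__op": "c",
  "__collection": "table"
}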
What behavior do you see?
An exception is raised:
org.apache.kafka.connect.errors.DataException: <field_name> is not a valid field name
Do you see the same behaviour using the latest released Debezium version?
(Ideally, also verify with latest Alpha/Beta/CR version)
Also tried with version 3.1
Do you have the connector logs, ideally from start till finish?
2025-04-25 14:17:36,645 ERROR [io.deb.emb.asy.AsyncEmbeddedEngine] (pool-7-thread-1) Engine has failed with :
java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.DataException: <field> is not a valid field name
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at io.debezium.embedded.async.AsyncEmbeddedEngine.runTasksPolling(AsyncEmbeddedEngine.java:496)
	at io.debezium.embedded.async.AsyncEmbeddedEngine.run(AsyncEmbeddedEngine.java:220)
	at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:182)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.DataException: <field> is not a valid field name
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor.processRecords(ParallelSmtAndConvertBatchProcessor.java:49)
	at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1203)
	at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1183)
	at io.debezium.embedded.async.RetryingCallable.call(RetryingCallable.java:47)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	... 3 more
Caused by: org.apache.kafka.connect.errors.DataException: <field> is not a valid field name
	at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
	at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:217)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:154)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:268)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:192)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1116)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:184)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:268)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:192)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1116)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:184)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:154)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:268)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:192)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1116)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:184)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:154)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:154)
	at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:63)
	at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.newRecord(ExtractNewDocumentState.java:292)
	at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.doApply(ExtractNewDocumentState.java:234)
	at io.debezium.transforms.AbstractExtractNewRecordState.apply(AbstractExtractNewRecordState.java:119)
	at io.debezium.embedded.Transformations.transform(Transformations.java:92)
	at io.debezium.embedded.async.ProcessingCallables$TransformAndConvertRecord.call(ProcessingCallables.java:80)
	... 4 more
The full logs are also attached.
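For context, here is a minimal sketch of the Kafka Connect API that the bottom of the stack trace ends in (this is not Debezium code, and the field names are hypothetical): Struct.put raises exactly this DataException when a value carries a field that the record's Schema does not declare, which appears to be what happens inside MongoDataConverter when the inferred schema and the actual document disagree.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class FieldNameMismatch {
    public static void main(String[] args) {
        // Schema that only knows about "status" (stands in for the schema
        // inferred before the offending field was encountered).
        Schema schema = SchemaBuilder.struct()
                .field("status", Schema.STRING_SCHEMA)
                .build();

        Struct value = new Struct(schema);
        value.put("status", "active");

        // The incoming document carries a field the schema never declared.
        // Struct.lookupField throws:
        //   org.apache.kafka.connect.errors.DataException: tier is not a valid field name
        value.put("tier", "gold");
    }
}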
How to reproduce the issue using our tutorial deployment?
This setup doesn't exist in the tutorial deployment, but I'll see what I can do.
Which use case/requirement will be addressed by the proposed feature?
Debezium Server handles the dynamic nature of MongoDB documents.
Is incorporated by: DBZ-5920 Ingestion issues with Mongodb when empty [] or empty {} appear in the Json feed (Closed)