  Debezium / DBZ-8972

debezium/server mongodb org.apache.kafka.connect.errors.DataException: is not a valid field name



      Bug report

      We are using the MongoDB source connector and the NATS JetStream sink to deploy Debezium Server on our k8s cluster for 6 replica sets. The deployments worked fine initially, while no transformations were defined. We then came across MongoDB New Document State Extraction and wanted to unwrap the document, instead of receiving the before and after fields as JSON strings and having to extract the values downstream.

      We expect io.debezium.connector.mongodb.transforms.ExtractNewDocumentState to handle schema changes as well, instead of throwing an exception whenever a document contains a field it has not seen before. The configuration works (for now) for some of the replica sets and collections, but fails with the following exception for others:

      org.apache.kafka.connect.errors.DataException: <field_name> is not a valid field name
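The stack trace in the logs section shows where this message comes from: `org.apache.kafka.connect.data.Struct.put` looks the field name up in the struct's schema (`Struct.lookupField`) and throws `DataException` when the name is not there. The Java sketch below is a simplified stand-in for that check, not the Kafka Connect class itself. It throws `IllegalArgumentException` instead of `DataException` so it runs without Kafka on the classpath, and the class name `FixedSchemaStruct` and the field names `_id`, `name` and `discount` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified stand-in for a Kafka Connect Struct: the field names are fixed when the
 * schema is built, and put() rejects any other name. Not the Kafka Connect class.
 */
final class FixedSchemaStruct {
    private final List<String> schemaFields;                // names known to the schema
    private final Map<String, Object> values = new LinkedHashMap<>();

    FixedSchemaStruct(List<String> schemaFields) {
        this.schemaFields = List.copyOf(schemaFields);
    }

    FixedSchemaStruct put(String fieldName, Object value) {
        // Models the lookup that fails in the trace (Struct.put -> Struct.lookupField).
        if (!schemaFields.contains(fieldName)) {
            throw new IllegalArgumentException(fieldName + " is not a valid field name");
        }
        values.put(fieldName, value);
        return this;
    }

    Object get(String fieldName) {
        return values.get(fieldName);
    }

    /** Schema built from {_id, name}; a later document adds the unseen field "discount". */
    static String demo() {
        FixedSchemaStruct struct = new FixedSchemaStruct(List.of("_id", "name"));
        struct.put("_id", "1").put("name", "a");
        try {
            struct.put("discount", 5);
            return "no error";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // discount is not a valid field name
    }
}
```

The point of the model is that the schema is fixed before values are written, so a field that the schema does not list fails at write time rather than being added.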

      We want to retain all the information we can get from the MongoDB change stream and write it to our data lake, so ignoring such fields is not an option either.

      We might be missing something or have wrong assumptions. If something is missing from our configuration, please point us to the right settings.

      What Debezium connector do you use and what version?

      debezium/server:3.0
      version: 3.0.8.Final

      What is the connector configuration?

      application.properties
       

       # Sink configuration 
      debezium.sink.type=nats-jetstream 
      debezium.sink.nats-jetstream.url=nats://<our_nats_url>:4222 
      debezium.sink.nats-jetstream.subjects=DebeziumStream.>
      debezium.sink.nats-jetstream.create-stream=false 
      
      # Source configuration
      debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
      debezium.source.tasks.max=1
      debezium.source.topic.prefix=DebeziumStream
      debezium.source.mongodb.connection.string=<MONGO_RS_URI>
      debezium.source.filters.match.mode=literal
      debezium.source.database.include.list=backend
      debezium.source.collection.include.list=backend.table
      debezium.source.snapshot.mode=no_data
      debezium.source.capture.mode=change_streams_update_full
      
      # Format configuration 
      debezium.format.value=json 
      debezium.format.value.schemas.enable=false 
      
      # Storage configuration 
      debezium.source.offset.storage.file.filename=/offset/offsets.dat 
      debezium.source.offset.flush.interval.ms=0 
      
      # Transform configuration
      debezium.transforms=unwrap
      debezium.transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState
      debezium.transforms.unwrap.delete.tombstone.handling.mode=rewrite
      debezium.transforms.unwrap.delete.tombstone.handling.mode.rewrite-with-id=true
      debezium.transforms.unwrap.add.fields=op,source.ts_ms:ts_ms,collection,name,version
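Each property must sit on its own line: in a `.properties` file, a line that begins with `#` is a comment in its entirety, so any property written after `# Source configuration` on the same line would be silently ignored. As we understand Debezium Server's configuration model, the `debezium.source.*` properties reach the connector with that prefix removed, and the transformation alias `unwrap` receives its `debezium.transforms.unwrap.*` options the same way. The sketch below models that prefix stripping with `java.util.Properties`, assuming this mapping; it is not Debezium Server's own code, and `PrefixedConfig`, `load` and `stripPrefix` are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/**
 * Sketch of prefix-based config routing, ASSUMING Debezium Server strips
 * "debezium.source." for the connector and "debezium.transforms.<alias>." for each SMT.
 * Hypothetical helper, not Debezium Server's own code.
 */
final class PrefixedConfig {
    // A few properties from the report, one per line as a .properties file requires.
    static final String CONFIG = String.join("\n",
            "# Source configuration",
            "debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector",
            "debezium.source.capture.mode=change_streams_update_full",
            "# Transform configuration",
            "debezium.transforms=unwrap",
            "debezium.transforms.unwrap.type=io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
            "debezium.transforms.unwrap.add.fields=op,source.ts_ms:ts_ms,collection,name,version");

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return props;
    }

    /** Returns every property whose key starts with prefix, with the prefix removed. */
    static Map<String, String> stripPrefix(Properties props, String prefix) {
        Map<String, String> out = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                out.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties props = load(CONFIG);
        System.out.println(stripPrefix(props, "debezium.source."));
        System.out.println(stripPrefix(props, "debezium.transforms.unwrap."));
        // A line starting with '#' is a comment, so the property written on it is ignored.
        Properties collapsed = load("# Source configuration debezium.source.connector.class=x");
        System.out.println(collapsed.size()); // 0
    }
}
```

Only the first `=` on a line separates key from value, so the `:` inside `source.ts_ms:ts_ms` stays part of the `add.fields` value.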

      What is the captured database version and mode of deployment?

      MongoDB 6.0.7, self-hosted on our k8s cluster

      What behavior do you expect?

      Debezium Server infers each document's schema dynamically and unwraps the message instead of raising an exception.

      What behavior do you see?

      An exception is raised:

      org.apache.kafka.connect.errors.DataException: <column_name> is not a valid field name

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      We also tried version 3.1.

      Do you have the connector logs, ideally from start till finish?

       

      2025-04-25 14:17:36,645 ERROR [io.deb.emb.asy.AsyncEmbeddedEngine] (pool-7-thread-1) Engine has failed with : java.util.concurrent.ExecutionException: java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.DataException: <field> is not a valid field name
      at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
      at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
      at io.debezium.embedded.async.AsyncEmbeddedEngine.runTasksPolling(AsyncEmbeddedEngine.java:496)
      at io.debezium.embedded.async.AsyncEmbeddedEngine.run(AsyncEmbeddedEngine.java:220)
      at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:182)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
      at java.base/java.lang.Thread.run(Thread.java:1583)
      Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.DataException: <field> is not a valid field name
      at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
      at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
      at io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor.processRecords(ParallelSmtAndConvertBatchProcessor.java:49)
      at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1203)
      at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1183)
      at io.debezium.embedded.async.RetryingCallable.call(RetryingCallable.java:47)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
      ... 3 more
      Caused by: org.apache.kafka.connect.errors.DataException: <field> is not a valid field name
      at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
      at org.apache.kafka.connect.data.Struct.put(Struct.java:202)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:217)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:154)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:268)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:192)
      at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
      at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1116)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:184)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:268)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:192)
      at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
      at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1116)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:184)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:154)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:268)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.lambda$convertFieldValue$0(MongoDataConverter.java:192)
      at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
      at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1116)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:184)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:154)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertFieldValue(MongoDataConverter.java:154)
      at io.debezium.connector.mongodb.transforms.MongoDataConverter.convertRecord(MongoDataConverter.java:63)
      at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.newRecord(ExtractNewDocumentState.java:292)
      at io.debezium.connector.mongodb.transforms.ExtractNewDocumentState.doApply(ExtractNewDocumentState.java:234)
      at io.debezium.transforms.AbstractExtractNewRecordState.apply(AbstractExtractNewRecordState.java:119)
      at io.debezium.embedded.Transformations.transform(Transformations.java:92)
      at io.debezium.embedded.async.ProcessingCallables$TransformAndConvertRecord.call(ProcessingCallables.java:80)
      ... 4 more

      The full logs are attached.
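In the trace, `Struct.put` fails several calls deep inside `MongoDataConverter.convertFieldValue`, with `ArrayList.forEach` frames in between, that is, while converting elements of arrays nested inside other arrays. The sketch below is a hypothetical two-pass model of such a converter (infer a schema, then copy values into it) that reproduces the same message at a similar depth. It rests on an assumption made only for illustration, namely that an array's element schema is inferred from its first element; it is not taken from Debezium's source, and `NestedArrayModel`, `infer`, `convert` and the field names are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical two-pass model: infer a schema from a document, then convert the
 * document against it. ASSUMPTION for illustration only: an array's element schema
 * is inferred from its first element. Not Debezium's MongoDataConverter.
 */
final class NestedArrayModel {

    /** Schema node: document fields (fields != null), an array (element != null), or a leaf. */
    record Schema(Map<String, Schema> fields, Schema element) {
        static final Schema LEAF = new Schema(null, null);
    }

    static Schema infer(Object value) {
        if (value instanceof Map<?, ?> doc) {
            Map<String, Schema> fields = new LinkedHashMap<>();
            doc.forEach((k, v) -> fields.put(String.valueOf(k), infer(v)));
            return new Schema(fields, null);
        }
        if (value instanceof List<?> list) {
            // Assumed simplification: only the first element shapes the element schema.
            Schema element = list.isEmpty() ? Schema.LEAF : infer(list.get(0));
            return new Schema(null, element);
        }
        return Schema.LEAF;
    }

    static Object convert(Object value, Schema schema) {
        if (value instanceof Map<?, ?> doc) {
            Map<String, Object> out = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : doc.entrySet()) {
                String name = String.valueOf(e.getKey());
                Schema child = schema.fields() == null ? null : schema.fields().get(name);
                if (child == null) {
                    // Same wording as the DataException raised by Struct.put in the trace.
                    throw new IllegalArgumentException(name + " is not a valid field name");
                }
                out.put(name, convert(e.getValue(), child));
            }
            return out;
        }
        if (value instanceof List<?> list) {
            Schema element = schema.element() == null ? Schema.LEAF : schema.element();
            List<Object> out = new ArrayList<>();
            for (Object item : list) {
                out.add(convert(item, element)); // one call per element, like the forEach frames
            }
            return out;
        }
        return value; // leaf values are copied as-is
    }

    /** Two levels of nested arrays; a tag in the second item carries an extra field. */
    static String demo() {
        Map<String, Object> doc = Map.of(
                "_id", 1,
                "items", List.of(
                        Map.of("sku", "a", "tags", List.of(Map.of("k", "x"))),
                        Map.of("sku", "b", "tags", List.of(Map.of("k", "y", "note", "new")))));
        try {
            convert(doc, infer(doc));
            return "converted";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // note is not a valid field name
    }
}
```

Under that assumption, a nested element whose shape differs from the first one fails even though the schema was built from the same document; whether this matches the reporter's documents would have to be checked against the attached logs and the converter's source.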

      How to reproduce the issue using our tutorial deployment?

      This setup is not covered by the tutorial, but I'll see what I can do.

      Which use case/requirement will be addressed by the proposed feature?

      Debezium Server handles the dynamic nature of MongoDB documents.

      Attachments (uploaded by Harika Duyu):
        1. logs.jsonl (102 kB)
        2. logs.txt (45 kB)

              Anisha Mohanty (anmohant, Inactive)
              Harika Duyu (harikaduyu, Inactive)
              Votes: 0
              Watchers: 4
