Debezium / DBZ-8854

Support for collection.fields.additional.error.on.missing in MongoEventRouter


    • Type: Enhancement
    • Resolution: Unresolved
    • Priority: Blocker
    • 3.4.0.CR1
    • 2.5.4.Final
    • Component: mongodb-connector

      Bug report

      Reporting a bug for MongoDB connector:

      transforms.outbox.collection.fields.error.on.missing

      This property seems to be ignored.

      I ran some tests against the Debezium source code and prepared a potential fix to submit as a PR.

      I just need to confirm that the property I'm using in the configuration is intended for this purpose.

      What Debezium connector do you use and what version?

      MongoDB Source

      What is the connector configuration?

      "transforms.outbox.collection.fields.additional.placement": "aggregateType:header:aggregateType,eventDate:header:eventTime,eventType:header:type,eventId:header:id,customer:envelope:customerId",
      "transforms.outbox.collection.fields.error.on.missing": "false",
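      The placement value above is a comma-separated list of field:placement:alias entries. The sketch below shows how such a value breaks down into its parts. PlacementSpecDemo and AdditionalField are hypothetical names, not Debezium classes, and the sketch assumes the alias falls back to the field name when it is omitted.

```java
import java.util.ArrayList;
import java.util.List;

public class PlacementSpecDemo {

    /** Hypothetical parsed entry: source field, placement (header/envelope/...), alias. */
    record AdditionalField(String field, String placement, String alias) {}

    /**
     * Splits a "field:placement[:alias]" list into entries.
     * Assumption: a missing alias defaults to the field name.
     */
    static List<AdditionalField> parse(String spec) {
        List<AdditionalField> result = new ArrayList<>();
        for (String entry : spec.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length < 2 || parts.length > 3) {
                throw new IllegalArgumentException("Malformed entry: " + entry);
            }
            String alias = parts.length == 3 ? parts[2] : parts[0];
            result.add(new AdditionalField(parts[0], parts[1], alias));
        }
        return result;
    }

    public static void main(String[] args) {
        String spec = "aggregateType:header:aggregateType,eventDate:header:eventTime,"
                + "eventType:header:type,eventId:header:id,customer:envelope:customerId";
        // Only the "customer" entry is placed in the envelope, under the alias "customerId"
        for (AdditionalField f : parse(spec)) {
            System.out.println(f);
        }
    }
}
```

      Entries with the envelope placement are the ones that reach getSchemaBuilder, which is where the reported exception originates.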

      What is the captured database version and mode of deployment?

       
      mongo:7.0

      What behavior do you expect?

      If a field to be moved doesn't exist, processing should continue.

      What behavior do you see?

      An exception is thrown.

      Do you see the same behaviour using the latest released Debezium version?

      Yes, tested with the Debezium unit tests (MongoEventRouterTest.java).

      Do you have the connector logs, ideally from start till finish?

      This is the exception thrown when the field doesn't exist:

      java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Field.schema()" because the return value of "org.apache.kafka.connect.data.Schema.field(String)" is null
          at io.debezium.transforms.outbox.EventRouterDelegate.lambda$getSchemaBuilder$2(EventRouterDelegate.java:418)
          at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
          at io.debezium.transforms.outbox.EventRouterDelegate.getSchemaBuilder(EventRouterDelegate.java:414)
          at io.debezium.transforms.outbox.EventRouterDelegate.getValueSchema(EventRouterDelegate.java:389)
          at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:183)
          at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.apply(MongoEventRouter.java:62)
          at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)

      How to reproduce the issue using our tutorial deployment?

      Add this test to MongoEventRouterTest.java:

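      @Test
      public void canSetFieldIntoTheEnvelopeWithAliasWithMissingField() {
          final Map<String, String> config = new HashMap<>();
          // Route a field that does not exist in the event into the envelope
          config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "nonExistingField:envelope:aggregateType");
          // Disable the error on missing fields via the MongoDB-side option
          config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_ERROR_ON_MISSING.name(), "false");
          router.configure(config);

          final SourceRecord eventRecord = createEventRecord();
          // Must not throw: the missing field is skipped
          final SourceRecord eventRouted = router.apply(eventRecord);

          // The alias must not be part of the routed envelope
          try {
              ((Struct) eventRouted.value()).get("aggregateType");
              fail("A DataException should be thrown");
          }
          catch (DataException e) { // org.apache.kafka.connect.errors.DataException
              // expected: "aggregateType" is not a field of the envelope schema
          }
      }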

      Implementation ideas (optional)

      It seems these lines need to be added to MongoEventRouter.java (otherwise the property is ignored):

      fieldNameConverter.put(
              MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_ERROR_ON_MISSING.name(),
              EventRouterConfigDefinition.FIELDS_ADDITIONAL_ERROR_ON_MISSING.name());
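      The sketch below models why the mapping matters: the shared delegate reads only its own key name and defaults to true, so a value set under the MongoDB-side name never reaches it unless the name is translated. ConfigKeyTranslationDemo, translate and errorOnMissing are hypothetical helpers, and the key strings assume the MongoDB option is named collection.fields.additional.error.on.missing (as in the issue title). The sketch forwards unmapped keys unchanged; if the real router drops them instead, the outcome is the same.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigKeyTranslationDemo {

    // Assumed key names: MongoDB-side option and the delegate's (table.*) option
    static final String MONGO_KEY = "collection.fields.additional.error.on.missing";
    static final String DELEGATE_KEY = "table.fields.additional.error.on.missing";

    /** Copies the config, renaming keys that have an entry in the converter map. */
    static Map<String, String> translate(Map<String, String> config, Map<String, String> converter) {
        Map<String, String> out = new HashMap<>();
        config.forEach((key, value) -> out.put(converter.getOrDefault(key, key), value));
        return out;
    }

    /** The delegate reads only its own key and defaults to true. */
    static boolean errorOnMissing(Map<String, String> delegateConfig) {
        return Boolean.parseBoolean(delegateConfig.getOrDefault(DELEGATE_KEY, "true"));
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = Map.of(MONGO_KEY, "false");

        // No mapping: the user's "false" is never read
        System.out.println(errorOnMissing(translate(userConfig, Map.of()))); // prints true

        // With the mapping: the value reaches the delegate
        System.out.println(errorOnMissing(translate(userConfig, Map.of(MONGO_KEY, DELEGATE_KEY)))); // prints false
    }
}
```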

      And add the following field to MongoEventRouterConfigDefinition.java:

      public static final Field FIELDS_ADDITIONAL_ERROR_ON_MISSING = Field.create("collection.fields.additional.error.on.missing")
              .withDisplayName("Should the transform error if an additional field is missing in the change data")
              .withType(ConfigDef.Type.BOOLEAN)
              .withDefault(true)
              .withWidth(ConfigDef.Width.MEDIUM)
              .withImportance(ConfigDef.Importance.MEDIUM)
              .withDescription("When transforming the 'collection.fields.additional.placement' fields, should the transform throw" +
                      " an exception if one of the fields is missing in the change data");

      In EventRouterDelegate.java, use the additionalFieldsErrorOnMissing variable and update the getSchemaBuilder method so that it skips non-existent fields instead of throwing:


      private SchemaBuilder getSchemaBuilder(Schema payloadSchema, Schema debeziumEventSchema, String routedTopic) {
          SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(getSchemaName(debeziumEventSchema, routedTopic));

          // Add payload field
          schemaBuilder.field(ENVELOPE_PAYLOAD, payloadSchema);

          // Add additional fields while keeping the schema inherited from Debezium based on the table column type
          additionalFields.forEach(additionalField -> {
              if (additionalField.getPlacement() == AdditionalFieldPlacement.ENVELOPE) {
                  // Schema.field(String) returns null when the field is absent from the event;
                  // skip it instead of failing with a NullPointerException
                  if (!additionalFieldsErrorOnMissing && debeziumEventSchema.field(additionalField.getField()) == null) {
                      return;
                  }
                  schemaBuilder.field(
                          additionalField.getAlias(),
                          debeziumEventSchema.field(additionalField.getField()).schema());
              }
          });

          return schemaBuilder;
      }
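      The proposed getSchemaBuilder logic can be modelled without Kafka Connect classes: a map lookup stands in for Schema.field(String), which returns null for a missing field. LenientEnvelopeDemo, EnvelopeField and buildEnvelope are hypothetical names. Unlike the proposal, which still ends in a NullPointerException when the flag is true, this sketch throws a descriptive IllegalStateException in that case; that is a design choice of the sketch, not part of the proposed change. The payload field is omitted for brevity.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LenientEnvelopeDemo {

    /** Hypothetical stand-in for an additional field placed in the envelope. */
    record EnvelopeField(String field, String alias) {}

    /**
     * Builds alias -> type for the envelope fields.
     * sourceFields stands in for debeziumEventSchema (field name -> type name).
     */
    static Map<String, String> buildEnvelope(Map<String, String> sourceFields,
                                             List<EnvelopeField> additional,
                                             boolean errorOnMissing) {
        Map<String, String> envelope = new LinkedHashMap<>();
        for (EnvelopeField f : additional) {
            // null when absent, like Schema.field(String)
            String type = sourceFields.get(f.field());
            if (type == null) {
                if (!errorOnMissing) {
                    continue; // skip the missing field and keep processing
                }
                throw new IllegalStateException("Additional field '" + f.field() + "' is missing");
            }
            envelope.put(f.alias(), type);
        }
        return envelope;
    }

    public static void main(String[] args) {
        Map<String, String> source = Map.of("aggregateType", "STRING", "customer", "STRING");
        List<EnvelopeField> additional = List.of(
                new EnvelopeField("nonExistingField", "aggregateType"),
                new EnvelopeField("customer", "customerId"));
        System.out.println(buildEnvelope(source, additional, false)); // prints {customerId=STRING}
    }
}
```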

              Assignee: Unassigned
              Reporter: Stefano Linguerri (eljeko, inactive)
              Votes: 0
              Watchers: 4