Enhancement
Resolution: Unresolved
Blocker
2.5.4.Final
Bug report
Reporting a bug for the MongoDB connector. The following property seems to be ignored:
transforms.outbox.collection.fields.error.on.missing
I ran some tests on the Debezium source and implemented a potential PR for submission.
I just need to confirm whether the property I'm using in the configuration is intended for the purpose I addressed.
What Debezium connector do you use and what version?
MongoDB Source
What is the connector configuration?
"transforms.outbox.collection.fields.additional.placement": "aggregateType:header:aggregateType,eventDate:header:eventTime,eventType:header:type,eventId:header:id,customer:envelope:customerId",
"transforms.outbox.collection.fields.error.on.missing": "false",
What is the captured database version and mode of deployment?
mongo:7.0
What behavior do you expect?
If the field to be moved doesn't exist, processing should continue.
What behavior do you see?
An exception is thrown.
Do you see the same behaviour using the latest released Debezium version?
Yes, tested with the Debezium source unit tests (MongoEventRouterTest.java).
Do you have the connector logs, ideally from start till finish?
This is the exception thrown if the field doesn't exist:
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Field.schema()" because the return value of "org.apache.kafka.connect.data.Schema.field(String)" is null
at io.debezium.transforms.outbox.EventRouterDelegate.lambda$getSchemaBuilder$2(EventRouterDelegate.java:418)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at io.debezium.transforms.outbox.EventRouterDelegate.getSchemaBuilder(EventRouterDelegate.java:414)
at io.debezium.transforms.outbox.EventRouterDelegate.getValueSchema(EventRouterDelegate.java:389)
at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:183)
at io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter.apply(MongoEventRouter.java:62)
at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)
How to reproduce the issue using our tutorial deployment?
Add this test to MongoEventRouterTest.java:
@Test
public void canSetFieldIntoTheEnvelopeWithAliasWithMissingField() {
    final Map<String, String> config = new HashMap<>();
    config.put(MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_PLACEMENT.name(), "nonExistingField:envelope:aggregateType");
    config.put(EventRouterConfigDefinition.FIELDS_ADDITIONAL_ERROR_ON_MISSING.name(), "false");
    router.configure(config);

    final SourceRecord eventRecord = createEventRecord();
    // With error.on.missing=false, routing should succeed even though the source field is absent.
    final SourceRecord eventRouted = router.apply(eventRecord);

    try {
        // The alias was never added to the envelope schema, so the lookup should fail.
        ((Struct) eventRouted.value()).get("aggregateType");
        fail("A DataException should be thrown");
    }
    catch (DataException e) {
        // Expected: the missing field was skipped (org.apache.kafka.connect.errors.DataException).
    }
}
Implementation ideas (optional)
It seems that these lines need to be added to MongoEventRouter.java (otherwise the property is ignored):
fieldNameConverter.put(
        MongoEventRouterConfigDefinition.FIELDS_ADDITIONAL_ERROR_ON_MISSING.name(),
        EventRouterConfigDefinition.FIELDS_ADDITIONAL_ERROR_ON_MISSING.name());
And add the following to MongoEventRouterConfigDefinition.java:
public static final Field FIELDS_ADDITIONAL_ERROR_ON_MISSING = Field.create("table.fields.additional.error.on.missing")
        .withDisplayName("Should the transform error if an additional field is missing in the change data")
        .withType(ConfigDef.Type.BOOLEAN)
        .withDefault(true)
        .withWidth(ConfigDef.Width.MEDIUM)
        .withImportance(ConfigDef.Importance.MEDIUM)
        .withDescription("When transforming the 'table.fields.additional.placement' fields, should the transform throw" +
                " an exception if one of the fields is missing in the change data");
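To illustrate why a missing converter entry could cause the option to be ignored, here is a minimal sketch using plain maps. It is not Debezium's actual code: the class, the `convert` and `errorOnMissing` helpers, and both option names are hypothetical, and it assumes that keys without a converter entry pass through unchanged while the generic option defaults to true.

```java
import java.util.HashMap;
import java.util.Map;

public class OptionNameConverterSketch {

    // Hypothetical option names, chosen for illustration only.
    static final String MONGO_KEY = "collection.fields.additional.error.on.missing";
    static final String GENERIC_KEY = "table.fields.additional.error.on.missing";

    /** Renames every key that has a converter entry; other keys pass through unchanged. */
    static Map<String, String> convert(Map<String, String> config, Map<String, String> converter) {
        Map<String, String> converted = new HashMap<>();
        config.forEach((key, value) -> converted.put(converter.getOrDefault(key, key), value));
        return converted;
    }

    /** Reads the generic option the way a delegate might, defaulting to true when it is absent. */
    static boolean errorOnMissing(Map<String, String> converted) {
        return Boolean.parseBoolean(converted.getOrDefault(GENERIC_KEY, "true"));
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(MONGO_KEY, "false");

        // Without a converter entry the generic key is never set, so the default wins.
        System.out.println(errorOnMissing(convert(config, Map.of()))); // prints true

        // With the entry in place, the user's "false" reaches the generic option.
        System.out.println(errorOnMissing(convert(config, Map.of(MONGO_KEY, GENERIC_KEY)))); // prints false
    }
}
```

Under these assumptions, the user's "false" only takes effect once the converter knows how to translate the MongoDB-specific name.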
Use the additionalFieldsErrorOnMissing variable in EventRouterDelegate.java and update the getSchemaBuilder method to avoid exceptions on non-existent fields:
private SchemaBuilder getSchemaBuilder(Schema payloadSchema, Schema debeziumEventSchema, String routedTopic) {
    SchemaBuilder schemaBuilder = SchemaBuilder.struct().name(getSchemaName(debeziumEventSchema, routedTopic));

    // Add payload field
    schemaBuilder.field(ENVELOPE_PAYLOAD, payloadSchema);

    // Add additional fields while keeping the schema inherited from Debezium based on the table column type
    additionalFields.forEach((additionalField -> {
        if (additionalField.getPlacement() == AdditionalFieldPlacement.ENVELOPE) {
            if (!additionalFieldsErrorOnMissing && debeziumEventSchema.field(additionalField.getField()) == null) {
                return;
            }
            schemaBuilder.field(
                    additionalField.getAlias(),
                    debeziumEventSchema.field(additionalField.getField()).schema());
        }
    }));

    return schemaBuilder;
}
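The guard can also be exercised in isolation. The sketch below is a simplified stand-in, not the Debezium implementation: a plain `Map` replaces the Kafka Connect schema, and the `AdditionalField` record, `buildEnvelope` method, and exception message are hypothetical. It looks the field up once, skips it when errors on missing fields are disabled, and otherwise fails with a descriptive message rather than a NullPointerException (an assumption about the desired behavior, not something the proposal above does). It needs Java 16 or later for records.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MissingFieldGuardSketch {

    /** Hypothetical stand-in for an additional-field mapping: source field name plus alias. */
    record AdditionalField(String field, String alias) {}

    /**
     * Builds alias -> type entries for the envelope.
     * A plain Map stands in for the source schema; a null lookup means the field is absent.
     */
    static Map<String, String> buildEnvelope(Map<String, String> sourceSchema,
                                             List<AdditionalField> additionalFields,
                                             boolean errorOnMissing) {
        Map<String, String> envelope = new LinkedHashMap<>();
        for (AdditionalField additional : additionalFields) {
            String type = sourceSchema.get(additional.field()); // single lookup
            if (type == null) {
                if (errorOnMissing) {
                    throw new IllegalArgumentException("Missing additional field: " + additional.field());
                }
                continue; // missing fields are tolerated, so skip this one
            }
            envelope.put(additional.alias(), type);
        }
        return envelope;
    }

    public static void main(String[] args) {
        Map<String, String> schema = Map.of("eventType", "string");
        List<AdditionalField> fields = List.of(
                new AdditionalField("eventType", "type"),
                new AdditionalField("nonExistingField", "aggregateType"));

        System.out.println(buildEnvelope(schema, fields, false)); // prints {type=string}
    }
}
```

Calling `buildEnvelope(schema, fields, true)` with the same inputs throws an `IllegalArgumentException` naming `nonExistingField`.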