-
Bug
-
Resolution: Done
-
Major
-
None
-
2.5.1.Final
-
None
-
False
-
None
-
False
I've got a PostgreSQL 2.5.1.Final Connector (I assume this can happen across all Connectors though) throwing an NPE when:
- The ExtractNewRecordState SMT is configured, and db or source.db are added as headers (e.g. transforms.*.add.headers)
- Heartbeats are enabled
{{2024-02-21 15:28:09 [2024-02-21 15:28:09,969] ERROR Error encountered in task STR2606_Sr_1-0. Executing stage 'TRANSFORMATION' with class 'io.debezium.transforms.ExtractNewRecordState', where source record is = SourceRecord{sourcePartition= {server=STR2606_SR_1} , sourceOffset={transaction_id=null, lsn=26969176, txId=751, ts_usec=1708527334156439}} ConnectRecord{topic='__debezium-heartbeat.STR2606_SR_1', kafkaPartition=0, key=Struct {serverName=STR2606_SR_1} , keySchema=Schema {io.debezium.connector.common.ServerNameKey:STRUCT} , value=Struct {ts_ms=1708529289699} , valueSchema=Schema {io.debezium.connector.common.Heartbeat:STRUCT} , timestamp=null, headers=ConnectHeaders(headers=)}. (org.apache.kafka.connect.runtime.errors.LogReporter)java.lang.NullPointerExceptionat io.debezium.transforms.AbstractExtractNewRecordState$FieldReference.getField(AbstractExtractNewRecordState.java:273)at io.debezium.transforms.AbstractExtractNewRecordState$FieldReference.getSchema(AbstractExtractNewRecordState.java:266)at io.debezium.transforms.AbstractExtractNewRecordState.makeHeaders(AbstractExtractNewRecordState.java:152)at io.debezium.transforms.ExtractNewRecordState.doApply(ExtractNewRecordState.java:107)at io.debezium.transforms.AbstractExtractNewRecordState.apply(AbstractExtractNewRecordState.java:109)at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:57)}}
Docs for transforms.*.add.headers say:
If you specify a field that is not in the change event record, the SMT does not add the field to the header.
That's true for fields like op or source.ts_ms, but I can consistently reproduce the NPE by adding db or source.db to add.headers.
A workaround is to apply a Predicate to the ExtractNewRecordState SMT that ignores Heartbeat topic messages. Example:
... "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.add.headers": "op,db", ... "transforms.unwrap.predicate": "IgnoreHeartbeatStateExtraction", "transforms.unwrap.negate": "true", ... "predicates": "IgnoreHeartbeatStateExtraction", "predicates.IgnoreHeartbeatStateExtraction.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches", "predicates.IgnoreHeartbeatStateExtraction.pattern": ".*-heartbeat.*", ...
- is duplicated by
-
DBZ-8393 ExtractNewRecordState transform: NPE when processing non-envelope records
-
- Closed
-