Type: Bug
Resolution: Done
Priority: Major
Affects Version: 2.4.0.Final
Bug report
Debezium version: 2.4.0.Final
What is the connector configuration?
The CloudEvent is produced by the following source connector (which runs without errors):
{ "name": "book-db-source-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "database.hostname": "book-db", "database.port": "5432", "database.user": "postgres", "database.password": "password", "database.dbname": "book", "database.server.name": "book", "topic.prefix": "book", "topic.creation.default.replication.factor": 1, "topic.creation.default.partitions": 6, "table.include.list": "public.outbox", "heartbeat.interval.ms": "5000", "slot.name": "book_debezium", "publication.name": "book_publication", "tombstones.on.delete": false, "transforms": "addMetadataHeaders,outbox", "transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value", "transforms.addMetadataHeaders.fields": "source,op,transaction", "transforms.addMetadataHeaders.headers": "source,op,transaction", "transforms.addMetadataHeaders.operation": "copy", "transforms.addMetadataHeaders.predicate": "isHeartbeat", "transforms.addMetadataHeaders.negate": true, "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.table.field.event.key": "event_key", "transforms.outbox.table.expand.json.payload": true, "transforms.outbox.table.fields.additional.placement": "event_type:header:type", "transforms.outbox.route.by.field": "aggregate_type", "transforms.outbox.route.topic.replacement": "outbox.event.library", "predicates": "isHeartbeat", "predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches", "predicates.isHeartbeat.pattern": "__debezium-heartbeat.*", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": false, "value.converter": "io.debezium.converters.CloudEventsConverter", "value.converter.serializer.type": "avro", "value.converter.data.serializer.type": "avro", "value.converter.avro.apicurio.registry.url": "http://schema-registry:8080", "value.converter.avro.apicurio.registry.auto-register": true, "value.converter.avro.apicurio.registry.find-latest": true, "value.converter.metadata.source": "header", "value.converter.extension.attributes.enable": false, "header.converter": "org.apache.kafka.connect.json.JsonConverter", "header.converter.schemas.enable": true } }
(the schema registry is Apicurio)
and is consumed by:
{ "name": "user-db-sink-connector", "config": { "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "outbox.event.library", "connection.url": "jdbc:postgresql://user-db:5432/user", "connection.username": "postgres", "connection.password": "password", "insert.mode": "upsert", "primary.key.mode": "record_value", "primary.key.fields": "id", "table.name.format": "inbox", "max.retries": 1, "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": false, "value.converter": "io.debezium.converters.CloudEventsConverter", "value.converter.serializer.type": "avro", "value.converter.data.serializer.type": "avro", "value.converter.avro.apicurio.registry.url": "http://schema-registry:8080", "header.converter": "org.apache.kafka.connect.json.JsonConverter", "header.converter.schemas.enable": true } }
What is the captured database version and mode of deployment?
Postgres 16
Do you have the connector logs, ideally from start till finish?
2023-10-23 15:59:50,916 ERROR || Failed to process record: Failed to process a sink record [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
    at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:87)
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:94)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: after is not a valid field name
    at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
    at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
    at org.apache.kafka.connect.data.Struct.getStruct(Struct.java:191)
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.applyRecordValueAsPrimaryKey(SinkRecordDescriptor.java:382)
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordKeyData(SinkRecordDescriptor.java:315)
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:285)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:70)
    ... 13 more
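Reading the stack trace: with primary.key.mode=record_value the sink looks up an "after" struct inside the value, which a CloudEvents value simply does not have (the non-flattened code path is presumably taken because the Avro CloudEvents value schema name also contains "Envelope"; see the isFlattened check quoted further below). The following minimal sketch reproduces the failing lookup with the plain Kafka Connect data API; the schema name and fields here are illustrative assumptions:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class AfterLookupDemo {
    public static void main(String[] args) {
        // A value schema shaped like a CloudEvent; the name containing "Envelope"
        // makes the sink treat the record as a non-flattened Debezium envelope.
        Schema cloudEventSchema = SchemaBuilder.struct()
                .name("book.CloudEvents.Envelope")
                .field("id", Schema.STRING_SCHEMA)
                .field("type", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(cloudEventSchema)
                .put("id", "dbea9071-fadb-4ef3-8959-d9753eb6674d")
                .put("type", "BookCreated");
        // The sink then navigates into the presumed envelope:
        value.getStruct("after"); // DataException: after is not a valid field name
    }
}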
I think the message structure shown in Kafka UI won't be useful because of the Avro format, but the following is the structure when both serializer types in the source connector are changed to "json":
{ "id": "dbea9071-fadb-4ef3-8959-d9753eb6674d", "source": "/debezium/postgresql/book", "specversion": "1.0", "type": "BookCreated", "time": "2023-10-23T16:09:36.018Z", "datacontenttype": "application/json", "data": { "schema": { "type": "struct", "fields": [ { "type": "int32", "optional": true, "field": "id" }, { "type": "string", "optional": true, "field": "name" }, { "type": "int32", "optional": true, "field": "publicationYear" } ], "optional": true, "name": "payload" }, "payload": { "id": 4, "name": "name", "publicationYear": 1824 } } }
JSON use case
When using JSON serialization, the sink connector throws an NPE due to the missing schema:
2023-10-23 16:10:36,582 ERROR || WorkerSinkTask{id=user-db-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:83)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
    at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:87)
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:94)
    ... 12 more
Caused by: java.lang.NullPointerException
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.isFlattened(SinkRecordDescriptor.java:292)
    at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:284)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:70)
    ... 13 more
The NPE is thrown by:
private boolean isFlattened(SinkRecord record) {
    return record.valueSchema().name() == null || !record.valueSchema().name().contains("Envelope");
}
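With schemaless JSON, record.valueSchema() itself returns null, so the first name() call dereferences a null schema. A null-guarded variant is sketched below purely for illustration (it is not the actual upstream fix):

private boolean isFlattened(SinkRecord record) {
    Schema valueSchema = record.valueSchema();
    // Treat schemaless records and unnamed schemas as flattened instead of throwing.
    return valueSchema == null
            || valueSchema.name() == null
            || !valueSchema.name().contains("Envelope");
}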
The absence of the schema is due to the following comment in the converter code:
// JSON Cloud Events converter always disables schema.
// The conversion back thus must be schemaless.
// If data are in schema/payload envelope they are extracted
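The schemaless behaviour can be reproduced with the plain Kafka Connect JsonConverter (used here only to illustrate why the deserialized record carries a null value schema; the CloudEvents converter itself is not involved in this sketch):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class SchemalessJsonDemo {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // With schemas disabled the converter reads plain JSON without an envelope...
        converter.configure(Map.of("schemas.enable", "false"), false);
        byte[] json = "{\"id\":4,\"name\":\"name\"}".getBytes(StandardCharsets.UTF_8);
        SchemaAndValue result = converter.toConnectData("outbox.event.library", json);
        // ...so the resulting SchemaAndValue has no schema at all:
        System.out.println(result.schema()); // null -> record.valueSchema() is null in the sink
    }
}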
links to:
RHEA-2024:129636 Red Hat build of Debezium 2.5.4 release