Issue Type: Bug
Resolution: Done
Priority: Major
Affects Version: 2.4.0.Final
Bug report
Debezium version: 2.4.0.Final
What is the connector configuration?
The CloudEvent is produced by the following source connector (which itself runs without errors):
{
  "name": "book-db-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "database.hostname": "book-db",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "book",
    "database.server.name": "book",
    "topic.prefix": "book",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 6,
    "table.include.list": "public.outbox",
    "heartbeat.interval.ms": "5000",
    "slot.name": "book_debezium",
    "publication.name": "book_publication",
    "tombstones.on.delete": false,
    "transforms": "addMetadataHeaders,outbox",
    "transforms.addMetadataHeaders.type": "org.apache.kafka.connect.transforms.HeaderFrom$Value",
    "transforms.addMetadataHeaders.fields": "source,op,transaction",
    "transforms.addMetadataHeaders.headers": "source,op,transaction",
    "transforms.addMetadataHeaders.operation": "copy",
    "transforms.addMetadataHeaders.predicate": "isHeartbeat",
    "transforms.addMetadataHeaders.negate": true,
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.key": "event_key",
    "transforms.outbox.table.expand.json.payload": true,
    "transforms.outbox.table.fields.additional.placement": "event_type:header:type",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "outbox.event.library",
    "predicates": "isHeartbeat",
    "predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isHeartbeat.pattern": "__debezium-heartbeat.*",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "io.debezium.converters.CloudEventsConverter",
    "value.converter.serializer.type": "avro",
    "value.converter.data.serializer.type": "avro",
    "value.converter.avro.apicurio.registry.url": "http://schema-registry:8080",
    "value.converter.avro.apicurio.registry.auto-register": true,
    "value.converter.avro.apicurio.registry.find-latest": true,
    "value.converter.metadata.source": "header",
    "value.converter.extension.attributes.enable": false,
    "header.converter": "org.apache.kafka.connect.json.JsonConverter",
    "header.converter.schemas.enable": true
  }
}
(The schema registry is Apicurio.)
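For reference, the EventRouter settings above only name the event_key, aggregate_type and event_type columns of public.outbox; the id and payload column names and the sample values below are assumptions, shown purely to illustrate how a BookCreated event ends up in the outbox:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.UUID;

// Hypothetical write to the captured outbox table; only the event_key,
// aggregate_type and event_type column names come from the connector
// configuration, the rest is assumed for illustration.
public class OutboxWriteSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://book-db:5432/book", "postgres", "password");
             PreparedStatement stmt = conn.prepareStatement(
                     "INSERT INTO public.outbox (id, aggregate_type, event_key, event_type, payload) "
                             + "VALUES (?, ?, ?, ?, ?::jsonb)")) {
            stmt.setObject(1, UUID.randomUUID());  // outbox row id
            stmt.setString(2, "library");          // aggregate_type (route.by.field)
            stmt.setString(3, "4");                // event_key -> used as the Kafka record key
            stmt.setString(4, "BookCreated");      // event_type -> placed in the "type" header
            stmt.setString(5, "{\"id\":4,\"name\":\"name\",\"publicationYear\":1824}");
            stmt.executeUpdate();
        }
    }
}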
The event is consumed by the following sink connector:
{
  "name": "user-db-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "outbox.event.library",
    "connection.url": "jdbc:postgresql://user-db:5432/user",
    "connection.username": "postgres",
    "connection.password": "password",
    "insert.mode": "upsert",
    "primary.key.mode": "record_value",
    "primary.key.fields": "id",
    "table.name.format": "inbox",
    "max.retries": 1,
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "io.debezium.converters.CloudEventsConverter",
    "value.converter.serializer.type": "avro",
    "value.converter.data.serializer.type": "avro",
    "value.converter.avro.apicurio.registry.url": "http://schema-registry:8080",
    "header.converter": "org.apache.kafka.connect.json.JsonConverter",
    "header.converter.schemas.enable": true
  }
}
What is the captured database version and mode of deployment?
Postgres 16
Do you have the connector logs, ideally from start till finish?
kafka-connect | 2023-10-23 15:59:50,916 ERROR || Failed to process record: Failed to process a sink record [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
kafka-connect | org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
kafka-connect | at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:87)
kafka-connect | at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:94)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
kafka-connect | at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
kafka-connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
kafka-connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
kafka-connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
kafka-connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
kafka-connect | at java.base/java.lang.Thread.run(Thread.java:829)
kafka-connect | Caused by: org.apache.kafka.connect.errors.DataException: after is not a valid field name
kafka-connect | at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
kafka-connect | at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
kafka-connect | at org.apache.kafka.connect.data.Struct.getStruct(Struct.java:191)
kafka-connect | at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.applyRecordValueAsPrimaryKey(SinkRecordDescriptor.java:382)
kafka-connect | at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordKeyData(SinkRecordDescriptor.java:315)
kafka-connect | at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:285)
kafka-connect | at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:70)
kafka-connect | ... 13 more
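Judging by the stack trace, the sink's primary-key handling (primary.key.mode=record_value) ends up calling getStruct("after") on the record value, but the value here is the CloudEvents envelope rather than a Debezium change-event envelope. A minimal sketch of that failing lookup, using only the standard Kafka Connect Struct/Schema API (not the actual Debezium sink code):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class AfterFieldLookupSketch {
    public static void main(String[] args) {
        // A value shaped like the CloudEvents envelope produced above, instead of
        // a Debezium change-event envelope with "before"/"after" structs.
        Schema dataSchema = SchemaBuilder.struct().name("payload").optional()
                .field("id", Schema.OPTIONAL_INT32_SCHEMA)
                .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                .field("publicationYear", Schema.OPTIONAL_INT32_SCHEMA)
                .build();
        Schema cloudEventSchema = SchemaBuilder.struct()
                .field("id", Schema.STRING_SCHEMA)
                .field("source", Schema.STRING_SCHEMA)
                .field("type", Schema.STRING_SCHEMA)
                .field("data", dataSchema)
                .build();
        Struct value = new Struct(cloudEventSchema)
                .put("id", "dbea9071-fadb-4ef3-8959-d9753eb6674d")
                .put("source", "/debezium/postgresql/book")
                .put("type", "BookCreated")
                .put("data", new Struct(dataSchema)
                        .put("id", 4).put("name", "name").put("publicationYear", 1824));

        // The equivalent of what SinkRecordDescriptor$Builder.applyRecordValueAsPrimaryKey attempts:
        value.getStruct("after"); // throws DataException: after is not a valid field name
    }
}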
The message structure shown in Kafka UI is probably not useful because of the Avro format, but the following is the structure when I change both serializer types in the source connector to "json":
{
  "id": "dbea9071-fadb-4ef3-8959-d9753eb6674d",
  "source": "/debezium/postgresql/book",
  "specversion": "1.0",
  "type": "BookCreated",
  "time": "2023-10-23T16:09:36.018Z",
  "datacontenttype": "application/json",
  "data": {
    "schema": {
      "type": "struct",
      "fields": [
        {
          "type": "int32",
          "optional": true,
          "field": "id"
        },
        {
          "type": "string",
          "optional": true,
          "field": "name"
        },
        {
          "type": "int32",
          "optional": true,
          "field": "publicationYear"
        }
      ],
      "optional": true,
      "name": "payload"
    },
    "payload": {
      "id": 4,
      "name": "name",
      "publicationYear": 1824
    }
  }
}
JSON use case
When using JSON serialization, the sink connector throws an NPE because the schema is missing:
kafka-connect | 2023-10-23 16:10:36,582 ERROR || WorkerSinkTask{id=user-db-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure [org.apache.kafka.connect.runtime.WorkerSinkTask]
kafka-connect | org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
kafka-connect | at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:83)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
kafka-connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
kafka-connect | at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
kafka-connect | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
kafka-connect | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
kafka-connect | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
kafka-connect | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
kafka-connect | at java.base/java.lang.Thread.run(Thread.java:829)
kafka-connect | Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
kafka-connect | at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:87)
kafka-connect | at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:94)
kafka-connect | ... 12 more
kafka-connect | Caused by: java.lang.NullPointerException
kafka-connect | at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.isFlattened(SinkRecordDescriptor.java:292)
kafka-connect | at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:284)
kafka-connect | at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:70)
kafka-connect | ... 13 more
The NPE is thrown by:
private boolean isFlattened(SinkRecord record) {
    return record.valueSchema().name() == null || !record.valueSchema().name().contains("Envelope");
}
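The NPE happens because record.valueSchema() itself is null for the schemaless JSON value, so name() is invoked on null. Purely for illustration (this is not the actual fix), a null-safe variant would have to check the schema reference first:

// Hypothetical null-safe variant, shown only to make the NPE location explicit;
// with the JSON CloudEvents converter the value schema is null.
// (Schema is org.apache.kafka.connect.data.Schema.)
private boolean isFlattened(SinkRecord record) {
    Schema valueSchema = record.valueSchema();
    return valueSchema == null
            || valueSchema.name() == null
            || !valueSchema.name().contains("Envelope");
}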
The absence of the schema is explained by the following comment:
// JSON Cloud Events converter always disables schema.
// The conversion back thus must be schemaless.
// If data are in schema/payload envelope they are extracted.
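In other words, in the JSON case the record reaching the JDBC sink is schemaless. A simplified illustration of such a record, with values taken from the message above (assumed shape, not the converter output verbatim):

import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;

public class SchemalessSinkRecordSketch {
    public static void main(String[] args) {
        // Simplified illustration of the record reaching the JDBC sink in the JSON case:
        // the value is a Map and the value schema is null, which is exactly what
        // isFlattened() dereferences.
        Map<String, Object> value = Map.of(
                "id", "dbea9071-fadb-4ef3-8959-d9753eb6674d",
                "source", "/debezium/postgresql/book",
                "specversion", "1.0",
                "type", "BookCreated",
                "datacontenttype", "application/json",
                "data", Map.of("id", 4, "name", "name", "publicationYear", 1824));
        SinkRecord record = new SinkRecord("outbox.event.library", 0,
                null, null,   // key schema / key
                null, value,  // value schema is null
                0L);
        // record.valueSchema() is null here, so valueSchema().name() throws the NPE shown above.
    }
}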
Links to: RHEA-2024:129636 (Red Hat build of Debezium 2.5.4 release)