Issue Type: Bug
Resolution: Done
Priority: Major
Affects Versions: 1.9.6.Final, 2.0.0.Beta2
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
debezium-connector-postgres/2.0.0.Beta2
debezium-connector-postgres/1.9.6.Final
What is the connector configuration?
{
  "name": "postgres-outbox",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.dbname": "postgres",
    "database.history.kafka.topic": "schema-changes.postgres_outbox",
    "database.hostname": "postgres",
    "database.password": "postgres",
    "database.port": "5432",
    "database.server.name": "postgres_example",
    "database.serverTimezone": "Europe/Paris",
    "database.user": "postgres",
    "decimal.handling.mode": "double",
    "errors.log.enable": "true",
    "errors.log.include.message": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "plugin.name": "pgoutput",
    "slot.name": "postgres",
    "snapshot.locking.mode": "none",
    "snapshot.mode": "initial",
    "snapshot.select.statement.overrides": "public.outbox",
    "snapshot.select.statement.overrides.public.outbox": "select id, aggregatetype, aggregateid, payload, type, created_at from(select *, rank() over(partition by aggregatetype, aggregateid order by created_at desc) as rnk from public.outbox)a where rnk=1;",
    "table.include.list": "public.outbox",
    "tasks.max": "1",
    "topic.creation.default.partitions": "1",
    "topic.creation.default.replication.factor": "-1",
    "topic.creation.enabled": "true",
    "transforms": "outbox",
    "transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}",
    "transforms.outbox.table.expand.json.payload": "true",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.fields.additional.placement": "type:header:type,type:envelope:type",
    "transforms.outbox.table.field.event.key": "aggregateid",
    "value.converter.delegate.converter.type": "io.debezium.converters.ByteBufferConverter",
    "value.converter.delegate.converter.type.schema.registry.url": "http://schema-registry:8081",
    "value.converter.delegate.converter.type.schemas.enable": "false"
  }
}
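For context, the snapshot.select.statement.overrides.public.outbox query above deduplicates the outbox during the initial snapshot, keeping only the most recent event per (aggregatetype, aggregateid) pair. The same statement, reformatted here purely for readability:
SELECT id, aggregatetype, aggregateid, payload, type, created_at
FROM (
    SELECT *,
           RANK() OVER (PARTITION BY aggregatetype, aggregateid
                        ORDER BY created_at DESC) AS rnk
    FROM public.outbox
) a
WHERE rnk = 1;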
What is the captured database version and mode of deployment?
root@postgres:/# postgres -V
postgres (PostgreSQL) 14.5 (Debian 14.5-1.pgdg110+1)
Deployed through docker-compose, using the postgres:latest base image: https://hub.docker.com/layers/library/postgres/latest/images/sha256-ea382336c7382562e0e2fe8d21c42ad9a2852631e95ee6da14962955ba01cfba?context=explore
What behaviour do you expect?
If I insert nested JSONB content into the payload column (i.e. the payload contains at least one value that is itself an object, or a list of objects, e.g. {"item": {"price": 1.5}}), the connector should not fail, and it should register a fully parsed Avro schema for the payload.
What behaviour do you see?
If I insert such nested JSONB content into the payload column, the connector fails with the serialization error shown in the logs below; payloads with only scalar values do not appear to trigger the failure.
Do you see the same behaviour using the latest released Debezium version?
Yes, as noted above; the issue reproduces on both 1.9.6.Final and 2.0.0.Beta2.
Do you have the connector logs, ideally from start till finish?
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic outbox.event.user :
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault
    at org.apache.avro.Schema$Names.put(Schema.java:1542)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:805)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:967)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1234)
    at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:1129)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1234)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:995)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:979)
    at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:1234)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:995)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:979)
    at org.apache.avro.Schema.toString(Schema.java:419)
    at org.apache.avro.Schema.toString(Schema.java:410)
    at io.confluent.kafka.schemaregistry.avro.AvroSchema.canonicalString(AvroSchema.java:151)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:213)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:275)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:251)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:103)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
How to reproduce the issue?
Create the following outbox table in the public PostgreSQL schema:
CREATE TABLE IF NOT EXISTS outbox (
    id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
    aggregatetype VARCHAR(255) NOT NULL,
    aggregateid VARCHAR(255) NOT NULL,
    payload JSONB,
    type VARCHAR(255) NOT NULL,
    created_at TIMESTAMP WITHOUT TIME ZONE DEFAULT (NOW() AT TIME ZONE 'utc')
);
Insert nested data into the payload field:
INSERT INTO outbox (aggregatetype, aggregateid, type, payload)
SELECT 'user' AS aggregatetype,
       a AS aggregateid,
       'pending' AS type,
       JSONB_BUILD_OBJECT('item', JSONB_BUILD_OBJECT('price', b)) AS payload
FROM (
    SELECT (SELECT (random() + f/1e39)::int) AS a,
           (SELECT random() + f/1e39) AS b
    FROM generate_series(1, 3) f(f)
) foo;
Deploy the connector with the config above
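For contrast, a minimal sketch of an insert with a flat payload (scalar values only); per the behaviour described above, this kind of payload is not expected to trigger the failure (the row values here are hypothetical):
-- Flat payload: no object-valued or list-of-object values,
-- so the EventRouter's JSON payload expansion yields no nested records.
INSERT INTO outbox (aggregatetype, aggregateid, type, payload)
VALUES ('user', '1', 'pending', JSONB_BUILD_OBJECT('price', 1.5));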
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
Leverage the outbox pattern with a nested payload, Avro serialization, and converter delegation, so that a detailed Avro schema is registered (i.e. not a single opaque io.debezium.data.Json field).