-
Bug
-
Resolution: Done
-
Major
-
1.9.4.Final
-
None
-
False
-
None
-
False
-
I am using Debezium (latest version) to send a JSON document stored in a PostgreSQL table to a Pub/Sub queue.
My Debezium configuration is as follows:
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=name-project
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=host_database
debezium.source.database.port=5432
debezium.source.database.user=user
debezium.source.database.password=pass
debezium.source.database.dbname=name_database
debezium.source.database.server.name=postgresql1x
debezium.source.table.include.list=public.outbox_event
debezium.source.plugin.name=pgoutput
debezium.source.database.sslmode=require
debezium.source.plugin.name=wal2json
debezium.source.slot.name=dsproductosoutboxevent
debezium.transforms=outbox
debezium.transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
debezium.transforms.outbox.route.topic.replacement=postgresql1x.public.outbox
debezium.source.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=false
debezium.transforms.outbox.table.expand.json.payload=true
My PostgreSQL table has the following columns:
id uuid not null,
aggregatetype varchar(255) not null,
aggregateid varchar(255) not null,
type varchar(255) not null,
payload jsonb,
created_at timestamp default timezone('utc'::text, now())
The expected behavior is that when a JSON document is inserted into the payload column, Debezium picks it up and sends it to Pub/Sub.
If I insert a JSON document of this type:
{
  "id": "1028cbab-c55b-4dab-a367-7d9471466a59",
  "attributes": [
    {
      "id": "10911586",
      "adr": null,
      "eans": [
        {
          "code": "2007000013656",
          "createdAt": "2022-06-08T11:33:16.465Z"
        }
      ],
      "purchaseBusinessStatus": "BLOCOM",
      "isCanBeReturnToSupplier": false,
      "purchaseWarehouseStatus": "NONAUT"
    }
  ]
}
It works perfectly. But if I put two objects into one of the arrays, as in the following JSON:
{
  "id": "1028cbab-c55b-4dab-a367-7d9471466a59",
  "attributes": [
    {
      "id": "10911586",
      "adr": null,
      "eans": [
        {
          "code": "2007000013656",
          "createdAt": "2022-06-08T11:33:16.465Z"
        },
        {
          "code": "2000630108556",
          "createdAt": "2022-06-23T07:57:54.717Z"
        }
      ],
      "purchaseBusinessStatus": "BLOCOM",
      "isCanBeReturnToSupplier": false,
      "purchaseWarehouseStatus": "NONAUT"
    }
  ]
}
Debezium fails with the error "Cannot create field because of field name duplication code" (the duplicated field name being "code").
This is the complete error trace:
{ "timestamp":"2022-06-30T17:09:33.795Z", "sequence":175, "loggerClassName":"org.slf4j.impl.Slf4jLogger", "loggerName":"io.debezium.transforms.outbox.EventRouterDelegate", "level":"WARN", "message":"JSON expansion failed", "threadName":"pool-7-thread-1", "threadId":17, "mdc":{ }, "ndc":"", "hostName":"3566e810333b", "processName":"io.debezium.server.Main", "processId":1, "exception":{ "refId":1, "exceptionType":"org.apache.kafka.connect.errors.SchemaBuilderException", "message":"Cannot create field because of field name duplication code", "frames":[ { "class":"org.apache.kafka.connect.data.SchemaBuilder", "method":"field", "line":330 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"addFieldSchema", "line":53 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"buildDocumentUnionSchema", "line":146 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"findArrayMemberSchema", "line":117 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"jsonValueToSchema", "line":78 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"addFieldSchema", "line":51 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"jsonNodeToSchemaBuilder", "line":42 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"buildDocumentUnionSchema", "line":139 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"findArrayMemberSchema", "line":117 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"jsonValueToSchema", "line":78 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"addFieldSchema", "line":51 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"jsonNodeToSchemaBuilder", "line":42 }, { "class":"io.debezium.transforms.outbox.SchemaBuilderUtil", "method":"jsonNodeToSchema", "line":33 }, { "class":"io.debezium.transforms.outbox.EventRouterDelegate", "method":"apply", "line":162 }, { "class":"io.debezium.transforms.outbox.EventRouter", "method":"apply", "line":25 }, { "class":"io.debezium.embedded.Transformations", "method":"transform", "line":74 }, { "class":"java.util.stream.ReferencePipeline$3$1", "method":"accept", "line":195 }, { "class":"java.util.ArrayList$ArrayListSpliterator", "method":"forEachRemaining", "line":1655 }, { "class":"java.util.stream.AbstractPipeline", "method":"copyInto", "line":484 }, { "class":"java.util.stream.AbstractPipeline", "method":"wrapAndCopyInto", "line":474 }, { "class":"java.util.stream.ReduceOps$ReduceOp", "method":"evaluateSequential", "line":913 }, { "class":"java.util.stream.AbstractPipeline", "method":"evaluate", "line":234 }, { "class":"java.util.stream.ReferencePipeline", "method":"collect", "line":578 }, { "class":"io.debezium.embedded.EmbeddedEngine", "method":"run", "line":814 }, { "class":"io.debezium.embedded.ConvertingEngineBuilder$2", "method":"run", "line":188 }, { "class":"io.debezium.server.DebeziumServer", "method":"lambda$start$1", "line":147 }, { "class":"java.util.concurrent.ThreadPoolExecutor", "method":"runWorker", "line":1128 }, { "class":"java.util.concurrent.ThreadPoolExecutor$Worker", "method":"run", "line":628 }, { "class":"java.lang.Thread", "method":"run", "line":829 } ] } }
- impacts account
-
DBZ-5654 Outbox pattern nested payload leads to connector crash
- Closed