Bug
Resolution: Done
Major
1.9.4.Final
I am using the latest version of Debezium to send JSON stored in a PostgreSQL table to a Pub/Sub queue.
My Debezium configuration is as follows:
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=name-project
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=host_database
debezium.source.database.port=5432
debezium.source.database.user=user
debezium.source.database.password=pass
debezium.source.database.dbname=name_database
debezium.source.database.server.name=postgresql1x
debezium.source.table.include.list=public.outbox_event
debezium.source.plugin.name=pgoutput
debezium.source.database.sslmode=require
debezium.source.plugin.name=wal2json
debezium.source.slot.name=dsproductosoutboxevent
debezium.transforms=outbox
debezium.transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
debezium.transforms.outbox.route.topic.replacement=postgresql1x.public.outbox
debezium.source.key.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.key.converter.schemas.enable=false
debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
debezium.source.value.converter.schemas.enable=false
debezium.transforms.outbox.table.expand.json.payload=true
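Note that debezium.source.plugin.name is set twice in the configuration above (pgoutput, then wal2json). Assuming the file is loaded like a standard Java properties file, the later value would normally override the earlier one. The following is a minimal Python sketch for spotting such repeated keys; duplicate_keys is a hypothetical helper, and it assumes one key=value pair per line.

```python
from collections import defaultdict


def duplicate_keys(properties_text):
    """Return {key: [values...]} for every key assigned more than once."""
    seen = defaultdict(list)
    for line in properties_text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and anything that is not key=value.
        if not line or line.startswith(("#", "!")) or "=" not in line:
            continue
        key, value = line.split("=", 1)
        seen[key.strip()].append(value.strip())
    return {key: values for key, values in seen.items() if len(values) > 1}


# Three lines taken from the configuration in this report.
sample = """\
debezium.source.plugin.name=pgoutput
debezium.source.database.sslmode=require
debezium.source.plugin.name=wal2json
"""

print(duplicate_keys(sample))
```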
My PostgreSQL table has the following columns:
id uuid not null,
aggregatetype varchar(255) not null,
aggregateid varchar(255) not null,
type varchar(255) not null,
payload jsonb,
created_at timestamp default timezone('utc'::text, now())
The expected behavior is that when a JSON document is inserted into the payload column, Debezium picks it up and sends it to Pub/Sub.
If I insert a JSON document like this:
{
  "id":"1028cbab-c55b-4dab-a367-7d9471466a59",
  "attributes":[
    {
      "id":"10911586",
      "adr":null,
      "eans":[
        {
          "code":"2007000013656",
          "createdAt":"2022-06-08T11:33:16.465Z"
        }
      ],
      "purchaseBusinessStatus":"BLOCOM",
      "isCanBeReturnToSupplier":false,
      "purchaseWarehouseStatus":"NONAUT"
    }
  ]
}
it works perfectly. But if an array contains two objects, as in the following JSON:
{
  "id":"1028cbab-c55b-4dab-a367-7d9471466a59",
  "attributes":[
    {
      "id":"10911586",
      "adr":null,
      "eans":[
        {
          "code":"2007000013656",
          "createdAt":"2022-06-08T11:33:16.465Z"
        },
        {
          "code":"2000630108556",
          "createdAt":"2022-06-23T07:57:54.717Z"
        }
      ],
      "purchaseBusinessStatus":"BLOCOM",
      "isCanBeReturnToSupplier":false,
      "purchaseWarehouseStatus":"NONAUT"
    }
  ]
}
Debezium fails with the following error: Cannot create field because of field name duplication code.
This is the complete error trace:
{
"timestamp":"2022-06-30T17:09:33.795Z",
"sequence":175,
"loggerClassName":"org.slf4j.impl.Slf4jLogger",
"loggerName":"io.debezium.transforms.outbox.EventRouterDelegate",
"level":"WARN",
"message":"JSON expansion failed",
"threadName":"pool-7-thread-1",
"threadId":17,
"mdc":{
},
"ndc":"",
"hostName":"3566e810333b",
"processName":"io.debezium.server.Main",
"processId":1,
"exception":{
"refId":1,
"exceptionType":"org.apache.kafka.connect.errors.SchemaBuilderException",
"message":"Cannot create field because of field name duplication code",
"frames":[
{
"class":"org.apache.kafka.connect.data.SchemaBuilder",
"method":"field",
"line":330
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"addFieldSchema",
"line":53
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"buildDocumentUnionSchema",
"line":146
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"findArrayMemberSchema",
"line":117
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"jsonValueToSchema",
"line":78
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"addFieldSchema",
"line":51
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"jsonNodeToSchemaBuilder",
"line":42
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"buildDocumentUnionSchema",
"line":139
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"findArrayMemberSchema",
"line":117
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"jsonValueToSchema",
"line":78
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"addFieldSchema",
"line":51
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"jsonNodeToSchemaBuilder",
"line":42
},
{
"class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
"method":"jsonNodeToSchema",
"line":33
},
{
"class":"io.debezium.transforms.outbox.EventRouterDelegate",
"method":"apply",
"line":162
},
{
"class":"io.debezium.transforms.outbox.EventRouter",
"method":"apply",
"line":25
},
{
"class":"io.debezium.embedded.Transformations",
"method":"transform",
"line":74
},
{
"class":"java.util.stream.ReferencePipeline$3$1",
"method":"accept",
"line":195
},
{
"class":"java.util.ArrayList$ArrayListSpliterator",
"method":"forEachRemaining",
"line":1655
},
{
"class":"java.util.stream.AbstractPipeline",
"method":"copyInto",
"line":484
},
{
"class":"java.util.stream.AbstractPipeline",
"method":"wrapAndCopyInto",
"line":474
},
{
"class":"java.util.stream.ReduceOps$ReduceOp",
"method":"evaluateSequential",
"line":913
},
{
"class":"java.util.stream.AbstractPipeline",
"method":"evaluate",
"line":234
},
{
"class":"java.util.stream.ReferencePipeline",
"method":"collect",
"line":578
},
{
"class":"io.debezium.embedded.EmbeddedEngine",
"method":"run",
"line":814
},
{
"class":"io.debezium.embedded.ConvertingEngineBuilder$2",
"method":"run",
"line":188
},
{
"class":"io.debezium.server.DebeziumServer",
"method":"lambda$start$1",
"line":147
},
{
"class":"java.util.concurrent.ThreadPoolExecutor",
"method":"runWorker",
"line":1128
},
{
"class":"java.util.concurrent.ThreadPoolExecutor$Worker",
"method":"run",
"line":628
},
{
"class":"java.lang.Thread",
"method":"run",
"line":829
}
]
}
}
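The frames in the trace suggest where the failure comes from: findArrayMemberSchema calls buildDocumentUnionSchema, which calls addFieldSchema, and SchemaBuilder.field rejects a field name it has already seen. A plausible reading is that the fields of every object in an array are added to one shared struct schema, so the second eans element repeats code. The Python sketch below is only a simplified model of that reading, not Debezium's actual code; StructBuilder, naive_union_schema, and tolerant_union_schema are hypothetical names.

```python
class SchemaBuilderException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.SchemaBuilderException."""


class StructBuilder:
    """Tiny model of a struct schema builder that rejects duplicate field names."""

    def __init__(self):
        self.fields = {}

    def field(self, name, schema):
        if name in self.fields:
            raise SchemaBuilderException(
                "Cannot create field because of field name duplication " + name
            )
        self.fields[name] = schema
        return self


def naive_union_schema(elements):
    """Add the fields of every array element to one struct, with no duplicate check."""
    builder = StructBuilder()
    for element in elements:
        for name, value in element.items():
            builder.field(name, type(value).__name__)
    return builder.fields


def tolerant_union_schema(elements):
    """Same merge, but skip names that an earlier element already contributed."""
    builder = StructBuilder()
    for element in elements:
        for name, value in element.items():
            if name not in builder.fields:
                builder.field(name, type(value).__name__)
    return builder.fields


# The "eans" arrays from the two payloads in this report.
one_ean = [{"code": "2007000013656", "createdAt": "2022-06-08T11:33:16.465Z"}]
two_eans = one_ean + [
    {"code": "2000630108556", "createdAt": "2022-06-23T07:57:54.717Z"}
]
```

In this model, naive_union_schema(one_ean) succeeds, which matches the first payload, while naive_union_schema(two_eans) raises the same message as in the trace. Skipping names that are already present is one possible way to avoid the error, not necessarily the fix applied upstream.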
impacts account: DBZ-5654 Outbox pattern nested payload leads to connector crash (Closed)