Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5344

Cannot expand JSON payload with nested arrays of objects

XMLWordPrintable

    • False
    • None
    • False
    • Hide
      • Config Debezium as indicate in the description
      • Create table in postgresql as indicates in the description
      • Create a pub/sub topic that matches Debezium config
      • Insert row in postgresql table with this content in payload's field:
        {
           "id":"1028cbab-c55b-4dab-a367-7d9471466a59",
           "attributes":[
             
        Unknown macro: {         "id"}

           ]
        }

      Show
      Config Debezium as indicate in the description Create table in postgresql as indicates in the description Create a pub/sub topic that matches Debezium config Insert row in postgresql table with this content in payload's field: {    "id":"1028cbab-c55b-4dab-a367-7d9471466a59",    "attributes":[       Unknown macro: {         "id"}    ] }

      I am using debezium, latest version, to send a json stored in a postgresql table to a pub/sub queue.

      My Debezium configuration is as follows:

      debezium.sink.type=pubsub
      debezium.sink.pubsub.project.id=name-project
      debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
      debezium.source.offset.storage.file.filename=data/offsets.dat
      debezium.source.offset.flush.interval.ms=0
      debezium.source.database.hostname=host_database
      debezium.source.database.port=5432
      debezium.source.database.user=user
      debezium.source.database.password=pass
      debezium.source.database.dbname=name_database
      debezium.source.database.server.name=postgresql1x
      debezium.source.table.include.list=public.outbox_event
      debezium.source.plugin.name=pgoutput
      debezium.source.database.sslmode=require
      debezium.source.plugin.name=wal2json
      debezium.source.slot.name=dsproductosoutboxevent
      debezium.transforms=outbox
      debezium.transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
      debezium.transforms.outbox.route.topic.replacement=postgresql1x.public.outbox
      debezium.source.key.converter=org.apache.kafka.connect.json.JsonConverter
      debezium.source.key.converter.schemas.enable=false
      debezium.source.value.converter=org.apache.kafka.connect.json.JsonConverter
      debezium.source.value.converter.schemas.enable=false
      debezium.transforms.outbox.table.expand.json.payload=true
      

       

      my postgresql table has the following columns:

      id uuid not null,
      aggregatetype varchar(255) not null,
      aggregateid varchar(255) not null,
      type varchar(255) not null,
      payload jsonb,
      created_at timestamp default timezone('utc'::text, now())
      

       

      The expected behavior is that when you insert a json into the payload column, debezium picks it up and sends it to pubsub. 

      If I insert a json of this type:

      {
         "id":"1028cbab-c55b-4dab-a367-7d9471466a59",
         "attributes":[
            {
               "id":"10911586",
               "adr":null,
               "eans":[
                  {
                     "code":"2007000013656",
                     "createdAt":"2022-06-08T11:33:16.465Z"
                  },
               ],
               "purchaseBusinessStatus":"BLOCOM",
               "isCanBeReturnToSupplier":false,
               "purchaseWarehouseStatus":"NONAUT"
            }
         ]
      }
      

      It works perfectly. but if I introduce two objects in some array, as in the following json:

      {
         "id":"1028cbab-c55b-4dab-a367-7d9471466a59",
         "attributes":[
            {
               "id":"10911586",
               "adr":null,
               "eans":[
                  {
                     "code":"2007000013656",
                     "createdAt":"2022-06-08T11:33:16.465Z"
                  },
                  {
                     "code":"2000630108556",
                     "createdAt":"2022-06-23T07:57:54.717Z"
                  }
               ],
               "purchaseBusinessStatus":"BLOCOM",
               "isCanBeReturnToSupplier":false,
               "purchaseWarehouseStatus":"NONAUT"
            }
         ]
      }
      

      Debezium fails with the following error: Cannot create field because of field name duplication code. 

      This is the complete error trace:

      {
         "timestamp":"2022-06-30T17:09:33.795Z",
         "sequence":175,
         "loggerClassName":"org.slf4j.impl.Slf4jLogger",
         "loggerName":"io.debezium.transforms.outbox.EventRouterDelegate",
         "level":"WARN",
         "message":"JSON expansion failed",
         "threadName":"pool-7-thread-1",
         "threadId":17,
         "mdc":{
            
         },
         "ndc":"",
         "hostName":"3566e810333b",
         "processName":"io.debezium.server.Main",
         "processId":1,
         "exception":{
            "refId":1,
            "exceptionType":"org.apache.kafka.connect.errors.SchemaBuilderException",
            "message":"Cannot create field because of field name duplication code",
            "frames":[
               {
                  "class":"org.apache.kafka.connect.data.SchemaBuilder",
                  "method":"field",
                  "line":330
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"addFieldSchema",
                  "line":53
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"buildDocumentUnionSchema",
                  "line":146
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"findArrayMemberSchema",
                  "line":117
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"jsonValueToSchema",
                  "line":78
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"addFieldSchema",
                  "line":51
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"jsonNodeToSchemaBuilder",
                  "line":42
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"buildDocumentUnionSchema",
                  "line":139
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"findArrayMemberSchema",
                  "line":117
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"jsonValueToSchema",
                  "line":78
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"addFieldSchema",
                  "line":51
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"jsonNodeToSchemaBuilder",
                  "line":42
               },
               {
                  "class":"io.debezium.transforms.outbox.SchemaBuilderUtil",
                  "method":"jsonNodeToSchema",
                  "line":33
               },
               {
                  "class":"io.debezium.transforms.outbox.EventRouterDelegate",
                  "method":"apply",
                  "line":162
               },
               {
                  "class":"io.debezium.transforms.outbox.EventRouter",
                  "method":"apply",
                  "line":25
               },
               {
                  "class":"io.debezium.embedded.Transformations",
                  "method":"transform",
                  "line":74
               },
               {
                  "class":"java.util.stream.ReferencePipeline$3$1",
                  "method":"accept",
                  "line":195
               },
               {
                  "class":"java.util.ArrayList$ArrayListSpliterator",
                  "method":"forEachRemaining",
                  "line":1655
               },
               {
                  "class":"java.util.stream.AbstractPipeline",
                  "method":"copyInto",
                  "line":484
               },
               {
                  "class":"java.util.stream.AbstractPipeline",
                  "method":"wrapAndCopyInto",
                  "line":474
               },
               {
                  "class":"java.util.stream.ReduceOps$ReduceOp",
                  "method":"evaluateSequential",
                  "line":913
               },
               {
                  "class":"java.util.stream.AbstractPipeline",
                  "method":"evaluate",
                  "line":234
               },
               {
                  "class":"java.util.stream.ReferencePipeline",
                  "method":"collect",
                  "line":578
               },
               {
                  "class":"io.debezium.embedded.EmbeddedEngine",
                  "method":"run",
                  "line":814
               },
               {
                  "class":"io.debezium.embedded.ConvertingEngineBuilder$2",
                  "method":"run",
                  "line":188
               },
               {
                  "class":"io.debezium.server.DebeziumServer",
                  "method":"lambda$start$1",
                  "line":147
               },
               {
                  "class":"java.util.concurrent.ThreadPoolExecutor",
                  "method":"runWorker",
                  "line":1128
               },
               {
                  "class":"java.util.concurrent.ThreadPoolExecutor$Worker",
                  "method":"run",
                  "line":628
               },
               {
                  "class":"java.lang.Thread",
                  "method":"run",
                  "line":829
               }
            ]
         }
      }
      

              Unassigned Unassigned
              ruben.hernandez@luceit.es Ruben Hernandez (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              5 Start watching this issue

                Created:
                Updated:
                Resolved: