Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5367

Outbox JSON expansion fails when nested arrays contain no elements

    XMLWordPrintable

Details

    • Bug
    • Resolution: Done
    • Major
    • 1.9.5.Final, 2.0.0.Beta1
    • 1.9.0.Final
    • outbox
    • None
    • False
    • None
    • False
    • Hide

      It seems that the problem happens when Outbox Payload contains nested arrays where the first array is empty and the second one has elements, as in the following JSON:

      {"fullName": "John Doe", "petObjects": [{"type": "dog", "colours": []}, {"type": "cat", "colours": [{"name": "white"}]}]} 

      Here's the test that I wrote to replicate the problem locally:

      @Test
      public void canExpandJsonWithNestedArraysWhereFirstArrayIsEmpty() {
          final EventRouter<SourceRecord> router = new EventRouter<>();
          final Map<String, String> config = new HashMap<>();
          config.put(
                  EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(),
                  "true");
          router.configure(config);
      
          final SourceRecord eventRecord = createEventRecord(
                  "da8d6de6-3b77-45ff-8f44-57db55a7a06c",
                  "UserCreated",
                  "10711fa5",
                  "User",
                  "{\"fullName\": \"John Doe\", \"petObjects\": [{\"type\": \"dog\", \"colors\": []}, {\"type\": \"cat\", \"colors\": [{\"name\": \"white\"}]}]}",
                  new HashMap<>(),
                  new HashMap<>());
          final SourceRecord eventRouted = router.apply(eventRecord);
      
          assertThat(eventRouted).isNotNull();
      
          Schema valueSchema = eventRouted.valueSchema();
          assertThat(valueSchema.type()).isEqualTo(SchemaBuilder.struct().type());
      
          assertThat(valueSchema.fields().size()).isEqualTo(2);
          assertThat(valueSchema.field("fullName").schema().type().getName()).isEqualTo("string");
          assertThat(valueSchema.field("petObjects").schema().type().getName()).isEqualTo("array");
          assertThat(valueSchema.field("petObjects").schema().valueSchema().fields().size()).isEqualTo(2);
          assertThat(valueSchema.field("petObjects").schema().valueSchema().type().getName()).isEqualTo("struct");
          assertThat(valueSchema.field("petObjects").schema().valueSchema().field("type").schema().type().getName()).isEqualTo("string");
          assertThat(valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().type().getName()).isEqualTo("array");
          assertThat(valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().valueSchema().type().getName()).isEqualTo("struct");
          assertThat(valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().valueSchema().fields().size()).isEqualTo(1);
          assertThat(valueSchema.field("petObjects").schema().valueSchema().field("colors").schema().valueSchema().field("name").schema().type().getName()).isEqualTo("string");
      
          Struct valueStruct = (Struct) eventRouted.value();
          assertThat(valueStruct.get("fullName")).isEqualTo("John Doe");
          List<Object> petObjects = valueStruct.getArray("petObjects");
          assertThat(petObjects.size()).isEqualTo(2);
          assertThat(asStruct(petObjects.get(0)).get("type")).isEqualTo("dog");
          assertThat(asStruct(petObjects.get(0)).getArray("colors")).hasSize(0);
          assertThat(asStruct(petObjects.get(1)).get("type")).isEqualTo("cat");
          assertThat(asStruct(petObjects.get(1)).getArray("colors")).hasSize(1);
          List<Object> colors = asStruct(petObjects.get(1)).getArray("colors");
          assertThat(asStruct(colors.get(0)).get("name")).isEqualTo("white");
      } 

      and the test execution result:

      2022-07-05 18:24:53,805 WARN   ||  JSON expansion failed   [io.debezium.transforms.outbox.EventRouterDelegate]
      org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{STRING}
          at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
          at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:40)
          at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)
          at io.debezium.transforms.outbox.StructBuilderUtil.getArrayAsList(StructBuilderUtil.java:83)
          at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:71)
          at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)
          at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)
          at io.debezium.transforms.outbox.StructBuilderUtil.getArrayAsList(StructBuilderUtil.java:83)
          at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:71)
          at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)
          at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStruct(StructBuilderUtil.java:36)
          at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:163)
          at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:25)
          at io.debezium.transforms.outbox.EventRouterTest.canExpandJsonWithNestedArraysWhereFirstArrayIsEmpty(EventRouterTest.java:963)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
          at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)
          at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
          at java.base/java.lang.reflect.Method.invoke(Method.java:567)
          at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
          at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
          at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
          at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
          at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)
          at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
          at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
          at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
          at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
          at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
          at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
          at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
          at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
          at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
          at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
          at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
          at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
          at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
          at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
          at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
          at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
          at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
          at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)org.junit.ComparisonFailure: 
      Expected :'struct'
      Actual   :'string' 

       

      Show
      It seems that the problem happens when Outbox Payload contains nested arrays where the first array is empty and the second one has elements, as in the following JSON: { "fullName" : "John Doe" , "petObjects" : [{ "type" : "dog" , "colours" : []}, { "type" : "cat" , "colours" : [{ "name" : "white" }]}]} Here's the test that I wrote to replicate the problem locally: @Test public void canExpandJsonWithNestedArraysWhereFirstArrayIsEmpty() { final EventRouter<SourceRecord> router = new EventRouter<>(); final Map< String , String > config = new HashMap<>(); config.put( EventRouterConfigDefinition.EXPAND_JSON_PAYLOAD.name(), " true " ); router.configure(config); final SourceRecord eventRecord = createEventRecord( "da8d6de6-3b77-45ff-8f44-57db55a7a06c" , "UserCreated" , "10711fa5" , "User" , "{\" fullName\ ": \" John Doe\ ", \" petObjects\ ": [{\" type\ ": \" dog\ ", \" colors\ ": []}, {\" type\ ": \" cat\ ", \" colors\ ": [{\" name\ ": \" white\ "}]}]}" , new HashMap<>(), new HashMap<>()); final SourceRecord eventRouted = router.apply(eventRecord); assertThat(eventRouted).isNotNull(); Schema valueSchema = eventRouted.valueSchema(); assertThat(valueSchema.type()).isEqualTo(SchemaBuilder.struct().type()); assertThat(valueSchema.fields().size()).isEqualTo(2); assertThat(valueSchema.field( "fullName" ).schema().type().getName()).isEqualTo( "string" ); assertThat(valueSchema.field( "petObjects" ).schema().type().getName()).isEqualTo( "array" ); assertThat(valueSchema.field( "petObjects" ).schema().valueSchema().fields().size()).isEqualTo(2); assertThat(valueSchema.field( "petObjects" ).schema().valueSchema().type().getName()).isEqualTo( "struct" ); assertThat(valueSchema.field( "petObjects" ).schema().valueSchema().field( "type" ).schema().type().getName()).isEqualTo( "string" ); assertThat(valueSchema.field( "petObjects" ).schema().valueSchema().field( "colors" ).schema().type().getName()).isEqualTo( "array" ); assertThat(valueSchema.field( "petObjects" ).schema().valueSchema().field( "colors" ).schema().valueSchema().type().getName()).isEqualTo( "struct" ); assertThat(valueSchema.field( "petObjects" ).schema().valueSchema().field( "colors" ).schema().valueSchema().fields().size()).isEqualTo(1); assertThat(valueSchema.field( "petObjects" ).schema().valueSchema().field( "colors" ).schema().valueSchema().field( "name" ).schema().type().getName()).isEqualTo( "string" ); Struct valueStruct = (Struct) eventRouted.value(); assertThat(valueStruct.get( "fullName" )).isEqualTo( "John Doe" ); List< Object > petObjects = valueStruct.getArray( "petObjects" ); assertThat(petObjects.size()).isEqualTo(2); assertThat(asStruct(petObjects.get(0)).get( "type" )).isEqualTo( "dog" ); assertThat(asStruct(petObjects.get(0)).getArray( "colors" )).hasSize(0); assertThat(asStruct(petObjects.get(1)).get( "type" )).isEqualTo( "cat" ); assertThat(asStruct(petObjects.get(1)).getArray( "colors" )).hasSize(1); List< Object > colors = asStruct(petObjects.get(1)).getArray( "colors" ); assertThat(asStruct(colors.get(0)).get( "name" )).isEqualTo( "white" ); } and the test execution result: 2022-07-05 18:24:53,805 WARN   ||  JSON expansion failed   [io.debezium.transforms.outbox.EventRouterDelegate] org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{STRING}     at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)     at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:40)     at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)     at io.debezium.transforms.outbox.StructBuilderUtil.getArrayAsList(StructBuilderUtil.java:83)     at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:71)     at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)     at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)     at io.debezium.transforms.outbox.StructBuilderUtil.getArrayAsList(StructBuilderUtil.java:83)     at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:71)     at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)     at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStruct(StructBuilderUtil.java:36)     at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:163)     at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:25)     at io.debezium.transforms.outbox.EventRouterTest.canExpandJsonWithNestedArraysWhereFirstArrayIsEmpty(EventRouterTest.java:963)     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78)     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)     at java.base/java.lang.reflect.Method.invoke(Method.java:567)     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)     at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:258)     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)     at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)     at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)     at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)     at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)org.junit.ComparisonFailure:  Expected : 'struct' Actual   : 'string'  

    Description

      What Debezium connector do you use and what version?

      Debezium MySQL Connector, 1.9.0 Final

      What is the connector configuration?

      config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
      config.providers=secretManager
      config.providers.secretManager.param.aws.region=eu-west-1
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=io.debezium.converters.ByteBufferConverter
      value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
      value.converter.delegate.converter.type.schemas.enable=false
      reconnect.backoff.ms=5000
      reconnect.backoff.max.ms=60000
      producer.reconnect.backoff.max.ms=60000
      producer.reconnect.backoff.ms=5000
      producer.max.request.size=2097152
      retry.backoff.ms=5000
      tasks.max=1
      linger.ms=100
      offset.storage.topic=offsets-topic
      connector.class=io.debezium.connector.mysql.MySqlConnector
      database.dbname=db_name
      database.hostname=db-host
      database.include.list=db_name
      database.user=db_username
      database.password=db_pass
      database.port=3306
      database.server.name=server-name
      database.history.kafka.bootstrap.servers=msk-cluster-endpoints
      database.history.kafka.topic=dbhistory.inventory
      database.history.producer.security.protocol=SASL_SSL
      database.history.producer.sasl.mechanism=AWS_MSK_IAM
      database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
      database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
      database.history.consumer.security.protocol=SASL_SSL
      database.history.consumer.sasl.mechanism=AWS_MSK_IAM
      database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
      database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
      errors.log.enable=true
      errors.log.include.messages=true
      errors.tolerance=all
      snapshot.locking.mode=none
      snapshot.mode=schema_only
      snapshot.include.collection.list=db_name.outbox_event
      table.include.list=db_name.outbox_event
      tasks.max=1
      tombstones.on.delete=false
      topic.creation.default.replication.factor=3
      topic.creation.default.partitions=9
      topic.creation.default.cleanup.policy=compact
      topic.creation.default.compression.type=zstd
      topic.creation.enable=true
      transforms=outbox
      transforms.outbox.route.by.field=resource_type
      transforms.outbox.route.topic.replacement=core-${routedByValue}
      transforms.outbox.table.expand.json.payload=true
      transforms.outbox.table.field.event.timestamp=created_at
      transforms.outbox.table.field.event.id=event_id
      transforms.outbox.table.field.event.key=resource_id
      transforms.outbox.table.field.event.payload=payload
      transforms.outbox.type=io.debezium.transforms.outbox.EventRouter

      What is the captured database version and mode of depoyment?

      AWS Aurora MySQL

      What behaviour do you expect?

      Outbox JSON Payload to be expanded and message reached Kafka

      What behaviour do you see?

      Warning log for failed JSON Payload Expanding and error log for failed processing of the Outbox Event

      Do you see the same behaviour using the latest relesead Debezium version?

      I was able to replicate the problem with a unit test on the "main" branch of the Debezium codebase, so imo it affects the latest version as well

      Do you have the connector logs, ideally from start till finish?

      JSON expansion failed:

       

      2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] [2022-07-05 16:16:28,087] WARN [prod-us-core-debezium-connector|task-0] JSON expansion failed (io.debezium.transforms.outbox.EventRouterDelegate:166)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{STRING}2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:40)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.getArrayAsList(StructBuilderUtil.java:83)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:71)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.getArrayAsList(StructBuilderUtil.java:83)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:71)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStruct(StructBuilderUtil.java:36)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:163)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:25)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:341)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)2022-07-05T16:16:28.000Z    [Worker-01d57e55e38601e5b] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)2022-07-05T16:16:28.000ZCopy
      [Worker-01d57e55e38601e5b]     at java.base/java.lang.Thread.run(Thread.java:829)
      [Worker-01d57e55e38601e5b] at java.base/java.lang.Thread.run(Thread.java:829) 

      followed by the error:

      org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class java.lang.String2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:713)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:596)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:346)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at io.debezium.converters.ByteBufferConverter.fromConnectData(ByteBufferConverter.java:70)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:316)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:316)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:342)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)2022-07-05T16:16:28.000Z[Worker-01d57e55e38601e5b] at java.base/java.lang.Thread.run(Thread.java:829) 

       

       

      How to reproduce the issue using our tutorial deployment?

      n/a - see "Steps to Reproduce"

      Implementation ideas (optional)

      I was thinking about changing behaviour in SchemaBuilderUtil, to not include schema for empty arrays as I don't see a way to modify/replace schemas that were already added for a given field (perhaps I am missing sth?). The downside of this proposed solution is that for events that only have empty arrays it wouldn't include them in the generated schema, neither it would include them in the event value

      Attachments

        Activity

          People

            Unassigned Unassigned
            paw.malon@gmail.com Pawel Malon (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: