- Bug
- Resolution: Done
- Major
- 1.9.0.Final
- None
- False
- None
- False
What Debezium connector do you use and what version?
Debezium MySQL Connector, 1.9.0.Final
What is the connector configuration?
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider
config.providers=secretManager
config.providers.secretManager.param.aws.region=eu-west-1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.debezium.converters.ByteBufferConverter
value.converter.delegate.converter.type=org.apache.kafka.connect.json.JsonConverter
value.converter.delegate.converter.type.schemas.enable=false
reconnect.backoff.ms=5000
reconnect.backoff.max.ms=60000
producer.reconnect.backoff.max.ms=60000
producer.reconnect.backoff.ms=5000
producer.max.request.size=2097152
retry.backoff.ms=5000
tasks.max=1
linger.ms=100
offset.storage.topic=offsets-topic
connector.class=io.debezium.connector.mysql.MySqlConnector
database.dbname=db_name
database.hostname=db-host
database.include.list=db_name
database.user=db_username
database.password=db_pass
database.port=3306
database.server.name=server-name
database.history.kafka.bootstrap.servers=msk-cluster-endpoints
database.history.kafka.topic=dbhistory.inventory
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
errors.log.enable=true
errors.log.include.messages=true
errors.tolerance=all
snapshot.locking.mode=none
snapshot.mode=schema_only
snapshot.include.collection.list=db_name.outbox_event
table.include.list=db_name.outbox_event
tombstones.on.delete=false
topic.creation.default.replication.factor=3
topic.creation.default.partitions=9
topic.creation.default.cleanup.policy=compact
topic.creation.default.compression.type=zstd
topic.creation.enable=true
transforms=outbox
transforms.outbox.route.by.field=resource_type
transforms.outbox.route.topic.replacement=core-${routedByValue}
transforms.outbox.table.expand.json.payload=true
transforms.outbox.table.field.event.timestamp=created_at
transforms.outbox.table.field.event.id=event_id
transforms.outbox.table.field.event.key=resource_id
transforms.outbox.table.field.event.payload=payload
transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
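For context on the routing part of this configuration: with transforms.outbox.route.by.field=resource_type and transforms.outbox.route.topic.replacement=core-${routedByValue}, an outbox row whose resource_type column holds, for example, the hypothetical value payment would be routed to the topic core-payment.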
What is the captured database version and mode of deployment?
AWS Aurora MySQL
What behaviour do you expect?
The outbox JSON payload to be expanded and the resulting message to reach Kafka.
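For illustration (hypothetical column value): with transforms.outbox.table.expand.json.payload=true, an outbox row whose payload column holds the string
{"items": [{"id": 1, "qty": 2}]}
should be emitted as a structured Connect value (a struct containing an items array of structs) rather than as a raw JSON string, so that the delegate JsonConverter can serialize it field by field.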
What behaviour do you see?
A warning log for the failed JSON payload expansion, followed by an error log for the failed processing of the outbox event.
Do you see the same behaviour using the latest released Debezium version?
I was able to replicate the problem with a unit test on the "main" branch of the Debezium codebase, so in my opinion it affects the latest version as well.
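For reference, a minimal sketch of such a test. The payload shape is hypothetical (chosen to match the two levels of array nesting visible in the stack trace), and it assumes SchemaBuilderUtil/StructBuilderUtil keep the entry points implied by the trace and are reachable from a test in the io.debezium.transforms.outbox package:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.Schema;
import org.junit.Test;

public class EmptyArrayExpansionTest {

    @Test
    public void failsToExpandPayloadWithEmptyAndNonEmptySiblingArrays() throws Exception {
        // Hypothetical payload: the first element of "a" carries an empty nested
        // array "b"; the second carries structs in "b". If the element schema of
        // "b" is pinned from the empty array (defaulting to STRING), building the
        // struct for the second element fails with
        // "Not a struct schema: Schema{STRING}".
        JsonNode doc = new ObjectMapper()
                .readTree("{\"a\": [ {\"b\": []}, {\"b\": [ {\"c\": 1} ]} ]}");

        Schema schema = SchemaBuilderUtil.jsonNodeToSchema(doc); // assumed entry point
        StructBuilderUtil.jsonNodeToStruct(doc, schema);         // throws DataException
    }
}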
Do you have the connector logs, ideally from start till finish?
JSON expansion failed:
[Worker-01d57e55e38601e5b] [2022-07-05 16:16:28,087] WARN [prod-us-core-debezium-connector|task-0] JSON expansion failed (io.debezium.transforms.outbox.EventRouterDelegate:166)
org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{STRING}
    at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
    at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:40)
    at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)
    at io.debezium.transforms.outbox.StructBuilderUtil.getArrayAsList(StructBuilderUtil.java:83)
    at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:71)
    at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)
    at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)
    at io.debezium.transforms.outbox.StructBuilderUtil.getArrayAsList(StructBuilderUtil.java:83)
    at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:71)
    at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)
    at io.debezium.transforms.outbox.StructBuilderUtil.getStructFieldValue(StructBuilderUtil.java:73)
    at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStructInternal(StructBuilderUtil.java:44)
    at io.debezium.transforms.outbox.StructBuilderUtil.jsonNodeToStruct(StructBuilderUtil.java:36)
    at io.debezium.transforms.outbox.EventRouterDelegate.apply(EventRouterDelegate.java:163)
    at io.debezium.transforms.outbox.EventRouter.apply(EventRouter.java:25)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:341)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Reading the two traces together, the expansion failure appears to be caught and the record forwarded with its original String payload while the value schema has already been rebuilt as a STRUCT; the delegate JsonConverter then rejects the mismatched value, producing the error:
org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class java.lang.String
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:713)
    at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:596)
    at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:346)
    at io.debezium.converters.ByteBufferConverter.fromConnectData(ByteBufferConverter.java:70)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:316)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:342)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
How to reproduce the issue using our tutorial deployment?
n/a - see "Steps to Reproduce"
Implementation ideas (optional)
I was thinking about changing the behaviour in SchemaBuilderUtil to not include a schema for empty arrays, as I don't see a way to modify/replace schemas that were already added for a given field (perhaps I am missing something?). The downside of this proposed solution is that events that only contain empty arrays would have them included neither in the generated schema nor in the event value.
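A rough sketch of that idea (hypothetical surrounding code, not the actual Debezium source; it assumes the schema inference walks object fields with Jackson JsonNode values and adds one Connect field per JSON field):

// Sketch only: skip empty arrays during schema inference instead of pinning
// their element type to a default (e.g. optional STRING), which later breaks
// struct building when a sibling record carries objects in the same array.
if (fieldValue.isArray()) {
    if (fieldValue.size() == 0) {
        // Leaving the field out entirely: events whose arrays are always
        // empty will carry it neither in the schema nor in the value,
        // which is the downside described above.
        continue;
    }
    // inferArrayElementSchema is a hypothetical helper for the recursion
    builder.field(fieldName,
            SchemaBuilder.array(inferArrayElementSchema(fieldValue)).optional().build());
}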