Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-6637

Received an unexpected message type that does not have an 'after' Debezium block

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Blocker Blocker
    • 2.4.0.Alpha2
    • 2.3.0.Final
    • jdbc-connector
    • None
    • Critical

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      io.debezium.connector.jdbc.JdbcSinkConnector v2.3

      What is the connector configuration?

      The issue I get is with the JDBC Sink connector. Here is my Sink config:

      {
          "name": "jdbc-connector",
          "config": {
              "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
              "tasks.max": "1",
              "connection.url": "jdbc:mysql://db-mysql/debezium_sink",
              "connection.username": "root",
              "connection.password": "debezium",
              "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
              "transforms.unwrap.drop.tombstones": "false",
              "transforms.unwrap.delete.handling.mode": "rewrite",
              "insert.mode": "upsert",
              "primary.key.mode": "record_key",
              "schema.evolution": "basic",
              "database.time_zone": "UTC",
              "topics.regex": "^aurora.*",
              "delete.enabled": "true",
              "name": "jdbc-connector"
          }
      }
      

      Here is my MySQL connector config, which works well reading the initial DB snapshot. v2.3

      {
          "name": "testMysqlConnector",
          "config": {
                  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
                  "tasks.max": "1",
                  "topic.prefix": "aurora",
                  "topic.creation.enable": "true",
                  "topic.creation.default.replication.factor": "1",
                  "topic.creation.default.partitions": "10",
                  "topic.creation.default.cleanup.policy": "compact",
                  "topic.creation.default.compression.type": "lz4" ,
                  "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
                  "schema.history.internal.kafka.topic": "schemahistory.aurora",
                  "include.schema.changes": "true",
                  "database.server.id": "12345",
                  "database.hostname": "localhost",
                  "database.user": "root",
                  "database.password": "password",
                  "database.history.kafka.bootstrap.servers": "kafka:9092",
                  "database.history.kafka.topic": "dbhistory.ffa",
                  "database.include.list": ".*stg_demo,.*stg_template",
                  "delete.enabled": "true",
                  "column.exclude.list": ".*geolocation.*,.*boundary.*,.*address.*",
                  "snapshot.mode": "initial",
                  "snapshot.delay.ms": "5000",
                  "transforms":"unwrap",
                  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
                  "transforms.unwrap.drop.tombstones": "false",
                  "transforms.unwrap.delete.handling.mode": "rewrite",
                  "transforms.unwrap.add.fields": "source.db"
              }
      }
      

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      I have tried capturing data from local docker container using `quay.io/debezium/example-mysql:2.3` and also from AWS Aurora MySQL-compatible instance  v5.7 (2.11.2). Both scenarios work with MySQL connector as expected and the initial snapshot is loaded into kafka successfully. However, the JDBC sink connector barfs randomly half way through initial load of snapshot data from Kafka into MySQL or Postgres. I have tried Sinking data into both MySQL and Postgres separately and I get the same error, which is shown further below in this ticket. I tried running Debezium/Zookeeper/Kafka/Connect stack on my laptop using Docker compose and also separate Docker containers, as well as running the same thing on an AWS EC2 instance. I get the same error.

      What behaviour do you expect?

      I expect Debezium take an initial snapshot of 2 databases as indicated in the connector config. This part works well and I used Kafdrop to verify that all the topics and the data is loaded correctly.  I then expect the sink connector to read the messages from Kafka and insert data into my specified MySQL database, while automatically creating new tables that aren't already there. There should be approximately 200 tables in the sink DB when the process is finished.

      What behaviour do you see?

      The sink process starts off fine and about 50 tables get created in the sink DB with all the data as expected. However, after about 50 tables, I get an error and the process stops abruptly. There is no pattern where this process stops. It breaks sinking different tables each time I re-try.

      Do you see the same behaviour using the latest relesead Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      I have only tried using the latest stable release, which is currently v2.3.

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      Here is the part where it breaks. I have also attached a full trace in a file attachment.

      jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,204 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,209 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,215 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,222 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,229 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,235 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,241 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,247 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,253 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,259 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,265 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,272 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,278 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,284 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,290 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,299 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,305 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,310 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,318 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,324 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,330 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,336 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,344 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,352 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,359 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,365 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,371 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,377 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,383 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,389 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,410 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,418 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,427 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,438 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,468 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,495 INFO   ||  172.27.0.8 - - [06/Jul/2023:11:15:02 +0000] "GET /connectors HTTP/1.1" 200 39 "-" "Apache-HttpClient/4.5.13 (Java/11.0.16.1)" 5   [org.apache.kafka.connect.runtime.rest.RestServer]
      
      connect             | 2023-07-06 11:15:02,496 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,510 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,520 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,527 INFO   ||  172.27.0.8 - - [06/Jul/2023:11:15:02 +0000] "GET /connectors/testMysqlConnector HTTP/1.1" 200 1280 "-" "Apache-HttpClient/4.5.13 (Java/11.0.16.1)" 2   [org.apache.kafka.connect.runtime.rest.RestServer]
      
      connect             | 2023-07-06 11:15:02,532 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,537 INFO   ||  172.27.0.8 - - [06/Jul/2023:11:15:02 +0000] "GET /connectors/testMysqlConnector/status HTTP/1.1" 200 174 "-" "Apache-HttpClient/4.5.13 (Java/11.0.16.1)" 1   [org.apache.kafka.connect.runtime.rest.RestServer]
      
      connect             | 2023-07-06 11:15:02,543 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,550 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,550 INFO   ||  172.27.0.8 - - [06/Jul/2023:11:15:02 +0000] "GET /connectors/jdbc-connector HTTP/1.1" 200 649 "-" "Apache-HttpClient/4.5.13 (Java/11.0.16.1)" 2   [org.apache.kafka.connect.runtime.rest.RestServer]
      
      connect             | 2023-07-06 11:15:02,561 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,568 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,572 INFO   ||  172.27.0.8 - - [06/Jul/2023:11:15:02 +0000] "GET /connectors/jdbc-connector/status HTTP/1.1" 200 168 "-" "Apache-HttpClient/4.5.13 (Java/11.0.16.1)" 1   [org.apache.kafka.connect.runtime.rest.RestServer]
      
      connect             | 2023-07-06 11:15:02,575 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,588 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,598 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,605 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,611 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,617 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,622 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,628 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,634 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,642 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,648 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,655 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,661 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,667 INFO   ||  Marking processed record for topic aurora.stg_demo.ffa_user_rights   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | 2023-07-06 11:15:02,743 ERROR  ||  Failed to process record: Failed to process a sink record   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
      
      connect             | org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      
      connect             |  at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:72)
      
      connect             |  at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:89)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
      
      connect             |  at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
      
      connect             |  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      
      connect             |  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      
      connect             |  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      
      connect             |  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      
      connect             |  at java.base/java.lang.Thread.run(Thread.java:829)
      
      connect             | Caused by: org.apache.kafka.connect.errors.ConnectException: Received an unexpected message type that does not have an 'after' Debezium block
      
      connect             |  at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordNonKeyData(SinkRecordDescriptor.java:388)
      
      connect             |  at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:257)
      
      connect             |  at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:66)
      
      connect             |  ... 13 more
      
      connect             | 2023-07-06 11:15:02,827 ERROR  ||  WorkerSinkTask\{id=jdbc-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure   [org.apache.kafka.connect.runtime.WorkerSinkTask]
      
      connect             | org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
      
      connect             |  at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:80)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
      
      connect             |  at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
      
      connect             |  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      
      connect             |  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      
      connect             |  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      
      connect             |  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      
      connect             |  at java.base/java.lang.Thread.run(Thread.java:829)
      
      connect             | Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      
      connect             |  at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:72)
      
      connect             |  at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:89)
      
      connect             |  ... 12 more
      
      connect             | Caused by: org.apache.kafka.connect.errors.ConnectException: Received an unexpected message type that does not have an 'after' Debezium block
      
      connect             |  at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordNonKeyData(SinkRecordDescriptor.java:388)
      
      connect             |  at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:257)
      
      connect             |  at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:66)
      
      connect             |  ... 13 more
      
      connect             | 2023-07-06 11:15:02,857 ERROR  ||  WorkerSinkTask\{id=jdbc-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
      
      connect             | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
      
      connect             |  at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
      
      connect             |  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      
      connect             |  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      
      connect             |  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      
      connect             |  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      
      connect             |  at java.base/java.lang.Thread.run(Thread.java:829)
      
      connect             | Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
      
      connect             |  at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:80)
      
      connect             |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
      
      connect             |  ... 11 more
      
      connect             | Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      
      connect             |  at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:72)
      
      connect             |  at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:89)
      
      connect             |  ... 12 more
      
      connect             | Caused by: org.apache.kafka.connect.errors.ConnectException: Received an unexpected message type that does not have an 'after' Debezium block
      
      connect             |  at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.readSinkRecordNonKeyData(SinkRecordDescriptor.java:388)
      
      connect             |  at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:257)
      
      connect             |  at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:66)
      
      connect             |  ... 13 more
      
      connect             | 2023-07-06 11:15:02,858 INFO   ||  Closing session.   [io.debezium.connector.jdbc.JdbcChangeEventSink]
      

      How to reproduce the issue using our tutorial deployment?

      Boot up the following docker compose file:

      version: '3'
      services: 
        zookeeper: 
          container_name: zookeeper
          image: quay.io/debezium/zookeeper:2.3
          networks: 
            - ui-network
        kafdrop: 
          image: obsidiandynamics/kafdrop
          restart: "no"
          ports: 
            - "9000:9000"
          environment: 
            KAFKA_BROKERCONNECT: "kafka:9092"
            JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
          depends_on: 
            - "kafka"
          networks: 
            - ui-network
        kafka: 
          container_name: kafka
          image: quay.io/debezium/kafka:2.3
          ports: 
            - "9092:9092"
          depends_on: 
            - zookeeper
          environment: 
            - ZOOKEEPER_CONNECT=zookeeper:2181
          networks: 
            - ui-network
        db-pg: 
          container_name: db-pg
          image: quay.io/debezium/example-postgres:2.3
          ports: 
            - "65432:5432"
          environment: 
            - POSTGRES_USER=postgres
            - POSTGRES_PASSWORD=postgres
          networks: 
            - ui-network
        db-mysql: 
          container_name: db-mysql
          image: quay.io/debezium/example-mysql:2.3
          ports: 
            - "63306:3306"
          environment: 
            - MYSQL_ROOT_PASSWORD=debezium
            - MYSQL_USER=debezium
            - MYSQL_PASSWORD=dbz
          networks: 
            - ui-network
        connect: 
          container_name: connect
          image: quay.io/debezium/connect:2.3
          ports: 
            - "8083:8083"
          depends_on: 
            - kafka
            - db-pg
          environment: 
            - BOOTSTRAP_SERVERS=kafka:9092
            - GROUP_ID=1
            - CONFIG_STORAGE_TOPIC=my_connect_configs
            - OFFSET_STORAGE_TOPIC=my_connect_offsets
            - STATUS_STORAGE_TOPIC=my_connect_statuses
            - ENABLE_DEBEZIUM_KC_REST_EXTENSION=true
            - ENABLE_DEBEZIUM_SCRIPTING=true
            - CONNECT_REST_EXTENSION_CLASSES=io.debezium.kcrestextension.DebeziumConnectRestExtension
          networks: 
            - ui-network
        debezium-ui: 
          container_name: debezium-ui
          image: quay.io/debezium/debezium-ui:latest
          ports: 
            - "8080:8080"
          environment: 
            - KAFKA_CONNECT_URIS=http://connect:8083
          depends_on: 
            - connect
          networks: 
            - ui-network
      networks: 
        ui-network: 
          external: false
      
      

      Add the connector config by posting to this endpoint `http://localhost:8083/connectors/testMysqlConnector`

       

      Wait for the snapshot and the data transfer to finish by checking kafka topics.

      Add Sink configuration `http://localhost:8083/connectors/jdbc-connector` using the config file as described earlier.

      The sink process will start and after some time, it will throw the 'org.apache.kafka.connect.errors.ConnectException: Received an unexpected message type that does not have an 'after' Debezium block` error.

      What does that error mean?

      '

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      Just need to understand what the error means and if there is a way to fix it.

      Implementation ideas (optional)

      Just need to understand what the error means and if there is a way to fix it. There is no documentation on the Internet anywhere about this type of error.

       

      We have a large project where Debezium would be the perfect technology to solve the problem, but we are blocked by this issue.

            rh-ee-mvitale Mario Fiore Vitale
            assrh Marko Tomic (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            5 Start watching this issue

              Created:
              Updated:
              Resolved: