Debezium / DBZ-8247

JDBC sink: truncate event is also added to updateBufferByTable



      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      2.7.0.Final, but I see the same code in the latest master: https://github.com/debezium/debezium/blob/33be8c03d724fc0b68f2f9b47354f1ecff65ef95/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java#L42

      What is the connector configuration?

      ```
      worker.connector.insert.mode=upsert
      worker.connector.truncate.enabled=true
      worker.connector.delete.enabled=false
      worker.connector.primary.key.mode=record_key
      worker.connector.auto.create=true
      worker.connector.connection.password=iot
      worker.connector.connection.url=jdbc:postgresql://localhost:5432/iot
      worker.connector.connection.username=iot
      worker.connector.connector.class=io.debezium.connector.jdbc.JdbcSinkConnector
      worker.connector.database.time.zone=UTC
      worker.connector.name=JdbcSinkConnector
      worker.connector.quote.identifiers=true
      worker.connector.schema.evolution=basic
      worker.connector.tasks.max=1
      worker.connector.key.converter=org.apache.kafka.connect.json.JsonConverter
      worker.connector.value.converter=org.apache.kafka.connect.json.JsonConverter
      ```

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      It shouldn't matter, but PostgreSQL 16.2-1.pgdg120+2.

      What behavior do you expect?

      I expect a truncate event to result in exactly one SQL operation against the target database.

      What behavior do you see?

      I get two SQL operations. The first is the truncate itself, which works fine: https://github.com/debezium/debezium/blob/33be8c03d724fc0b68f2f9b47354f1ecff65ef95/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java#L106-L112

      After the truncate, however, I get an exception. Once the truncate branch has run, the connector does not stop processing the current record; it falls through to the next conditional statement and eventually reaches this block: https://github.com/debezium/debezium/blob/33be8c03d724fc0b68f2f9b47354f1ecff65ef95/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java#L135-L154

      There the same truncate record is added to updateBufferByTable and is later executed as an update event.

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      Yes, master shows the same behavior: https://github.com/debezium/debezium/blob/33be8c03d724fc0b68f2f9b47354f1ecff65ef95/debezium-connector-jdbc/src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java#L42

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      ```
      Failed to process record: Failed to process a sink record
       
      org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:229) ~[debezium-connector-jdbc-2.7.0.Final.jar:2.7.0.Final]
      at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:207) ~[debezium-connector-jdbc-2.7.0.Final.jar:2.7.0.Final]
      at java.base/java.util.HashMap.forEach(HashMap.java:1429) ~[na:na]
      at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:207) ~[debezium-connector-jdbc-2.7.0.Final.jar:2.7.0.Final]
      at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:159) ~[debezium-connector-jdbc-2.7.0.Final.jar:2.7.0.Final]
      at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103) ~[debezium-connector-jdbc-2.7.0.Final.jar:2.7.0.Final]
      at io.tarantool.worker.sink.SinkWorkerConsumer.putEventsToTask(SinkWorkerConsumer.java:171) ~[classes/:na]
      at io.tarantool.worker.sink.SinkWorkerConsumer.runConsuming(SinkWorkerConsumer.java:128) ~[classes/:na]
      at io.tarantool.worker.sink.SinkWorkerConsumer.run(SinkWorkerConsumer.java:105) ~[classes/:na]
      at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
      Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot write to table User with no key fields defined.
      at io.debezium.connector.jdbc.JdbcChangeEventSink.getSqlStatement(JdbcChangeEventSink.java:374) ~[debezium-connector-jdbc-2.7.0.Final.jar:2.7.0.Final]
      at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:220) ~[debezium-connector-jdbc-2.7.0.Final.jar:2.7.0.Final]
      ... 9 common frames omitted
      ```
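
      To illustrate where the "no key fields defined" message in the trace presumably comes from, here is a minimal sketch. It assumes that, with insert.mode=upsert, statement selection rejects any record without key fields, and that a truncate record carries none. The class, enum, and method names are hypothetical stand-ins, not the connector's actual code.

      ```java
      import java.util.List;

      // Simplified, hypothetical model of statement selection in upsert mode.
      // It is NOT the real JdbcChangeEventSink.getSqlStatement implementation.
      final class UpsertKeyCheckSketch {

          enum InsertMode { INSERT, UPSERT }

          // Returns a placeholder statement kind, or throws when an upsert
          // is requested for a record that has no key fields.
          static String chooseStatement(InsertMode mode, String table, List<String> keyFields) {
              if (mode == InsertMode.UPSERT) {
                  if (keyFields.isEmpty()) {
                      throw new IllegalStateException(
                              "Cannot write to table " + table + " with no key fields defined.");
                  }
                  return "UPSERT";
              }
              return "INSERT";
          }

          public static void main(String[] args) {
              // A regular change event with a primary key is accepted.
              System.out.println(chooseStatement(InsertMode.UPSERT, "User", List.of("id")));

              // A record without key fields (assumed here for a truncate event) is rejected.
              try {
                  chooseStatement(InsertMode.UPSERT, "User", List.of());
              } catch (IllegalStateException e) {
                  System.out.println(e.getMessage());
              }
          }
      }
      ```

      Under these assumptions, the truncate statement itself succeeds, and the failure only appears later, when the buffered copy of the same record is flushed.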

      How to reproduce the issue using our tutorial deployment?

      Run any source connector against any database. For example, I use the Debezium Postgres connector and run this statement in PostgreSQL:

      ```
      truncate table "Blabla";
      ```

      This produces a truncate event in Kafka.

      Then run the JDBC sink connector. When it consumes this event (operation = "t"), it fails with an error like this:

      ```
      Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot write to table User with no key fields defined.
      ```

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Implementation ideas (optional)

      Simply add a continue statement after the truncate branch has executed, so that the record is not processed any further.
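
      A minimal sketch of the idea, assuming the sink iterates over records in a loop and buffers every record that reaches the end of the loop body. The Event class, the field names, and the skipAfterTruncate flag are hypothetical and exist only to compare the current and the proposed behavior; this is not the connector's real code.

      ```java
      import java.util.ArrayList;
      import java.util.List;

      // Simplified, hypothetical model of the record loop in the sink.
      final class TruncateContinueSketch {

          // Hypothetical minimal event: operation code ("t" = truncate) and table name.
          static final class Event {
              final String op;
              final String table;

              Event(String op, String table) {
                  this.op = op;
                  this.table = table;
              }

              boolean isTruncate() {
                  return "t".equals(op);
              }
          }

          final List<String> executedSql = new ArrayList<>();
          final List<Event> updateBuffer = new ArrayList<>();

          // skipAfterTruncate=false models the reported fall-through,
          // skipAfterTruncate=true models the proposed fix.
          void execute(List<Event> events, boolean skipAfterTruncate) {
              for (Event event : events) {
                  if (event.isTruncate()) {
                      executedSql.add("TRUNCATE TABLE \"" + event.table + "\"");
                      if (skipAfterTruncate) {
                          continue; // proposed fix: stop processing this record
                      }
                  }
                  // Without the continue, truncate events also end up here.
                  updateBuffer.add(event);
              }
          }

          public static void main(String[] args) {
              List<Event> events = List.of(new Event("t", "Blabla"), new Event("c", "Blabla"));

              TruncateContinueSketch current = new TruncateContinueSketch();
              current.execute(events, false);
              System.out.println("current buffer size: " + current.updateBuffer.size()); // 2

              TruncateContinueSketch fixed = new TruncateContinueSketch();
              fixed.execute(events, true);
              System.out.println("fixed buffer size: " + fixed.updateBuffer.size()); // 1
          }
      }
      ```

      With the continue in place, the truncate event produces exactly one SQL operation and never reaches the update buffer, while ordinary change events are buffered as before.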

            rk3rn3r René Kerner
            artdub Artyom Dubinin