Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-7845

Rewrite batch statement not supported for jdbc debezium sink

XMLWordPrintable

    • Icon: Bug Bug
    • Resolution: Done
    • Icon: Major Major
    • 2.7.0.Beta1
    • 2.7.0.Alpha1
    • jdbc-connector
    • None
    • False
    • None
    • False
    • Low

       

      Bug report

      For bug reports, provide this information, please: 

      What Debezium connector do you use and what version?

      debezium jdbc sink - V2.7.0.Alpha1

      What is the connector configuration?

      proper default confuguration with parameter `reWriteBatchedInserts` = true in jdbc url

      What is the captured database version and mode of depoyment?

      AWS Postgres RDS 14

      What behaviour do you expect?

      Batch Inserts

      What behaviour do you see?

      Single Single insert

      Do you see the same behaviour using the latest relesead Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      Yes

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      Hint: Ensure that no rows proposed for insertion within the same command have duplicate constrained values. Call getNextException to see other errors in the batch.

      at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:186)
      at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:585)
      at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:896)
      at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:919)
      at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1685)
      at Simple.main(Test.java:32)
      Caused by: org.postgresql.util.PSQLException: ERROR: ON CONFLICT DO UPDATE command cannot affect row a second time
      Hint: Ensure that no rows proposed for insertion within the same command have duplicate constrained values.
      at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
      at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
      at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:574)
      

      How to reproduce the issue using our tutorial deployment?

      package
      
      // begin file: simple.java
      import java.sql.*;
      import java.util.Properties;
      //
      class Simple
      {
      public static void main(String[] args) throws SQLException {
      String url = "jdbc:postgresql://localhost:5432/test?ApplicationName=test_app";
      Properties properties = new Properties();
      properties.setProperty("user", "db_admin");
      properties.setProperty("password", "test");
      
      
       
      {{    properties.setProperty("reWriteBatchedInserts", "true");
          //
          try
          \{
              Connection connection = DriverManager.getConnection(url, properties);
              // -- the property reWriteBatchedInserts is set to true
              Statement statement = connection.createStatement();
              statement.execute("create table if not exists test(id int primary key, f1 text)");
              statement.execute("truncate table test");
              PreparedStatement prepared_statement = connection.prepareStatement(
                      "insert into test (id, f1) values (?, ?) ON CONFLICT (id) DO UPDATE SET f1=EXCLUDED.f1");
              prepared_statement.setInt(1, 1);
              prepared_statement.setString(2, "A");
              prepared_statement.addBatch();
              prepared_statement.setInt(1, 1);
              prepared_statement.setString(2, "B");
              prepared_statement.addBatch();
              int[] records_affected = prepared_statement.executeBatch();
              prepared_statement.close();
              statement.close();
              // --
              connection.close();
          }
          catch (SQLException e)
          \{
      
              throw  e;
          }
      

      Feature request or enhancement

      I'm working on it as discussed here in zulip thread: https://debezium.zulipchat.com/#narrow/stream/348249-community-postgresql/topic/debezium.20jdbc.20.20sink.20not.20working.20with.20reWriteBatchedInserts

      Which use case/requirement will be addressed by the proposed feature?

      Reduction buffer to reduce write load on db, configurable 

      Implementation ideas (optional)

      discussed in the thread 

            Unassigned Unassigned
            gaurav7261 Gaurav Miglani
            Gaurav Miglani
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated:
              Resolved: