  Debezium / DBZ-7291

Retry flushing records if a LockAcquisitionException occurs in MySQL


    • Type: Enhancement
    • Resolution: Done
    • Priority: Minor
    • Fix Version/s: 3.0.0.CR2
    • Affects Version/s: None
    • Component/s: jdbc-connector
    • Labels: None

      debezium-connector-jdbc

      There seems to be no retry logic when the JDBC sink fails to write records to the database.

      With a MySQL sink, a LockAcquisitionException or PessimisticLockException is sometimes thrown because of lock contention with other transactions.

      Currently, the only way to recover from this is to restart the sink task.

      So I think we need options to retry flushing (writing) records. This would also help with other transient exceptions.

      Here is pseudocode for JdbcChangeEventSink that uses two new options:

       

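      flush.failure.max.retries: maximum number of times a failed flush is retried (the first attempt is not counted)
      flush.failure.retries.wait.ms: delay, in milliseconds, before each retry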
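      To connect the two options to the fields used in the pseudocode (flushFailureMaxRetries and flushFailureRetriesWait), here is a minimal sketch of how they could be read from the connector properties. FlushRetryConfig is a hypothetical class, not an existing Debezium API, and the defaults (0 retries, which keeps today's fail-fast behavior, and a 1000 ms wait) are assumptions.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical holder for the two proposed options. The class and its default
// values (0 retries, 1000 ms wait) are assumptions, not part of Debezium.
final class FlushRetryConfig {
    static final String MAX_RETRIES = "flush.failure.max.retries";
    static final String RETRY_WAIT_MS = "flush.failure.retries.wait.ms";

    final int flushFailureMaxRetries;
    final Duration flushFailureRetriesWait;

    FlushRetryConfig(Map<String, String> props) {
        // A default of 0 retries keeps the current fail-fast behavior when the option is unset
        this.flushFailureMaxRetries = Integer.parseInt(props.getOrDefault(MAX_RETRIES, "0"));
        long waitMs = Long.parseLong(props.getOrDefault(RETRY_WAIT_MS, "1000"));
        if (flushFailureMaxRetries < 0 || waitMs < 0) {
            throw new IllegalArgumentException("Retry options must be non-negative");
        }
        this.flushFailureRetriesWait = Duration.ofMillis(waitMs);
    }

    public static void main(String[] args) {
        FlushRetryConfig cfg = new FlushRetryConfig(Map.of(MAX_RETRIES, "3", RETRY_WAIT_MS, "500"));
        // Prints "3 retries, 500 ms between attempts"
        System.out.println(cfg.flushFailureMaxRetries + " retries, "
                + cfg.flushFailureRetriesWait.toMillis() + " ms between attempts");
    }
}
```

      Rejecting negative values up front means a misconfigured connector fails at startup rather than in the middle of a flush.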

       

       

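      In JdbcChangeEventSink:

      import java.util.List;

      import org.apache.kafka.connect.errors.ConnectException;

      import io.debezium.util.Clock;
      import io.debezium.util.Metronome;

      // TableId, SinkRecordDescriptor, LOGGER, dialect, flushBuffer(...) and the option fields
      // flushFailureMaxRetries / flushFailureRetriesWait are assumed to be members of JdbcChangeEventSink.

      private void flushBufferRetriable(TableId tableId, List<SinkRecordDescriptor> toFlush) {
          int retries = 0;
          Exception lastException = null;
      
          LOGGER.debug("Flushing records in JDBC Writer for table: {}", tableId.getTableName());
          while (retries <= flushFailureMaxRetries) {
              if (retries > 0) {
                  LOGGER.warn("Retrying flush of records for table '{}': retry {}/{} after a delay of {} ms",
                          tableId.getTableName(), retries, flushFailureMaxRetries, flushFailureRetriesWait.toMillis());
                  // Wait outside the try block below so an interrupt is not treated as a flush failure
                  try {
                      Metronome.parker(flushFailureRetriesWait, Clock.SYSTEM).pause();
                  }
                  catch (InterruptedException ie) {
                      Thread.currentThread().interrupt(); // preserve the interrupt status
                      throw new ConnectException("Interrupted while waiting to retry flushing records", ie);
                  }
              }
              try {
                  flushBuffer(tableId, toFlush);
                  return;
              }
              catch (Exception e) {
                  lastException = e;
                  if (!isRetriable(e)) {
                      // Not a transient error: fail the task immediately
                      throw new ConnectException("Failed to process sink records", e);
                  }
                  retries++;
              }
          }
          // All flushFailureMaxRetries + 1 attempts failed with retriable errors
          throw new ConnectException("Exceeded max retries (" + flushFailureMaxRetries + "), failed to process sink records", lastException);
      }
      
      // Returns true if the throwable, or any exception in its cause chain, is an instance of
      // one of the exception types returned by dialect.communicationExceptions()
      private boolean isRetriable(Throwable throwable) {
          if (throwable == null) {
              return false;
          }
          for (Class<? extends Exception> e : dialect.communicationExceptions()) {
              if (e.isAssignableFrom(throwable.getClass())) {
                  return true;
              }
          }
          return isRetriable(throwable.getCause());
      }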
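      The retry loop above depends on connector internals, so it cannot be run on its own. Below is a dependency-free sketch of the same technique: bounded retries with a fixed delay, where only failures whose cause chain contains a retriable type are retried. RetriableFlush is a hypothetical name, and Thread.sleep stands in for Debezium's Metronome.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Callable;

// Hypothetical, dependency-free version of flushBufferRetriable/isRetriable.
// Thread.sleep replaces Metronome; retriable types are passed in instead of coming from a dialect.
final class RetriableFlush {
    private final int maxRetries;
    private final Duration wait;
    private final List<Class<? extends Throwable>> retriableTypes;

    RetriableFlush(int maxRetries, Duration wait, List<Class<? extends Throwable>> retriableTypes) {
        this.maxRetries = maxRetries;
        this.wait = wait;
        this.retriableTypes = retriableTypes;
    }

    /** Runs the action up to maxRetries + 1 times, retrying only retriable failures. */
    <T> T run(Callable<T> action) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                Thread.sleep(wait.toMillis()); // fixed delay before each retry
            }
            try {
                return action.call();
            }
            catch (Exception e) {
                if (!isRetriable(e)) {
                    throw e; // non-transient error: fail immediately
                }
                last = e;
            }
        }
        throw new IllegalStateException("Exceeded max retries (" + maxRetries + ")", last);
    }

    // True if the throwable or any of its causes is an instance of a retriable type
    boolean isRetriable(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            for (Class<? extends Throwable> type : retriableTypes) {
                if (type.isInstance(cur)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

      With maxRetries = 3 the action runs at most four times. The delay is fixed, matching the proposed options. An exponential backoff could be substituted, but it would need an additional option.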

       

       

      Here are examples of the LockAcquisitionException and PessimisticLockException cases I encountered with the MySQL sink.

      • Lock Timeout (PessimisticLockException)

      If another transaction is running a long query while holding a shared (S) lock on the sink table, the sink task's upsert fails with a lock wait timeout.

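      • Deadlock Detection (LockAcquisitionException)
      1. A transaction (1) holds a shared (S) gap lock on a secondary index.
      2. The sink task holds an exclusive (X) lock on the PRIMARY index and
        waits for an X lock on the other index, which is locked by transaction (1), because the indexed value is updated into that gap.
      3. Transaction (1) then requests an X lock on the PRIMARY index.
      4. InnoDB detects the deadlock and rolls back the transaction with less work to undo. If transaction (1) has more DML to roll back than the sink task, the sink task is chosen as the victim and fails with a deadlock error.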
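      MySQL reports both scenarios above as specific server errors: 1205 (ER_LOCK_WAIT_TIMEOUT, "Lock wait timeout exceeded; try restarting transaction") for the lock timeout and 1213 (ER_LOCK_DEADLOCK, "Deadlock found when trying to get lock; try restarting transaction") for the deadlock. Instead of matching exception classes, a retry check could inspect the vendor error code of the underlying java.sql.SQLException. The sketch below is hypothetical: MySqlLockErrors is not part of Debezium, and it assumes the driver's SQLException is somewhere in the cause chain.

```java
import java.sql.SQLException;
import java.util.Set;

// Hypothetical helper: classifies MySQL lock errors by vendor error code.
// 1205 = ER_LOCK_WAIT_TIMEOUT (lock wait timeout), 1213 = ER_LOCK_DEADLOCK (deadlock detected).
final class MySqlLockErrors {
    private static final Set<Integer> RETRIABLE_CODES = Set.of(1205, 1213);

    private MySqlLockErrors() {
    }

    /** Walks the cause chain and returns true if any SQLException carries a lock-related MySQL error code. */
    static boolean isLockError(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof SQLException sql && RETRIABLE_CODES.contains(sql.getErrorCode())) {
                return true;
            }
        }
        return false;
    }
}
```

      Both MySQL error messages say "try restarting transaction", which suggests that retrying the whole flush is a reasonable response to these two codes.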

       

       

      If this looks reasonable, I will open a PR for it.

              Assignee: Unassigned
              Reporter: Inki Hwang (Inactive)
              Votes: 0
              Watchers: 3

                Created:
                Updated:
                Resolved: