Debezium / DBZ-6039

Improve resilience during internal schema history recovery from Kafka



      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      Debezium 2.1.0 for MySQL and SQL Server.

      What is the connector configuration?

      schema.history.internal.kafka.recovery.attempts: 8
      schema.history.internal.kafka.recovery.poll.interval.ms: 10000

      What is the captured database version and mode of deployment?

      Irrelevant.

      What behaviour do you expect?

      Connectors restart without an error.

      What behaviour do you see?

      Sometimes, especially under high load on the Kafka brokers and with large internal schema history topics, connectors fail with the following error:

      java.lang.IllegalStateException: The database history couldn't be recovered. Consider to increase the value for database.history.kafka.recovery.poll.interval.ms
      	at io.debezium.relational.history.KafkaDatabaseHistory.recoverRecords(KafkaDatabaseHistory.java:285)
      	at io.debezium.relational.history.AbstractDatabaseHistory.recover(AbstractDatabaseHistory.java:112)
      	at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:70)
      	at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:86)
      	at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:133)
      	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
      	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      	at java.base/java.lang.Thread.run(Thread.java:829)
      

      The current logic of retaining the attempt count even after records are successfully consumed does not seem reasonable. If, out of the 100 allowed retries, the first 95 fail due to high broker load and the load then decreases, the client may start fetching records, but it will have only 5 attempts remaining.

      Another drawback of this logic is that, given the same broker performance and the same maximum response size, the larger the topic, the more likely its recovery is to fail: restoring a larger topic requires more fetch requests, each of which may return no data under load, and every such empty poll counts against the same fixed limit.

      According to the documentation, kafka.recovery.attempts is:

      The number of attempts in a row that no data are returned from Kafka before recover completes.

      But the current implementation counts the total attempts, not attempts in a row.
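
      For illustration, here is a minimal, self-contained sketch of the counting behaviour described above (the class, method, and variable names and the simulated poll are hypothetical, not the actual KafkaSchemaHistory code). The empty-poll counter is incremented whenever a poll returns no data and is never reset, so it effectively tracks the total number of empty polls rather than a consecutive run of them:

      import java.util.List;
      import java.util.function.Supplier;

      public class TotalAttemptCounting {

          // Fails once the *total* number of empty polls exceeds the limit,
          // regardless of how many successful polls happened in between.
          static void recover(Supplier<List<String>> poll, int maxRecoveryAttempts, int recordsToRecover) {
              int emptyPolls = 0;
              int recovered = 0;
              while (recovered < recordsToRecover) {
                  if (emptyPolls > maxRecoveryAttempts) {
                      throw new IllegalStateException("The schema history couldn't be recovered");
                  }
                  List<String> records = poll.get();
                  if (records.isEmpty()) {
                      emptyPolls++;                   // counted against the limit ...
                  }
                  else {
                      recovered += records.size();    // ... but never reset here
                  }
              }
          }

          public static void main(String[] args) {
              // Simulated broker under load: every other poll returns no data.
              // With 10 records left to recover and a limit of 8, recovery fails
              // even though no two consecutive polls were ever both empty.
              int[] counter = { 0 };
              Supplier<List<String>> flakyPoll = () -> {
                  int n = counter[0]++;
                  return (n % 2 == 0) ? List.<String>of() : List.of("ddl-" + n);
              };
              recover(flakyPoll, 8, 10);              // throws IllegalStateException
          }
      }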

      Additionally, the value of 4 documented as the default limit for recovery attempts is misleading: the actual default in the code is 100. A user experiencing the error may try to increase the number of attempts by setting the property to, say, 8, but this would in fact decrease the limit and exacerbate the issue.

      Do you see the same behaviour using the latest released Debezium version?

      I haven't checked, but it most likely still applies.

      Do you have the connector logs, ideally from start till finish?

      No.

      How to reproduce the issue using our tutorial deployment?

      The issue is reproducible under high load on the Kafka cluster.

      Implementation ideas (optional)

      1. Correct the documentation on the default value of the recovery attempt limit.
      2. Count only consecutive failures to consume messages against the limit (see the sketch below).
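
      A minimal sketch of idea 2, assuming the same hypothetical names as in the sketch above: the only change is that the counter is reset whenever records are consumed, so the limit applies to empty polls in a row, matching the documented meaning of the setting.

      import java.util.List;
      import java.util.function.Supplier;

      public class ConsecutiveAttemptCounting {

          // Gives up only after maxRecoveryAttempts empty polls *in a row*;
          // any successful poll resets the counter.
          static void recover(Supplier<List<String>> poll, int maxRecoveryAttempts, int recordsToRecover) {
              int consecutiveEmptyPolls = 0;
              int recovered = 0;
              while (recovered < recordsToRecover) {
                  if (consecutiveEmptyPolls > maxRecoveryAttempts) {
                      throw new IllegalStateException("The schema history couldn't be recovered");
                  }
                  List<String> records = poll.get();
                  if (records.isEmpty()) {
                      consecutiveEmptyPolls++;
                  }
                  else {
                      recovered += records.size();
                      consecutiveEmptyPolls = 0;      // reset: progress was made
                  }
              }
          }

          public static void main(String[] args) {
              // The same flaky broker as before: every other poll is empty.
              // Recovery now succeeds because the counter never exceeds 1.
              int[] counter = { 0 };
              Supplier<List<String>> flakyPoll = () -> {
                  int n = counter[0]++;
                  return (n % 2 == 0) ? List.<String>of() : List.of("ddl-" + n);
              };
              recover(flakyPoll, 8, 10);              // completes normally
          }
      }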

              Assignee: Unassigned
              Reporter: Sergei Morozov