Debezium / DBZ-7250

Vitess-connector should not store GTID set in status topic


      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      When the GTID set for each task grows large (e.g., 2,000 characters) and the connector runs a high number of tasks (e.g., 64), the request to write the task configurations, which include those GTID sets, times out with this error:

       

      2023-12-12 00:37:25,887 ERROR  ||  Failed to write task configurations to Kafka   [org.apache.kafka.connect.storage.KafkaConfigBackingStore]
      java.util.concurrent.TimeoutException: Timeout after waiting for 210 ms.
          at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
          at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
          at org.apache.kafka.connect.storage.KafkaConfigBackingStore.sendPrivileged(KafkaConfigBackingStore.java:806)
          at org.apache.kafka.connect.storage.KafkaConfigBackingStore.putTaskConfigs(KafkaConfigBackingStore.java:595)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$writeTaskConfigs$43(DistributedHerder.java:2070)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.writeToConfigTopicAsLeader(DistributedHerder.java:1556)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.writeTaskConfigs(DistributedHerder.java:2070)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2026)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2013)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:1956)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$39(DistributedHerder.java:1969)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2163)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:467)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:368)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)

      2023-12-12 00:37:25,887 ERROR  ||  [Worker clientId=connect-1, groupId=dbzium-prod-byuser] Failed to reconfigure connector's tasks (byuser-connector), retrying after backoff.   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
      org.apache.kafka.connect.errors.ConnectException: Error writing task configurations to Kafka
          at org.apache.kafka.connect.storage.KafkaConfigBackingStore.putTaskConfigs(KafkaConfigBackingStore.java:598)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$writeTaskConfigs$43(DistributedHerder.java:2070)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.writeToConfigTopicAsLeader(DistributedHerder.java:1556)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.writeTaskConfigs(DistributedHerder.java:2070)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2026)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2013)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:1956)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$39(DistributedHerder.java:1969)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2163)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:467)
          at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:368)
          at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
          at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
          at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
          at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: java.util.concurrent.TimeoutException: Timeout after waiting for 210 ms.
          at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
          at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
          at org.apache.kafka.connect.storage.KafkaConfigBackingStore.sendPrivileged(KafkaConfigBackingStore.java:806)
          at org.apache.kafka.connect.storage.KafkaConfigBackingStore.putTaskConfigs(KafkaConfigBackingStore.java:595)

      We should not store the GTIDs in the task configurations, so that this synchronous write does not time out. The write needs to complete within 30 seconds, which it appears unable to do for very large connector deployments that have been running for a long time (and therefore have large GTID sets). The root cause is that VitessConnector reads the offsets itself rather than leaving that to VitessConnectorTask, which is how other connectors do it.
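      To make the scale concrete, the following stdlib-only Java model (not Debezium code) mimics the pattern described above: a connector that embeds each task's GTID set in that task's configuration. The config key names and the "shard-N" values are hypothetical, and each GTID set is replaced by a placeholder string of the reported length (about 2,000 characters, 64 tasks). Counting key and value characters is only a rough proxy for the bytes actually written to Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative model, NOT the Vitess connector's code: every task config
 * carries that task's GTID set, so the batch of task configs grows with
 * (number of tasks) x (GTID set length).
 */
public class EmbeddedGtidConfigs {

    // Hypothetical config keys; the real connector's property names may differ.
    static final String SHARDS_KEY = "example.task.shards";
    static final String GTID_KEY = "example.task.gtid";

    /** Builds one config map per task, each embedding a GTID set of the given length. */
    static List<Map<String, String>> taskConfigs(int numTasks, int gtidLength) {
        List<Map<String, String>> configs = new ArrayList<>();
        String fakeGtid = "x".repeat(gtidLength); // placeholder for a real GTID set (Java 11+)
        for (int task = 0; task < numTasks; task++) {
            Map<String, String> config = new HashMap<>();
            config.put(SHARDS_KEY, "shard-" + task); // hypothetical shard assignment
            config.put(GTID_KEY, fakeGtid);          // the part that grows over time
            configs.add(config);
        }
        return configs;
    }

    /** Sums the length of every key and value across all task configs. */
    static long totalChars(List<Map<String, String>> configs) {
        long total = 0;
        for (Map<String, String> config : configs) {
            for (Map.Entry<String, String> entry : config.entrySet()) {
                total += entry.getKey().length() + entry.getValue().length();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Figures from the report: ~2,000-character GTID sets across 64 tasks.
        long chars = totalChars(taskConfigs(64, 2000));
        System.out.println("Approximate task-config characters: " + chars);
    }
}
```

      With these inputs the GTID placeholders alone account for 64 x 2,000 = 128,000 characters, and the total grows linearly with both the task count and the GTID set length.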

       

      Implementation ideas (optional)

      The MySQL connector stores only the configuration in each task's config, not the offsets. If the Vitess connector did the same, and each task independently read its offset from the offset topic, we should not run into these timeout errors.
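      A minimal sketch of the proposed alternative, again as a stdlib-only model under stated assumptions: the connector's task configurations carry only an assignment key, and each task looks up its own committed GTID set when it starts. OffsetStore is a hypothetical stand-in for Kafka Connect's OffsetStorageReader, and the config key and default position are placeholders, not the connector's real property names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Illustrative model of "tasks read their own offsets"; not Debezium code. */
public class TaskSideOffsetLookup {

    // Hypothetical config key holding only the task's assignment.
    static final String TASK_KEY = "example.task.key";

    /** Hypothetical stand-in for Kafka Connect's OffsetStorageReader. */
    interface OffsetStore {
        Optional<String> gtidFor(String taskKey);
    }

    /** Connector side: hand out assignments only, never offsets. */
    static List<Map<String, String>> taskConfigs(int numTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int task = 0; task < numTasks; task++) {
            Map<String, String> config = new HashMap<>();
            config.put(TASK_KEY, "task-" + task);
            configs.add(config);
        }
        return configs;
    }

    /** Task side: resolve the starting GTID set from the offset store, or fall back to a default. */
    static String startingGtid(Map<String, String> config, OffsetStore store, String defaultGtid) {
        String taskKey = config.get(TASK_KEY);
        return store.gtidFor(taskKey).orElse(defaultGtid);
    }

    public static void main(String[] args) {
        // Simulated offset topic contents: only task-0 has committed a (large) GTID set.
        Map<String, String> committed = new HashMap<>();
        committed.put("task-0", "x".repeat(2000));
        OffsetStore store = key -> Optional.ofNullable(committed.get(key));

        List<Map<String, String>> configs = taskConfigs(64);
        System.out.println("task-0 resumes from a GTID set of "
                + startingGtid(configs.get(0), store, "default-position").length() + " characters");
        System.out.println("task-1 starts from: "
                + startingGtid(configs.get(1), store, "default-position"));
    }
}
```

      Because the lookup happens inside each task, the size of the task configurations no longer depends on how long the connector has been running.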

            Assignee: Unassigned
            Reporter: Thomas Thornton (tthorn)