Type: Feature Request
Resolution: Done
Priority: Major
Feature request or enhancement
For feature requests or enhancements, please provide this information:
Which use case/requirement will be addressed by the proposed feature?
When the GTID set for each task grows large (e.g., 2,000 characters) and the connector runs a high number of tasks (e.g., 64), the request to update the task status times out with this error:
2023-12-12 00:37:25,887 ERROR || Failed to write task configurations to Kafka [org.apache.kafka.connect.storage.KafkaConfigBackingStore]
java.util.concurrent.TimeoutException: Timeout after waiting for 210 ms.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.sendPrivileged(KafkaConfigBackingStore.java:806)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.putTaskConfigs(KafkaConfigBackingStore.java:595)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$writeTaskConfigs$43(DistributedHerder.java:2070)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.writeToConfigTopicAsLeader(DistributedHerder.java:1556)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.writeTaskConfigs(DistributedHerder.java:2070)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2026)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2013)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:1956)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$39(DistributedHerder.java:1969)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2163)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:467)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:368)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

2023-12-12 00:37:25,887 ERROR || [Worker clientId=connect-1, groupId=dbzium-prod-byuser] Failed to reconfigure connector's tasks (byuser-connector), retrying after backoff. [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
org.apache.kafka.connect.errors.ConnectException: Error writing task configurations to Kafka
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.putTaskConfigs(KafkaConfigBackingStore.java:598)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$writeTaskConfigs$43(DistributedHerder.java:2070)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.writeToConfigTopicAsLeader(DistributedHerder.java:1556)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.writeTaskConfigs(DistributedHerder.java:2070)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2026)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2013)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:1956)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$39(DistributedHerder.java:1969)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2163)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:467)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:368)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.TimeoutException: Timeout after waiting for 210 ms.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:76)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.sendPrivileged(KafkaConfigBackingStore.java:806)
    at org.apache.kafka.connect.storage.KafkaConfigBackingStore.putTaskConfigs(KafkaConfigBackingStore.java:595)
We should not store the GTIDs in the task statuses, to prevent this synchronous status-update call from timing out. The call needs to complete within 30 seconds, which it seems incapable of doing for these very large connector deployments that have been running for a long time (and therefore have a large GTID set). This is a result of reading offsets in VitessConnector rather than in VitessConnectorTask (which is how other connectors do it).
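For illustration, here is a minimal sketch of the problematic shape; this is not the actual VitessConnector code, and the class name, config keys, and the lookUpGtidSetForTask helper are all hypothetical. The connector resolves each task's offset itself and inlines the GTID set into every task config; the Connect leader then serializes all of those configs into the config topic during one reconfiguration, which is the putTaskConfigs call that times out in the stack trace above (64 tasks at roughly 2,000 characters each is on the order of 128 KB per write).

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

// Sketch of the problematic pattern, not the real VitessConnector.
public class GtidInliningConnectorSketch extends SourceConnector {

    private Map<String, String> connectorConfig;

    @Override
    public void start(Map<String, String> props) {
        this.connectorConfig = props;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(connectorConfig);
            taskConfig.put("task.id", Integer.toString(i));
            // Problematic: a multi-kilobyte GTID set is inlined into every
            // task config, and all of these maps are written synchronously
            // to the Kafka Connect config topic on each reconfiguration.
            taskConfig.put("task.gtid.set", lookUpGtidSetForTask(i)); // hypothetical key and helper
            configs.add(taskConfig);
        }
        return configs;
    }

    // Hypothetical stand-in for the connector-side offset lookup.
    private String lookUpGtidSetForTask(int taskId) {
        return "MySQL56/1111aaaa-0000-0000-0000-000000000000:1-123456"; // illustrative only
    }

    @Override
    public Class<? extends Task> taskClass() {
        return null; // omitted in this sketch
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "sketch";
    }
}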
Implementation ideas (optional)
The MySQL connector stores only the configs, not the offsets, in the task status. If we do the same (and have each task independently fetch its offset from the offset topic), we shouldn't run into these timeout errors.
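A minimal sketch of that task-side pattern, assuming the standard Kafka Connect SourceTask API; the source-partition key "server" and offset field "gtid" are illustrative placeholders, not the connector's real offset schema. Each task asks the runtime's OffsetStorageReader for its own offset during start(), so the GTID set never has to pass through the config topic.

import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Sketch only, not the real VitessConnectorTask.
public class OffsetReadingTaskSketch extends SourceTask {

    private String gtidSet;

    @Override
    public void start(Map<String, String> props) {
        // Hypothetical source-partition key; the real connector defines its own.
        Map<String, String> partition = Map.of("server", props.get("name"));

        // Read this task's offset directly from the offset topic via the
        // runtime-provided reader, instead of receiving it in the task config.
        Map<String, Object> offset = context.offsetStorageReader().offset(partition);
        if (offset != null) {
            gtidSet = (String) offset.get("gtid"); // hypothetical offset field name
        }
        // A null gtidSet means no prior offset, i.e., a fresh start/snapshot.
    }

    @Override
    public List<SourceRecord> poll() {
        return null; // the real implementation streams change events here
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "sketch";
    }
}

With this shape, taskConfigs() on the connector side only needs to carry small, static settings, so the config-topic write should stay well under the producer timeout regardless of how large the GTID sets grow.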