Feature Request
Resolution: Done
Major
1.0.0.Final
None
From #debezium/dev:
I have a question about the retry mechanism in case of errors, specifically database connectivity errors. It seems retrying is only possible when establishing the replication connection, through the 'slot.max.retries' setting. Kafka Connect introduced support for retries in https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect. For a task to be retried, it must throw RetriableException. When I kill the database, the Debezium task simply stops on poll:
connect_1 | 2020-01-15 13:25:50,173 ERROR || WorkerSourceTask{id=job-service-postgres-connector-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
connect_1 | org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
connect_1 |     at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:171)
connect_1 |     at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
connect_1 |     at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:221)
connect_1 |     at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
...
connect_1 |     at java.base/java.lang.Thread.run(Thread.java:834)
connect_1 | Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy
connect_1 |     at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1052)
...
connect_1 | Caused by: java.net.SocketException: Broken pipe (Write failed)
connect_1 |     at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
...
Is this planned to be supported anytime soon or am I missing some other setting?
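To make the difference concrete, here is a simplified, hypothetical model of how a Kafka Connect worker treats exceptions thrown from a source task's poll(). As far as we can tell, the real WorkerSourceTask logs a RetriableException and simply polls again, while any other exception (such as the ConnectException in the log above) stops the task. This is not the framework's actual code: the exception classes are stand-ins declared locally so the sketch compiles without Kafka on the classpath, and the maxPolls bound exists only to keep the model finite.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical model of the worker's poll loop; NOT the real WorkerSourceTask. */
final class SourcePollLoopModel {

    // Stand-ins for org.apache.kafka.connect.errors.ConnectException / RetriableException.
    static class ConnectException extends RuntimeException {
        ConnectException(String message) { super(message); }
    }

    static class RetriableException extends ConnectException {
        RetriableException(String message) { super(message); }
    }

    /**
     * Calls poll up to maxPolls times. A RetriableException is recorded and the
     * loop polls again; any other RuntimeException stops the "task".
     * Returns one log entry per iteration that ran.
     */
    static List<String> run(Supplier<List<String>> poll, int maxPolls) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            try {
                List<String> records = poll.get();
                log.add("polled " + records.size() + " record(s)");
            } catch (RetriableException e) {
                // Retriable: nothing is lost, the next iteration polls again.
                log.add("retriable: " + e.getMessage() + ", will poll again");
            } catch (RuntimeException e) {
                // Anything else is treated as fatal and ends the loop.
                log.add("fatal: " + e.getMessage() + ", task stopped");
                break;
            }
        }
        return log;
    }
}
```

Under this model, the only change a connector needs in order to survive a dropped connection is to throw the retriable type instead of a plain ConnectException.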
I got a response that retrying is planned, but with no dates set yet. I'm thinking about implementing this. Have you already thought about a direction this could take?
Basically, from what I see in Kafka Connect, the way for a plugin to use retrying is to throw RetriableException instead of the root exception. Debezium is a source plugin, so I would add an option to wrap exceptions thrown from poll() into RetriableException, based on some configuration. Does that make sense?
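A minimal sketch of that wrapping idea, under several assumptions. The retriableErrorsEnabled flag is hypothetical and not an existing Debezium option. The classification rule is also ours: a failure counts as a connectivity problem if its cause chain contains a java.net.SocketException (as in the log above) or a java.sql.SQLException whose SQLSTATE belongs to class "08", which the SQL standard defines as "connection exception". Retrying on every SQLException would also retry errors that can never succeed, such as permission problems. The exception classes are local stand-ins for the ones in org.apache.kafka.connect.errors.

```java
import java.net.SocketException;
import java.sql.SQLException;

/** Hypothetical helper; not part of Debezium or Kafka Connect. */
final class PollErrorTranslator {

    // Stand-ins for org.apache.kafka.connect.errors.ConnectException / RetriableException.
    static class ConnectException extends RuntimeException {
        ConnectException(String message, Throwable cause) { super(message, cause); }
    }

    static class RetriableException extends ConnectException {
        RetriableException(String message, Throwable cause) { super(message, cause); }
    }

    private final boolean retriableErrorsEnabled; // hypothetical config flag

    PollErrorTranslator(boolean retriableErrorsEnabled) {
        this.retriableErrorsEnabled = retriableErrorsEnabled;
    }

    /** True if any exception in the cause chain looks like a lost database connection. */
    static boolean isConnectivityFailure(Throwable failure) {
        for (Throwable cur = failure; cur != null; cur = cur.getCause()) {
            if (cur instanceof SocketException) {
                return true;
            }
            if (cur instanceof SQLException) {
                String state = ((SQLException) cur).getSQLState();
                // SQLSTATE class "08" = connection exception (SQL standard).
                if (state != null && state.startsWith("08")) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Chooses the exception type poll() should throw for the given producer failure. */
    ConnectException translate(Throwable failure) {
        if (retriableErrorsEnabled && isConnectivityFailure(failure)) {
            return new RetriableException("Lost connection to the database; poll() may be retried", failure);
        }
        return new ConnectException(
                "An exception occurred in the change event producer. This connector will be stopped.", failure);
    }
}
```

In a task, the result of translate(...) would be thrown from poll() in place of the unconditional ConnectException; with the flag off, behavior stays as it is today.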
Possibly also look at https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkTask.java, which may serve as an example of reusing Kafka Connect's retry functionality.
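As far as we can tell, the linked JdbcSinkTask keeps a counter of remaining retries: each failed attempt decrements it and throws RetriableException, a success resets it, and once it reaches zero the task gives up with a plain ConnectException. Below is a hypothetical, simplified sketch of that bounded-retry pattern. DbOperation and BoundedRetryPolicy are illustrative names, the exception classes are local stand-ins, and the backoff and connection re-initialization a real task would need are omitted. Applied to a source connector, the same idea would stop a task whose database never comes back, instead of retrying forever.

```java
import java.sql.SQLException;

/** Hypothetical bounded-retry helper, loosely modeled on JdbcSinkTask's approach. */
final class BoundedRetryPolicy {

    // Stand-ins for org.apache.kafka.connect.errors.ConnectException / RetriableException.
    static class ConnectException extends RuntimeException {
        ConnectException(String message, Throwable cause) { super(message, cause); }
    }

    static class RetriableException extends ConnectException {
        RetriableException(String message, Throwable cause) { super(message, cause); }
    }

    /** Hypothetical unit of database work that may fail with SQLException. */
    interface DbOperation {
        void run() throws SQLException;
    }

    private final int maxRetries;
    private int remainingRetries;

    BoundedRetryPolicy(int maxRetries) {
        this.maxRetries = maxRetries;
        this.remainingRetries = maxRetries;
    }

    void execute(DbOperation op) {
        try {
            op.run();
            remainingRetries = maxRetries; // a success restores the full retry budget
        } catch (SQLException e) {
            if (remainingRetries > 0) {
                remainingRetries--;
                // Ask the framework to try again later.
                throw new RetriableException("Attempt failed, " + remainingRetries + " retries left", e);
            }
            // Budget exhausted: fail the task for good.
            throw new ConnectException("Giving up after " + maxRetries + " retries", e);
        }
    }

    int remainingRetries() {
        return remainingRetries;
    }
}
```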
is duplicated by:
- DBZ-273 Connect doesn't reconnect to PostgreSQL if connection breaks (Closed)