-
Bug
-
Resolution: Done
-
Minor
-
1.6.0.Final
We have recently started getting an exception that causes the task in our Postgres connector to stop.
We were getting the following error from the API call that checks the task status:
{"id":0,"state":"FAILED","worker_id":"0.0.0.0:8083","trace":"org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.\n\tat io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)\n\tat io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:153)\n\tat io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:38)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:159)\n\tat io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:122)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.postgresql.util.PSQLException: This connection has been closed.\n\tat org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:865)\n\tat org.postgresql.jdbc.PgConnection.createStatement(PgConnection.java:1764)\n\tat org.postgresql.jdbc.PgDatabaseMetaData.createMetaDataStatement(PgDatabaseMetaData.java:2663)\n\tat org.postgresql.jdbc.PgDatabaseMetaData.getPrimaryKeys(PgDatabaseMetaData.java:2156)\n\tat io.debezium.jdbc.JdbcConnection.readPrimaryKeyNames(JdbcConnection.java:1286)\n\tat io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.handleRelationMessage(PgOutputMessageDecoder.java:278)\n\tat io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:176)\n\tat 
io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:493)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:485)\n\tat io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:188)\n\tat io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:150)\n\t... 8 more\n"}
This was a little misleading and had us scratching our heads for a bit, but after digging through the logs a little further we concluded that the error happened further up the stack.
2021-07-16 13:17:08,620 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
2021-07-16 13:17:08,620 INFO || App info kafka.producer for connector-producer-{connector_i_replaced} unregistered
2021-07-16 13:17:08,620 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
2021-07-16 13:17:08,616 INFO || Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
2021-07-16 13:17:08,618 INFO || Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
2021-07-16 13:17:08,618 INFO || [Producer clientId=connector-producer-{connector_i_replaced}] Closing the Kafka producer with timeoutMillis = 30000 ms. [org.apache.kafka.clients.producer.KafkaProducer]
2021-07-16 13:17:08,620 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
    at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:176)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at org.postgresql.jdbc.PgDatabaseMetaData.getPrimaryKeys(PgDatabaseMetaData.java:2156)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at org.postgresql.jdbc.PgDatabaseMetaData.createMetaDataStatement(PgDatabaseMetaData.java:2663)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:122)
    at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.handleRelationMessage(PgOutputMessageDecoder.java:278)
    at org.postgresql.jdbc.PgConnection.createStatement(PgConnection.java:1764)
    at java.base/java.lang.Thread.run(Thread.java:834)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:159)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:485)
    at io.debezium.jdbc.JdbcConnection.readPrimaryKeyNames(JdbcConnection.java:1286)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:188)
2021-07-16 13:17:08,615 INFO || Stopping down connector [io.debezium.connector.common.BaseSourceTask]
    ... 8 more
Caused by: org.postgresql.util.PSQLException: This connection has been closed.
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:150)
    at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:865)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:493)
2021-07-16 13:17:08,591 INFO Postgres|flowable_dev_aht_rolenotification_a_master|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:38)
2021-07-16 13:17:08,614 INFO || WorkerSourceTask{id=AHTRoleNotification-a-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-07-16 13:17:08,593 INFO Postgres|flowable_dev_aht_rolenotification_a_master|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
2021-07-16 13:17:08,593 INFO Postgres|flowable_dev_aht_rolenotification_a_master|streaming Connected metrics set to 'false' [io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics]
2021-07-16 13:17:08,614 ERROR || WorkerSourceTask{id=AHTRoleNotification-a-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:153)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:159)
2021-07-16 13:17:08,614 INFO || WorkerSourceTask{id={connector_i_replaced}} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:122)
2021-07-16 13:17:08,593 INFO Postgres|{database_name_i_replaced}|streaming Finished streaming [io.debezium.pipeline.ChangeEventSourceCoordinator]
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:493)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:485)
org.postgresql.util.PSQLException: This connection has been closed.
    at org.postgresql.jdbc.PgDatabaseMetaData.getPrimaryKeys(PgDatabaseMetaData.java:2156)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:188)
    at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33)
    at io.debezium.jdbc.JdbcConnection.readPrimaryKeyNames(JdbcConnection.java:1286)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:38)
2021-07-16 13:17:08,589 ERROR Postgres|flowable_dev_aht_rolenotification_a_master|streaming Producer failure [io.debezium.pipeline.ErrorHandler]
    at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.handleRelationMessage(PgOutputMessageDecoder.java:278)
    at org.postgresql.jdbc.PgDatabaseMetaData.createMetaDataStatement(PgDatabaseMetaData.java:2663)
    at org.postgresql.jdbc.PgConnection.createStatement(PgConnection.java:1764)
    at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:176)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:150)
    at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:865)
2021-07-16 13:17:08,589 WARN Postgres|{database_name_i_replaced}|streaming Failed to read column metadata for 'public.{tablename_i_replaced}' [io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder]
The warning is the true error:
2021-07-16 13:17:08,589 WARN Postgres|{database_name_i_replaced}|streaming Failed to read column metadata for 'public.{tablename_i_replaced}' [io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder]
This is a link to the code that logs the WARN, which swallows the SQL exception:
https://github.com/debezium/debezium/blob/6f9522068a83a37ccaac2070368977ce5c3692b0/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/pgoutput/PgOutputMessageDecoder.java#L340
1. I feel like we should not be swallowing the exception there; at a minimum, the exception should be logged.
2. With the error swallowed, the connection is left in a disconnected state, so everything downstream fails with a misleading error. Why not throw the actual exception there?
3. This appears to be a temporary, possibly transient issue (we are unable to reproduce it). Should there be some kind of retry logic instead of just failing the task? As soon as we restart the task it starts working fine. This failure has happened half a dozen times in the last 2 weeks, so we have literally written an app that checks the status and restarts the task if it is in a failed state.
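To make the three suggestions above concrete, here is a minimal sketch of what a bounded retry around a metadata read could look like, with the original exception logged on each attempt and kept as the cause when the retries run out. This is not Debezium code: `MetadataRetrySketch`, `SqlCall`, `readWithRetry`, and the attempt and backoff values are all hypothetical names and numbers chosen for illustration, and a real fix would probably also need to re-open the closed connection between attempts.

```java
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch only; none of these names exist in Debezium.
final class MetadataRetrySketch {

    /** Stand-in for a metadata read such as reading primary key names. */
    @FunctionalInterface
    interface SqlCall<T> {
        T run() throws SQLException;
    }

    /**
     * Runs the call up to maxAttempts times, waiting a little longer after each failure.
     * Every failure is logged, and the last one is attached as the cause
     * of the exception thrown once the attempts are exhausted.
     */
    static <T> T readWithRetry(SqlCall<T> call, int maxAttempts, long backoffMillis) throws SQLException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        SQLException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.run();
            } catch (SQLException e) {
                last = e;
                // Log the real exception instead of swallowing it.
                System.err.println("Attempt " + attempt + " of " + maxAttempts + " failed: " + e);
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMillis * attempt);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new SQLException("Interrupted while waiting to retry", e);
                    }
                }
            }
        }
        // Propagate the failure with the original cause attached.
        throw new SQLException("Metadata read failed after " + maxAttempts + " attempt(s)", last);
    }

    public static void main(String[] args) throws SQLException {
        AtomicInteger calls = new AtomicInteger();
        // Simulated read that fails twice with the error from the report, then succeeds.
        List<String> keys = readWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new SQLException("This connection has been closed.");
            }
            return List.of("id");
        }, 3, 100L);
        System.out.println("Primary keys: " + keys);
    }
}
```

With something along these lines, a transient failure would either recover on its own or surface with the original `PSQLException` as the reported cause, rather than as a later "connection has been closed" error.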