-
Bug
-
Resolution: Obsolete
-
Blocker
-
None
-
1.0.0.Final
-
None
Hi,
I am connect to PostgreSQL 11.5 using Kafka connect and postgresql connector.
Deployed in production environment for a week, it runs very stable.
But in the second week something exception happened:
org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:171) at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151) at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:221) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.postgresql.util.PSQLException: Database connection failed when writing to copy at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1052) atorg.postgresql.core.v3.CopyDualImpl.flushCopy(CopyDualImpl.java:23) at org.postgresql.core.v3.replication.V3PGReplicationStream.updateStatusInternal(V3PGReplicationStream.java:193) at org.postgresql.core.v3.replication.V3PGReplicationStream.timeUpdateStatus(V3PGReplicationStream.java:184) at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:126) at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:80) at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:397) at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:119) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:99) ... 5 more Caused by: java.net.SocketException: Broken pipe (Write failed) at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) at java.net.SocketOutputStream.write(SocketOutputStream.java:155) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at org.postgresql.core.PGStream.flush(PGStream.java:554) at org.postgresql.core.v3.QueryExecutorImpl.flushCopy(QueryExecutorImpl.java:1050) ... 13 more
This problem started, I restarted the connector and everything was recovered, but after about 2 hours, the exception still appeared.
In PostgreSQL, the configuration associated with connection timeouts is the default.
And the amount of data from the data source is not large, maybe a few pieces of data a day. and it was stable last week.
I don't know where to solve this problem, I also don't see any configuration related to database connection pooling.
Could it be that too little change data from the data source causes the connection to be idle and the connection to be closed?
Here is my connector configuration:
{ "name": "postgresql2kafka_ynztwl_tms_waybill", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "plugin.name": "pgoutput", "slot.name": "ynztwl", "publication.name": "ynztwl", "database.hostname": "xxxx", "database.port": "xxxx", "database.user": "xxxx", "database.password": "xxxx", "database.dbname" : "d1_scm5_prod_mirror", "database.server.name": "ynztwl", "table.whitelist": "public.tms_waybill", "column.blacklist": "public.tms_waybill.remark", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "drop" } }
Help! Thanks!