  Debezium / DBZ-7772

Debezium JDBC Sink error unique constraint violated


Details

    • Type: Bug
    • Resolution: Unresolved
    • Priority: Major
    • Labels: under-triaging
    • Component: jdbc-connector

    Description

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      Hello everyone, I have a problem that I can't solve. I have a JDBC sink connector:

      ///
      {
        "name": "business-acc",
        "config": {
          "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
          "tasks.max": "1",
          "connection.url": "jdbc:postgresql://host/my_db",
          "connection.username": "debezium_user",
          "connection.password": "debezium_pass",
          "insert.mode": "upsert",
          "delete.enabled": "true",
          "primary.key.mode": "record_key",
          "schema.evolution": "basic",
          "database.time_zone": "UTC",
          "topics": "bbusiness.public.accounts"
        }
      }
      \\\

      It starts and runs well, but after working for a while it crashes with the following error:

      ///
      The task was FAILED!
      Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
      Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      Caused by: org.hibernate.exception.JDBCConnectionException: error executing work
      Caused by: org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
      Caused by: java.net.SocketException: Connection timed out (Read failed)
      \\\
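A chained Java stack trace like the one above reads inside-out: the last `Caused by:` line is the root cause (here, a socket timeout), and everything above it is just wrapping. A small, hypothetical helper in plain Python to pull the root cause out of such a log:

```python
# Abbreviated chained-exception log from the report above.
LOG = """org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
Caused by: org.hibernate.exception.JDBCConnectionException: error executing work
Caused by: org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
Caused by: java.net.SocketException: Connection timed out (Read failed)"""

def root_cause(log: str) -> str:
    """Return the innermost exception, i.e. the last 'Caused by:' line."""
    causes = [line.removeprefix("Caused by: ").strip()
              for line in log.splitlines()
              if line.startswith("Caused by:")]
    # With no 'Caused by:' chain, the first line is the cause itself.
    return causes[-1] if causes else log.splitlines()[0]

print(root_cause(LOG))
# Prints the java.net.SocketException line, i.e. the network-level root cause.
```

This makes the diagnosis concrete: the failure is not a SQL or constraint error but a dropped TCP connection between Connect and Postgres.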

      Has anyone encountered this problem? I have tried many options, but nothing helps. If you have run into this, or have an idea of how to solve it, I would be very grateful for any answer.

      What Debezium connector do you use and what version?

      Debezium version 2.4

      JDBC sink connector, latest

      What is the connector configuration?

      {
        "name": "business-acc",
        "config": {
          "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
          "tasks.max": "1",
          "connection.url": "jdbc:postgresql://host/my_db",
          "connection.username": "debezium_user",
          "connection.password": "debezium_pass",
          "insert.mode": "upsert",
          "delete.enabled": "true",
          "primary.key.mode": "record_key",
          "schema.evolution": "basic",
          "database.time_zone": "UTC",
          "topics": "bbusiness.public.accounts"
        }
      }
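Since the root cause is a socket-level read timeout, one common mitigation is to keep idle JDBC connections alive so an intermediate firewall or NAT does not silently drop them. The PostgreSQL JDBC driver accepts socket options directly in the connection URL; `tcpKeepAlive`, `socketTimeout`, and `connectTimeout` are standard pgJDBC parameters, though the values below are only illustrative:

```json
{
  "connection.url": "jdbc:postgresql://host/my_db?tcpKeepAlive=true&socketTimeout=60&connectTimeout=10"
}
```

With `socketTimeout` set, a dead connection fails fast with a clear timeout instead of hanging until the OS-level TCP timeout fires mid-batch.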

      What is the captured database version and mode of deployment?

      docker-compose and prod-openshift

      What behaviour do you expect?

      <Your answer>

      What behaviour do you see?

      <Your answer>

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      <Your answer>

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      <Your answer>

      How to reproduce the issue using our tutorial deployment?

      <Your answer>

      Feature request or enhancement

      ERROR || WorkerSinkTask{id=business-acc-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure [org.apache.kafka.connect.runtime.WorkerSinkTask]
      org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
      at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:83)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
      at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:87)
      at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:94)
      ... 12 more
      Caused by: org.hibernate.exception.JDBCConnectionException: error executing work
      at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:98)
      at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:56)
      at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
      at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95)
      at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:309)
      at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:962)
      at org.hibernate.internal.AbstractSharedSessionContract.doReturningWork(AbstractSharedSessionContract.java:958)
      at io.debezium.connector.jdbc.JdbcChangeEventSink.hasTable(JdbcChangeEventSink.java:142)
      at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:111)
      at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:83)
      ... 13 more
      Caused by: org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
      at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:395)
      at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
      at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
      at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:335)
      at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:321)
      at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:297)
      at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:246)
      at org.postgresql.jdbc.PgDatabaseMetaData.getTables(PgDatabaseMetaData.java:1347)
      at com.mchange.v2.c3p0.impl.NewProxyDatabaseMetaData.getTables(NewProxyDatabaseMetaData.java:2938)
      at io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect.tableExists(GeneralDatabaseDialect.java:150)
      at io.debezium.connector.jdbc.dialect.postgres.PostgresDatabaseDialect.tableExists(PostgresDatabaseDialect.java:70)
      at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$hasTable$0(JdbcChangeEventSink.java:142)
      at org.hibernate.jdbc.WorkExecutor.executeReturningWork(WorkExecutor.java:55)
      at org.hibernate.internal.AbstractSharedSessionContract.lambda$doReturningWork$4(AbstractSharedSessionContract.java:954)
      at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:304)
      ... 18 more
      Caused by: java.net.SocketException: Connection timed out (Read failed)
      at java.base/java.net.SocketInputStream.socketRead0(Native Method)
      at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
      at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
      at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
      at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:161)
      at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:128)
      at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:113)
      at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:73)
      at org.postgresql.core.PGStream.receiveChar(PGStream.java:465)
      at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2155)
      at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
      ... 32 more
      2024-04-10 17:33:47,247 WARN || WorkerSinkTask{id=business-acc-0} Ignoring invalid task provided offset bbank.jobber.client_banner-0/OffsetAndMetadata{offset=822730, leaderEpoch=null, metadata=''} – not yet consumed, taskOffset=822730 currentOffset=822729 [org.apache.kafka.connect.runtime.WorkerSinkTask]
      2024-04-10 17:33:47,247 ERROR || WorkerSinkTask{id=business-acc-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
      org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
      at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
      at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:83)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
      ... 11 more
      Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
      at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:87)
      at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:94)

      Which use case/requirement will be addressed by the proposed feature?

      <Your answer>

      Implementation ideas (optional)

      The most interesting thing is that it works for some time and then crashes with this error; in total, about 1-2 GB of data is processed before the crash. I tried setting large retry values, but the same problem persists. My guess is that the connection pool gets clogged and old connections are not closed, but that is only a guess. I also checked on the database side: there are no errors, and everything works correctly.
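If the guess about the connection pool is right, the pool can be made smaller and recycled more aggressively. The Debezium JDBC sink exposes its c3p0 pool sizing through `connection.pool.*` properties (property names per the Debezium JDBC sink connector docs; the values below are only an illustrative sketch, not recommended settings):

```json
{
  "connection.pool.min_size": "2",
  "connection.pool.max_size": "8",
  "connection.pool.acquire_increment": "2",
  "connection.pool.timeout": "300"
}
```

A lower `connection.pool.timeout` retires idle pooled connections before an external firewall's idle cutoff can kill them, which matches the observed pattern of the sink working for a while and then failing on a stale connection.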
       

          People

            ccranfor@redhat.com Chris Cranford
            shah_nur_19 Shah Shamuratov
            Votes: 0
            Watchers: 2
