Debezium / DBZ-3068

PostgreSQL connector task fails to resume streaming because replication slot is active

    • Type: Bug
    • Resolution: Done
    • Priority: Minor
    • Fix version: 1.6.0.Beta2
    • Affects versions: 1.3.1.Final, 1.4.1.Final
    • Component: postgresql-connector

      Not reproduced so far. I am still looking for another PostgreSQL 9.6 instance that I can use to test transactions and streaming.

      On one of our databases, the connector failed with a simple connection error

      java.net.SocketException: Connection reset.
      

      but when we tried to restart it, the task failed with the error

      org.postgresql.util.PSQLException: ERROR: replication slot "sigmaedbstrm" is active for PID 30800
      

      At first I thought that a database session had somehow not ended and that the replication slot was stuck in the active state. We checked this with our DBA, and that is not the case: when the connector is stopped, there are no sessions and the replication slot is not active. But when I restart the connector, it throws exactly the same error.

      After that I tried to completely delete the connector and re-add it to the Kafka Connect deployment; it didn't help.

      After that I moved the connector to a separate Kafka Connect instance (transferring its WAL offset too). The error about the active slot is still present.

      The connector's debug log shows that, after start, the connector first opens a replication connection just to find the resume position in the WAL, then closes it and opens a new connection for the actual data replication:

      [2021-02-05 02:49:15,407] INFO Connected metrics set to 'true' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics)
      [2021-02-05 02:49:15,407] INFO Retrieved latest position from stored offset 'LSN{1456/75087D90}' (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource)
      [2021-02-05 02:49:15,407] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator)
      [2021-02-05 02:49:15,408] INFO Looking for WAL restart position for last commit LSN 'LSN{1456/75087D90}' and last change LSN 'LSN{1456/75087D90}' (io.debezium.connector.postgresql.connection.WalPositionLocator)
      [2021-02-05 02:49:15,415] DEBUG Connected to jdbc:postgresql://172.20.9.44:5432/asfo with {user=sigma_debezium, server.name=sigmadb24h01, password=***, assumeMinServerVersion=9.4} (io.debezium.jdbc.JdbcConnection)
      [2021-02-05 02:49:15,418] INFO Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{1456/75087D90}, catalogXmin=509701087] (io.debezium.connector.postgresql.connection.PostgresConnection)
      [2021-02-05 02:49:15,424] DEBUG Connected to jdbc:postgresql://172.20.9.44:5432/asfo with {user=sigma_debezium, password=***, server.name=sigmadb24h01, replication=database, assumeMinServerVersion=9.4, preferQueryMode=simple} (io.debezium.jdbc.JdbcConnection)
      [2021-02-05 02:49:15,424] DEBUG running 'IDENTIFY_SYSTEM' to validate replication connection (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 02:49:15,426] DEBUG found previous flushed LSN 'LSN{1456/75087D90}' (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 02:49:15,426] DEBUG received latest xlogpos '1460/710F1558' (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 02:49:15,426] DEBUG starting streaming from LSN 'LSN{1456/75087D90}' (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 02:49:16,491] INFO Searching for WAL resume position (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource)
      [2021-02-05 02:51:33,330] INFO First LSN 'LSN{1453/FC7DC3C8}' received (io.debezium.connector.postgresql.connection.WalPositionLocator)
      [2021-02-05 02:53:18,957] INFO Received COMMIT LSN 'LSN{1456/76902E10}' larger than than last stored commit LSN 'LSN{1456/75087D90}' (io.debezium.connector.postgresql.connection.WalPositionLocator)
      [2021-02-05 02:53:18,958] INFO Will restart from LSN 'LSN{1453/FC7DC3C8}' that is start of the first unprocessed transaction (io.debezium.connector.postgresql.connection.WalPositionLocator)
      [2021-02-05 02:53:18,958] INFO WAL resume position 'LSN{1453/FC7DC3C8}' discovered (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource)
      [2021-02-05 02:53:18,958] DEBUG Closing replication connection (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 02:53:18,970] DEBUG Connected to jdbc:postgresql://172.20.9.44:5432/asfo with {user=sigma_debezium, password=***, server.name=sigmadb24h01, replication=database, assumeMinServerVersion=9.4, preferQueryMode=simple} (io.debezium.jdbc.JdbcConnection)
      [2021-02-05 02:53:18,970] DEBUG starting streaming from LSN 'LSN{1456/75087D90}' (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 02:53:18,974] DEBUG Could not register for streaming, retrying without optional options (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      org.postgresql.util.PSQLException: ERROR: replication slot "sigmaedbstrm" is active for PID 5661
      	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2532)
      	at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1200)
      	at org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:882)
      	at org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:58)
      	at org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:42)
      	at org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38)
      	at org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:37)
      	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startPgReplicationStream(PostgresReplicationConnection.java:569)
      	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationStream(PostgresReplicationConnection.java:391)
      	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:301)
      	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:143)
      	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:140)
      	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:113)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      [2021-02-05 02:53:18,979] ERROR Producer failure (io.debezium.pipeline.ErrorHandler)
      io.debezium.DebeziumException: Failed to start replication stream at LSN{1456/75087D90}; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.
      	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:308)
      	at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:143)
      	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:140)
      	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:113)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:748)
      Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "sigmaedbstrm" is active for PID 5661
      	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2532)
      	at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1200)
      	at org.postgresql.core.v3.QueryExecutorImpl.startCopy(QueryExecutorImpl.java:882)
      	at org.postgresql.core.v3.replication.V3ReplicationProtocol.initializeReplication(V3ReplicationProtocol.java:58)
      	at org.postgresql.core.v3.replication.V3ReplicationProtocol.startLogical(V3ReplicationProtocol.java:42)
      	at org.postgresql.replication.fluent.ReplicationStreamBuilder$1.start(ReplicationStreamBuilder.java:38)
      	at org.postgresql.replication.fluent.logical.LogicalStreamBuilder.start(LogicalStreamBuilder.java:37)
      	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startPgReplicationStream(PostgresReplicationConnection.java:569)
      	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationStream(PostgresReplicationConnection.java:406)
      	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:301)
      	... 8 more
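
The log above shows the second replication connection being opened about 12 ms after the first one is closed. If the failure is a timing issue, where the server has not yet released the slot when the second connection starts (an assumption, not something this report confirms), one possible mitigation is to retry the stream start after a delay. The following is a minimal sketch of that idea only; `SlotRetry`, `isSlotActiveError` and `startWithRetry` are hypothetical names and are not part of Debezium or the PostgreSQL JDBC driver.

```java
import java.util.concurrent.Callable;

/** Hypothetical retry helper; not Debezium's or pgjdbc's implementation. */
final class SlotRetry {

    /** True if the message looks like PostgreSQL's "replication slot ... is active for PID" error. */
    static boolean isSlotActiveError(Throwable t) {
        String msg = t.getMessage();
        return msg != null
                && msg.contains("replication slot")
                && msg.contains("is active for PID");
    }

    /**
     * Calls start up to maxAttempts times, sleeping delayMs between attempts,
     * but only when the failure is the "slot is active" error.
     * Any other exception is rethrown immediately.
     */
    static <T> T startWithRetry(Callable<T> start, int maxAttempts, long delayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return start.call();
            } catch (Exception e) {
                if (!isSlotActiveError(e)) {
                    throw e;
                }
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMs);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulated stream start: fails twice with the error from the log, then succeeds.
        int[] calls = {0};
        String result = startWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException(
                        "ERROR: replication slot \"sigmaedbstrm\" is active for PID 5661");
            }
            return "streaming";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

The helper retries only on the one error text seen in this report, so unrelated failures such as the original `Connection reset` would still surface immediately.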
      

      And here is the connector configuration:

      {
        "name": "SOURCE-SIGMA-ASFODB-DBZM",
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "slot.name": "sigmaedbstrm",
        "tasks.max": "1",
        "database.server.name": "sigmadb24h01",
        "database.hostname": "172.20.9.44",
        "database.port": "5432",
        "database.dbname": "asfo",
        "database.user": "sigma_debezium",
        "database.password": "***",
        "poll.interval.ms": "1000",
        "time.precision.mode": "connect",
        "heartbeat.interval.ms": "60000",
        "heartbeat.topics.prefix": "debezium-heartbeat",
        "plugin.name": "wal2json_streaming",
        "decimal.handling.mode": "string",
        "hstore.handling.mode": "json",
        "schema.registry.url": "http://ksr.contoso.com",
        "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://ksr.contoso.com",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://ksr.contoso.com",
        "max.queue.size": "20240",
        "max.batch.size": "10240",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "sigma_asfodb_dbzm_$3",
        "snapshot.mode": "never",
        "table.include.list": "public.agency,public.transactions",
        "slot.stream.params": "add-tables=public.agency,public.transactions"
      }
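
For readers unfamiliar with the `route` transform in this configuration, the sketch below applies the same regex and replacement to a topic name using plain `java.util.regex`. It imitates the match-then-replace behaviour that Kafka Connect's `RegexRouter` is understood to have; it is not the transform's actual code. The class name `RouteSketch` and the example topic names are assumptions, built from `database.server.name` and one of the listed tables.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: applies the configured regex and replacement to a topic name. */
final class RouteSketch {

    // Values copied from transforms.route.regex and transforms.route.replacement.
    static final Pattern REGEX = Pattern.compile("([^.]+)\\.([^.]+)\\.([^.]+)");
    static final String REPLACEMENT = "sigma_asfodb_dbzm_$3";

    /** Returns the rewritten topic if the whole name matches, otherwise the name unchanged. */
    static String route(String topic) {
        Matcher m = REGEX.matcher(topic);
        return m.matches() ? m.replaceFirst(REPLACEMENT) : topic;
    }

    public static void main(String[] args) {
        // Assumed topic name of the form <server>.<schema>.<table>.
        System.out.println(route("sigmadb24h01.public.agency"));
        // A name with only two dot-separated segments does not match and is left as-is.
        System.out.println(route("two.segments"));
    }
}
```

With these inputs, the three-segment name is rewritten to `sigma_asfodb_dbzm_agency`, so every table topic ends up under the `sigma_asfodb_dbzm_` prefix regardless of server and schema name.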
      

      It has been cleaned of sensitive information; the actual "table.include.list" contains 39 tables.

      This failure started with Debezium connector version 1.3.1 on PostgreSQL server version 9.6.1.
      Then I updated the Debezium plugin to version 1.4.1, but it throws the same error on start.

      Trying to reproduce the problem, I started a connector with exactly the same configuration on exactly the same database, but with a different replication slot name. It works correctly every time, both on a simple restart and on remove/re-add. For comparison, here is the log from the working connector:

      [2021-02-05 07:37:57,130] INFO Retrieved latest position from stored offset 'LSN{1461/745F64B8}' (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource)
      [2021-02-05 07:37:57,130] INFO Starting streaming (io.debezium.pipeline.ChangeEventSourceCoordinator)
      [2021-02-05 07:37:57,130] INFO Connected metrics set to 'true' (io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics)
      [2021-02-05 07:37:57,130] INFO Looking for WAL restart position for last commit LSN 'LSN{1461/745F64B8}' and last change LSN 'LSN{1461/745F64B8}' (io.debezium.connector.postgresql.connection.WalPositionLocator)
      [2021-02-05 07:37:57,137] DEBUG Connected to jdbc:postgresql://172.20.9.44:5432/asfo with {user=sigma_debezium, server.name=sigmadb24h01, password=***, assumeMinServerVersion=9.4} (io.debezium.jdbc.JdbcConnection)
      [2021-02-05 07:37:57,140] INFO Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{1461/744967C0}, catalogXmin=516469900] (io.debezium.connector.postgresql.connection.PostgresConnection)
      [2021-02-05 07:37:57,147] DEBUG Connected to jdbc:postgresql://172.20.9.44:5432/asfo with {user=sigma_debezium, password=***, server.name=sigmadb24h01, replication=database, assumeMinServerVersion=9.4, preferQueryMode=simple} (io.debezium.jdbc.JdbcConnection)
      [2021-02-05 07:37:57,147] DEBUG running 'IDENTIFY_SYSTEM' to validate replication connection (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 07:37:57,148] DEBUG found previous flushed LSN 'LSN{1461/744967C0}' (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 07:37:57,148] DEBUG received latest xlogpos '1461/803DC000' (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 07:37:57,148] DEBUG starting streaming from LSN 'LSN{1461/745F64B8}' (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 07:37:57,948] INFO Searching for WAL resume position (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource)
      [2021-02-05 07:37:57,948] INFO First LSN 'LSN{1461/745F64B8}' received (io.debezium.connector.postgresql.connection.WalPositionLocator)
      [2021-02-05 07:37:57,949] INFO Received COMMIT LSN 'LSN{1461/745F86E8}' larger than than last stored commit LSN 'LSN{1461/745F64B8}' (io.debezium.connector.postgresql.connection.WalPositionLocator)
      [2021-02-05 07:37:57,949] INFO Will restart from LSN 'LSN{1461/745F86E8}' that follows the lastest stored (io.debezium.connector.postgresql.connection.WalPositionLocator)
      [2021-02-05 07:37:57,949] INFO WAL resume position 'LSN{1461/745F86E8}' discovered (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource)
      [2021-02-05 07:37:57,949] INFO LSN after last stored change LSN 'LSN{1461/745F86E8}' received (io.debezium.connector.postgresql.connection.WalPositionLocator)
      [2021-02-05 07:37:57,949] DEBUG Closing replication connection (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 07:37:57,962] DEBUG Connected to jdbc:postgresql://172.20.9.44:5432/asfo with {user=sigma_debezium, password=***, server.name=sigmadb24h01, replication=database, assumeMinServerVersion=9.4, preferQueryMode=simple} (io.debezium.jdbc.JdbcConnection)
      [2021-02-05 07:37:57,962] DEBUG starting streaming from LSN 'LSN{1461/745F64B8}' (io.debezium.connector.postgresql.connection.PostgresReplicationConnection)
      [2021-02-05 07:37:57,973] INFO Processing messages (io.debezium.connector.postgresql.PostgresStreamingChangeEventSource)
      [2021-02-05 07:37:57,974] INFO Streaming requested from LSN LSN{1461/745F64B8}, received LSN LSN{1461/745F64B8} identified as already processed (io.debezium.connector.postgresql.connection.AbstractMessageDecoder)
      [2021-02-05 07:37:57,974] DEBUG Message with LSN 'LSN{1461/745F64B8}' filtered (io.debezium.connector.postgresql.connection.WalPositionLocator)
      [2021-02-05 07:37:57,974] INFO Message with LSN 'LSN{1461/745F86E8}' arrived, switching off the filtering (io.debezium.connector.postgresql.connection.WalPositionLocator)
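
One measurable difference between the two logs is how long the WAL resume position search took. The sketch below computes the time between the "Searching for WAL resume position" and "WAL resume position ... discovered" lines in each log, using the timestamps exactly as printed. The class name `SearchDuration` is hypothetical, and the timestamp pattern is inferred from the log lines.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Illustrative helper for comparing timestamps copied from the connector logs. */
final class SearchDuration {

    // Timestamp layout as it appears in the logs, e.g. 2021-02-05 02:49:16,491
    static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Elapsed time between two log timestamps. */
    static Duration between(String start, String end) {
        return Duration.between(
                LocalDateTime.parse(start, LOG_TS),
                LocalDateTime.parse(end, LOG_TS));
    }

    public static void main(String[] args) {
        // Failing connector: search started 02:49:16,491, position discovered 02:53:18,958.
        Duration failing = between("2021-02-05 02:49:16,491", "2021-02-05 02:53:18,958");
        // Working connector: search started 07:37:57,948, position discovered 07:37:57,949.
        Duration working = between("2021-02-05 07:37:57,948", "2021-02-05 07:37:57,949");
        System.out.println("failing search: " + failing.toMillis() + " ms");
        System.out.println("working search: " + working.toMillis() + " ms");
    }
}
```

The failing connector spent about four minutes in the search before closing the first replication connection, while the working connector spent about a millisecond. Whether that duration is related to the slot still being reported as active is not established here.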
      

      I see a difference between these logs:

      • the working connector resumes streaming in a pretty straightforward way:
        [2021-02-05 07:37:57,949] INFO Will restart from LSN 'LSN{1461/745F86E8}' that follows the lastest stored (io.debezium.connector.postgresql.connection.WalPositionLocator)
        
      • the failed connector tries to resume from the beginning of a transaction:
        [2021-02-05 02:53:18,958] INFO Will restart from LSN 'LSN{1453/FC7DC3C8}' that is start of the first unprocessed transaction (io.debezium.connector.postgresql.connection.WalPositionLocator)
        

      I guess there is some problem with resuming WAL streaming when a transaction is involved, but I looked at the code in WalPositionLocator.java:L88 and don't see what it might be.
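
To put the two restart positions in perspective, the sketch below converts the LSNs from the logs into 64-bit WAL byte positions and subtracts them. A PostgreSQL LSN is printed as two hexadecimal numbers separated by a slash (high and low 32 bits), and the difference between two LSNs is a number of WAL bytes. The class `LsnSketch` and its methods are hypothetical and are not Debezium's own `Lsn` class.

```java
/** Illustrative LSN arithmetic; class and method names are hypothetical. */
final class LsnSketch {

    /** Parses "1456/75087D90" or "LSN{1456/75087D90}" into a 64-bit WAL position. */
    static long parse(String text) {
        String s = text.trim();
        if (s.startsWith("LSN{") && s.endsWith("}")) {
            s = s.substring(4, s.length() - 1);
        }
        String[] parts = s.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + text);
        }
        long high = Long.parseLong(parts[0], 16); // upper 32 bits
        long low = Long.parseLong(parts[1], 16);  // lower 32 bits
        return (high << 32) | low;
    }

    /** WAL bytes from one LSN to a later LSN. */
    static long bytesBetween(String from, String to) {
        return parse(to) - parse(from);
    }

    public static void main(String[] args) {
        // Failing connector: chosen restart LSN versus the stored commit LSN.
        long failing = bytesBetween("LSN{1453/FC7DC3C8}", "LSN{1456/75087D90}");
        // Working connector: stored LSN versus the chosen restart LSN.
        long working = bytesBetween("LSN{1461/745F64B8}", "LSN{1461/745F86E8}");
        System.out.println("failing: restart LSN is " + failing + " bytes before the stored LSN");
        System.out.println("working: restart LSN is " + working + " bytes after the stored LSN");
        // Unsigned comparison keeps ordering correct for the full 64-bit LSN range.
        System.out.println(Long.compareUnsigned(
                parse("1453/FC7DC3C8"), parse("1456/75087D90")) < 0);
    }
}
```

For the failing connector the restart position lies roughly 10 GB of WAL before the stored commit LSN, whereas for the working connector the two positions are a few kilobytes apart. This is consistent with the failing case involving a transaction that began long before the last stored commit.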

      Attachments:
        1. thread_dump.txt (51 kB)
        2. image-2021-04-22-08-49-13-582.png (287 kB)
        3. debeziumLogsUpdated.txt (551 kB)
        4. debeziumLogs.txt (603 kB)
        5. DBZ-3068-1.png (26 kB)

      Assignee: Unassigned
      Reporter: Igor Shipenkov (ishipenkov, Inactive)