[DBZ-965] PostgreSQL replication slots not updated in transactions


      1. Use the attached docker-compose file to bring up an environment (everything comes from debezium basic examples)
      2. Run the attached script that:

      • Creates a couple of tables ('data' and 'datahistory') in the postgres database
      • Registers a connector for the table 'data'
      • Periodically inserts rows in a transaction to 'data' and 'datahistory' (in that order)

      (keep it running for some long time)

      3. Check pg_replication_slots periodically. The replication slot for the connector never gets updated.

      4. To get the replication slot updated, and to confirm that the problem has to do with how changes in transactions are handled, you can perform some manual changes on the table 'data' (around every minute, the offsets are committed by the connector). Or just remove the transaction in the script. Or, even better, just change the order in which the tables are modified: if 'data' is changed at the end of the transaction, then it behaves correctly.
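For step 3, the slot can be checked with a query along these lines (column names as they appear in pg_replication_slots on PostgreSQL 9.6 and newer):

```
SELECT slot_name, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;
```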


      Extended summary:

      When:

      • You set up a connector with a single table whitelisted
      • The table is always (and only) modified inside a transaction
      • The transaction performs some other operations afterwards (on tables that are not whitelisted)

      Then

      • The replication slot will never be updated

      In the context of a transaction, the connector can receive multiple change events. The lastProcessedLsn value is updated only when the last change event of the transaction is processed.

      https://github.com/debezium/debezium/blob/v0.8.3.Final/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java#L151

      But if the last change performed by the transaction affects a table that is not whitelisted, then it won't even be processed, resulting in the same (old) LSN being committed again and again:

      https://github.com/debezium/debezium/blob/v0.8.3.Final/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java#L238

      (tableSchema will be null if the table is filtered out: https://github.com/debezium/debezium/blob/v0.8.3.Final/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java#L498)
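The interaction described above can be sketched as a tiny simulation (hypothetical names and types, not the actual Debezium classes):

```java
import java.util.List;
import java.util.Set;

public class StaleLsnSketch {
    // Hypothetical stand-in for a decoded WAL change event.
    record ChangeEvent(long lsn, String table) {}

    static long lastProcessedLsn = -1;

    // Mirrors the reported behaviour: events on tables that are filtered out
    // (tableSchema == null) are skipped entirely and never advance the LSN.
    static void stream(List<ChangeEvent> txEvents, Set<String> whitelist) {
        for (ChangeEvent e : txEvents) {
            if (!whitelist.contains(e.table())) {
                continue; // filtered out: lastProcessedLsn is NOT updated
            }
            lastProcessedLsn = e.lsn(); // only processed events move the LSN
        }
    }

    public static void main(String[] args) {
        Set<String> whitelist = Set.of("data");

        // Failing order: 'datahistory' (not whitelisted) is last in the transaction.
        stream(List.of(new ChangeEvent(100, "data"),
                       new ChangeEvent(101, "datahistory")), whitelist);
        System.out.println(lastProcessedLsn); // 100, not 101: a stale LSN gets committed

        // Working order: 'data' is last, so the LSN reaches the end of the transaction.
        stream(List.of(new ChangeEvent(200, "datahistory"),
                       new ChangeEvent(201, "data")), whitelist);
        System.out.println(lastProcessedLsn); // 201
    }
}
```

If the non-whitelisted table comes last in every transaction, the highest LSN the connector ever records stays behind the transaction's end, so the same stale position is committed to the slot on every offset flush.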

      It took me a while to follow what was going on. I can provide some further explanations on why exactly this is happening but I hope it will be clear enough for anyone familiar with the code.

      I tried to fix it myself and open a PR, but failed. I didn't find an easy way to do it without making major design changes (or minor but super dirty hacks). But I'm totally new to this code, so maybe someone else can fix it neatly in five minutes.

      Might (or might not) be related to:
      https://issues.jboss.org/projects/DBZ/issues/DBZ-926
      and
      https://issues.jboss.org/projects/DBZ/issues/DBZ-892

        1. correct-script.sh
          1 kB
        2. docker-compose.yml
          0.8 kB
        3. script.sh
          1 kB


            Released

            Jiri Pechanec added a comment

            FTR, this only affects the "wal2json" logical decoding plug-in, not the decoderbufs one.

            Gunnar Morling added a comment

            Awesome. It worked perfectly

            Looking forward to seeing this merged and published

            Thank you very much. Impressive response time.

            Miguel Angel Martin Mendez (Inactive) added a comment

            miguel.angel.martin Hi, I tried to prepare a correct fix that does not need heartbeats. Could you please give it a try?

            Jiri Pechanec added a comment

            I tried the connector with Kafka Connect. It works as expected, but now we have some additional problems.

            In our connector we use some transforms and converters that are now also applied to the heartbeat records (resulting in null pointer exceptions).

            For example, we use
            ```
            "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
            "transforms.key.field": "uuid",
            "value.converter": "io.confluent.connect.avro.AvroConverter"
            ```
            This ends up throwing the following exception:

            ```
            2018-11-08 10:32:14,490 ERROR || WorkerSourceTask{id=adin-ad-stream-0} Task threw an uncaught and unrecoverable exception [org.apache.kafka.connect.runtime.WorkerTask]
            java.lang.NullPointerException
            at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:61)
            at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
            at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
            at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
            2018-11-08 10:32:14,492 ERROR || WorkerSourceTask{id=adin-ad-stream-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
            ```
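For what it's worth, a plausible reading of that stack trace (a sketch with simplified stand-in types, not the real Kafka Connect classes): ExtractField looks up the configured field in the record's key schema, and a heartbeat record's key has no 'uuid' field, so the lookup returns null and the following dereference throws:

```java
import java.util.Map;

public class HeartbeatNpeSketch {
    // Simplified stand-in for a Connect key schema: field name -> type name.
    static String extractFieldType(Map<String, String> keySchema, String field) {
        // Roughly what ExtractField does: schema.field(name).schema().
        // When the field is absent (as on a heartbeat key), get() returns
        // null and the dereference below throws NullPointerException.
        return keySchema.get(field).toLowerCase();
    }

    public static void main(String[] args) {
        Map<String, String> dataKey = Map.of("uuid", "STRING");
        System.out.println(extractFieldType(dataKey, "uuid")); // works for data records

        Map<String, String> heartbeatKey = Map.of("serverName", "STRING");
        try {
            extractFieldType(heartbeatKey, "uuid");
        } catch (NullPointerException e) {
            System.out.println("NPE on the heartbeat record, as in the log above");
        }
    }
}
```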

            If I remove the transformations (which include setting a fixed topic name, although I'm pretty sure we can work around that), it indeed sends the data to one topic and the heartbeat to another, and it properly commits the LSN periodically to the replication slot.

            Unless there is a way to perform those operations conditionally in Kafka Connect, I'm afraid the fix will not work for us. I'll spend some time trying to figure out how to do it. Having a heartbeat would in any case be very useful for us in order to monitor the connector.

            Thanks again.

            Kind regards

            Miguel Angel Martin Mendez (Inactive) added a comment
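For reference, newer Kafka Connect releases (2.6 and later, KIP-585) added transform predicates, which make exactly this kind of conditional application possible; a sketch of such a configuration (the topic pattern assumes Debezium's default heartbeat topic prefix):

```
"transforms": "key",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "uuid",
"transforms.key.predicate": "isHeartbeat",
"transforms.key.negate": "true",
"predicates": "isHeartbeat",
"predicates.isHeartbeat.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isHeartbeat.pattern": "__debezium-heartbeat.*"
```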

            I'll try to build a kafka connect image from source and see how it works for us. In the meantime, I tried the fix using the embedded engine and it worked. The replication slots are updated now.
            The new thing is that the engine is now dispatching those heartbeat events, with a fixed key and a null value.
            I don't know exactly what the implications will be when the connector is used with Kafka connect. Will we need a topic for the heartbeat? We stream data to a cluster we don't own and we have some limitations on automatic topic creation.

            We are not in production yet so we could wait for a more proper fix (please, don't misunderstand me; it was absolutely awesome that you managed to have a working quick fix in a record time)

            I'll be back to you when (if) I manage to build my own kafka connect image with the modified postgres connector.

            Thank you very much.

            Miguel Angel Martin Mendez (Inactive) added a comment - edited

            Are you able to build the connector from source? If yes, please try https://github.com/debezium/debezium/pull/667 and set heartbeat.interval.ms to a non-zero value. This should bypass the problem.

            Jiri Pechanec added a comment
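In connector-registration terms, that amounts to adding a property like the following to the connector configuration (the interval value here is an arbitrary example):

```
"heartbeat.interval.ms": "10000"
```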

            You can use the correct_script.sh I have just uploaded.
            Setting OFFSET_FLUSH_INTERVAL_MS=10000 made no difference for me (other than making it easier to check; thanks for the tip, btw)

            Miguel Angel Martin Mendez (Inactive) added a comment

            Omg, I made a mistake when uploading the script. I'm very sorry about that. The 'data' table should not be updated at the end of the transaction to see the bug.

            Instead of:
            ```
            START TRANSACTION;
            INSERT INTO DATAHISTORY(value) values('some data');
            INSERT INTO DATA(value) values('some data');
            COMMIT;
            ```

            It should be:
            ```
            START TRANSACTION;
            INSERT INTO DATA(value) values('some data');
            INSERT INTO DATAHISTORY(value) values('some data');
            COMMIT;
            ```

            Miguel Angel Martin Mendez (Inactive) added a comment

            Hi, I am going through the code to make sure there is not a hidden problem. I tried your example exactly as described with one change to make the problem more visible. I set OFFSET_FLUSH_INTERVAL_MS=10000 for the Connect container.
            When I did this I see both restart_lsn and confirmed_flush_lsn regularly updated. Could you please verify if you see the same behaviour?

            Jiri Pechanec added a comment
