• Type: Feature Request
    • Resolution: Done
    • Priority: Major
    • Fix Version: 1.2.0.Beta2
    • Affects Version: 1.1.0.Final
    • Component: sqlserver-connector
    • Labels: None

Hi guys,

      First of all, I hope you are doing well given the current circumstances.

      Let me give you some context to explain why we have created this request.

      We are using the Debezium connector to source data from a SQL Server database whose CDC is configured with Oracle cleanup jobs. We cannot use the default CDC cleanup jobs because we are also using Oracle GoldenGate, which forces you to use its own cleanup jobs, as described here.

      The issue arises when combining Debezium with the Oracle CDC cleanup jobs, since Oracle does not update the metadata tables containing the records' timestamps as frequently as the data tables. Debezium uses [sys].[fn_cdc_map_lsn_to_time(?)] to resolve the commit timestamp of an event from its LSN, and it runs this query while processing records, as shown in the following code:

public Instant timestampOfLsn(Lsn lsn) throws SQLException {
          if (lsn.getBinary() == null) {
              return null;
          }

          // Return the cached value if this LSN has been resolved before.
          Instant cachedInstant = lsnToInstantCache.get(lsn);
          if (cachedInstant != null) {
              return cachedInstant;
          }

          // Otherwise run the LSN-to-timestamp query against the database.
          return prepareQueryAndMap(lsnToTimestamp, statement -> {
              statement.setBytes(1, lsn.getBinary());
          }, singleResultMapper(rs -> {
              final Timestamp ts = rs.getTimestamp(1);
              // If the CDC metadata table lags behind, ts (and thus ret) is null.
              Instant ret = (ts == null) ? null : normalize(ts);
              LOGGER.trace("Timestamp of lsn {} is {}", lsn, ret);
              if (ret != null) {
                  lsnToInstantCache.put(lsn, ret);
              }
              return ret;
          }, "LSN to timestamp query must return exactly one value"));
      }


But, as mentioned above, the Oracle CDC jobs won't update the metadata tables as frequently as the data tables, so we sometimes receive NULL from this function because the commit LSN has not yet been written to the metadata table, which causes a NullPointerException:

      Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, event_serial_no=1, commit_lsn=00153346:000811c8:0006, change_lsn=00153346:000811c8:0002}
          at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:191)
          at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:246)
          at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:525)
          at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:172)
          at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:162)
          ... 6 more
      Caused by: java.lang.NullPointerException
          at io.debezium.connector.AbstractSourceInfoStructMaker.commonStruct(AbstractSourceInfoStructMaker.java:53)
          at io.debezium.connector.sqlserver.SqlServerSourceInfoStructMaker.struct(SqlServerSourceInfoStructMaker.java:37)
          at io.debezium.connector.sqlserver.SqlServerSourceInfoStructMaker.struct(SqlServerSourceInfoStructMaker.java:14)
          at io.debezium.connector.AbstractSourceInfo.struct(AbstractSourceInfo.java:76)
          at io.debezium.connector.sqlserver.SqlServerOffsetContext.getSourceInfo(SqlServerOffsetContext.java:91)
      
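The NPE above can be avoided with a simple null guard around the lookup. The following is a minimal standalone sketch (not Debezium's actual code; the lookup method here is a hypothetical stand-in for SqlServerConnection#timestampOfLsn) showing the idea of falling back to the processing time when the metadata table has no entry yet:

```java
import java.time.Instant;
import java.util.Optional;

// Minimal sketch of a null-safe fallback. A missing CDC metadata entry yields
// the processing time instead of a null that later triggers an NPE.
class LsnTimestampGuard {

    // Hypothetical stand-in for the real LSN-to-timestamp lookup, which may
    // return null while the CDC metadata table lags behind the data tables.
    static Instant timestampOfLsn(byte[] lsn) {
        return null; // simulate the metadata lag described above
    }

    // Fall back to the current processing time when the database returns no timestamp.
    static Instant timestampOrProcessingTime(byte[] lsn) {
        return Optional.ofNullable(timestampOfLsn(lsn)).orElseGet(Instant::now);
    }
}
```

With this guard in place, downstream code that builds the source struct never sees a null timestamp, regardless of how far behind the cleanup jobs are.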

Besides all of the above, our products are deployed across multiple data centers, so running this query for every single record is a big bottleneck for our read throughput.

To avoid this issue, we have implemented a mechanism that skips the query extracting the timestamp and returns Instant.now() instead. The timestamp associated with a record is later added to the Debezium source as ts_ms. Enabling it is just a matter of adding the configuration option "database.skip.lsn.timestamp.query": "true".
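The behavior of that option can be sketched roughly as follows. This is a simplified illustration, not the actual patch; the class and method names here are hypothetical:

```java
import java.time.Instant;

// Hedged sketch of the proposed option: when "database.skip.lsn.timestamp.query"
// is true, skip the round trip to [sys].[fn_cdc_map_lsn_to_time] and stamp the
// record with the processing time instead.
class SkipLsnTimestampQuery {

    final boolean skipLsnTimestampQuery;

    SkipLsnTimestampQuery(boolean skip) {
        this.skipLsnTimestampQuery = skip;
    }

    // Hypothetical stand-in for the real per-LSN metadata query
    // (a cross-datacenter round trip in our deployment).
    Instant queryTimestampFromDatabase(byte[] lsn) {
        throw new IllegalStateException("would hit the database");
    }

    Instant timestampOfLsn(byte[] lsn) {
        if (skipLsnTimestampQuery) {
            return Instant.now(); // later surfaced as ts_ms in the source block
        }
        return queryTimestampFromDatabase(lsn);
    }
}
```

When the option is enabled, no database query is issued at all for timestamp resolution, which removes both the NPE risk and the per-query latency.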

To summarize, the main points here are:

      • Changing the ts_ms semantics from the time the data was inserted into the CDC data tables to the time the data was processed by the Debezium worker thread.
      • Skipping the query to avoid cross-data-center latency. Note that Debezium issues a query to the database for each record it processes, which was also a bottleneck for our read throughput.

Once we implemented this, we managed to process up to 20K TPS.

      Versions used:

      • SQL Server 2016 SP1
      • Debezium 1.1.0.Final
      • Confluent distribution 5.4.0

I will create a pull request for this development so you can review it. Feel free to suggest any changes or improvements.

      It would be really nice to have in place a mechanism to skip or ignore this query.

      Thanks in advance
      @jantpedraza

            [DBZ-1988] Add option to skip LSN timestamp queries

Juan Antonio Pedraza (Inactive) added a comment - edited

            gunnar.morling, finally I pushed all the changes we agreed on. Regarding your last question about the query extracting ts_ms per transaction: yes, you are right. My impression that the query was run per record was due to the cross-datacenter latency we had.

Gunnar Morling added a comment - edited

            jantpedraza1, thanks a lot for clarifying and the thorough issue description in general. I was a bit in a hurry yesterday, but it makes a lot more sense to me now. Thanks! And hoping you're sound and safe, too! A few comments:

• The NPE should be avoided in any case, even without enabling the new option you introduced

            Changing the ts_ms semantic from where the data was inserted in the data CDC tables, to when the data was processed by the Debezium worker thread.

            • Change events already contain the timestamp of processing within Debezium (top-level field ts_ms); ideally we'd use that very same value in this particular case also for source.ts_ms, to avoid a small diff due to multiple invocations of now() at different places in the call stack. It might be challenging though to obtain that value at the place in the code where we build the source struct

            Mind that Debezium is making a query to the DB for each record it is processing.

• That's interesting; that query should be run once per transaction. So unless you have many small transactions with a single change each, you shouldn't see this query for each record. Could you double-check whether that's what you observe?

I'll also put a few comments into the PR with some specific feedback on the proposed change.
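The once-per-transaction behavior Gunnar describes can be illustrated with a small sketch (hypothetical names, not Debezium's internals): events that share a commit LSN reuse one cached timestamp, so the database round trip happens once per transaction rather than once per record.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration of the intended once-per-transaction lookup: all change events
// sharing a commit LSN reuse one cached timestamp, so the database is queried
// once per transaction, not once per record.
class PerTransactionTimestampCache {

    final Map<String, Instant> cache = new HashMap<>();
    final AtomicInteger dbQueries = new AtomicInteger();

    // Hypothetical stand-in for the real metadata query; counts round trips.
    Instant queryDatabase(String commitLsn) {
        dbQueries.incrementAndGet();
        return Instant.now();
    }

    Instant timestampOf(String commitLsn) {
        // computeIfAbsent issues the query only on the first miss for this LSN.
        return cache.computeIfAbsent(commitLsn, this::queryDatabase);
    }
}
```

If each transaction carries many changes, this caching makes the per-record cost negligible; with many single-change transactions, however, every record still pays a round trip, which matches the bottleneck reported above.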


Juan Antonio Pedraza (Inactive) added a comment - edited

            Hi Gunnar, this is about the Debezium SQL Server connector itself, but when it is used with the Oracle GoldenGate cleanup jobs.

Gunnar Morling added a comment

            jantpedraza1, could you clarify whether this is about SQL Server or Oracle? I'm confused

Assignee: Unassigned
              Reporter: jantpedraza (Juan Antonio Pedraza, Inactive)
              Votes: 1
              Watchers: 4