Debezium / DBZ-1988

Add option to skip LSN timestamp queries




      Hi guys,

      First of all, I hope you are all doing fine given the current circumstances.

      Let me give you some context to explain why we have created this request.

      We are using the Debezium connector to source data from a SQL Server database whose CDC cleanup is already handled by Oracle cleanup jobs. We cannot use the default CDC cleanup jobs because we are also using Oracle GoldenGate, which forces you to use its own cleanup jobs, as described here.

      The issue comes when combining Debezium with the Oracle CDC cleanup jobs, because Oracle does not update the metadata tables containing the records' timestamps as frequently as the data tables. Debezium uses the result of [sys].[fn_cdc_map_lsn_to_time(?)] to resolve the timestamp of each event, and it runs this query against the database for every record it processes, as shown in the following code:

      public Instant timestampOfLsn(Lsn lsn) throws SQLException {
          if (lsn.getBinary() == null) {
              return null;
          }
          Instant cachedInstant = lsnToInstantCache.get(lsn);
          if (cachedInstant != null) {
              return cachedInstant;
          }
          return prepareQueryAndMap(lsnToTimestamp, statement -> {
              statement.setBytes(1, lsn.getBinary());
          }, singleResultMapper(rs -> {
              final Timestamp ts = rs.getTimestamp(1);
              Instant ret = (ts == null) ? null : normalize(ts);
              LOGGER.trace("Timestamp of lsn {} is {}", lsn, ret);
              if (ret != null) {
                  lsnToInstantCache.put(lsn, ret);
              }
              return ret;
          }, "LSN to timestamp query must return exactly one value"));
      }

      But, as mentioned above, the Oracle CDC jobs won't update the metadata tables as frequently as the data tables, so we sometimes receive NULL from this function because the commit LSN has not yet been written to the metadata table, causing a NullPointerException:

      Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, event_serial_no=1, commit_lsn=00153346:000811c8:0006, change_lsn=00153346:000811c8:0002}
          at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:191)
          at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:246)
          at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:525)
          at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:172)
          at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:162)
          ... 6 more
      Caused by: java.lang.NullPointerException
          at io.debezium.connector.AbstractSourceInfoStructMaker.commonStruct(AbstractSourceInfoStructMaker.java:53)
          at io.debezium.connector.sqlserver.SqlServerSourceInfoStructMaker.struct(SqlServerSourceInfoStructMaker.java:37)
          at io.debezium.connector.sqlserver.SqlServerSourceInfoStructMaker.struct(SqlServerSourceInfoStructMaker.java:14)
          at io.debezium.connector.AbstractSourceInfo.struct(AbstractSourceInfo.java:76)
          at io.debezium.connector.sqlserver.SqlServerOffsetContext.getSourceInfo(SqlServerOffsetContext.java:91)

      Besides all of the above, our products are deployed across multiple data centers, so executing this query for every single record is a big bottleneck for our read throughput.

      Now, to avoid this issue, we have implemented a mechanism that skips the query extracting the timestamp and returns Instant.now() instead. The timestamp associated with a record is later added to the Debezium source block as ts_ms. Enabling it is just a matter of adding the configuration option "database.skip.lsn.timestamp.query": "true".
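      The mechanism can be sketched roughly as follows. LsnTimestampResolver, its members, and the Supplier parameter are hypothetical names for illustration only, not the actual patch:

```java
import java.time.Instant;
import java.util.function.Supplier;

// Hedged sketch of the proposed skip mechanism: when the option is enabled,
// the connector uses the processing time instead of making a per-record
// round trip to [sys].[fn_cdc_map_lsn_to_time(?)].
class LsnTimestampResolver {

    private final boolean skipLsnTimestampQuery;

    LsnTimestampResolver(boolean skipLsnTimestampQuery) {
        this.skipLsnTimestampQuery = skipLsnTimestampQuery;
    }

    // queryTimestamp stands in for the database query shown above.
    Instant timestampOfLsn(byte[] lsnBinary, Supplier<Instant> queryTimestamp) {
        if (lsnBinary == null) {
            return null;
        }
        if (skipLsnTimestampQuery) {
            // ts_ms now means "when the worker thread processed the record",
            // not "when the change was written to the CDC tables".
            return Instant.now();
        }
        return queryTimestamp.get();
    }
}
```

      With the flag enabled, the database is never queried, which also avoids the NULL result that triggers the NullPointerException above.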

      Summarizing, the main points here are:

      • Changing the ts_ms semantics from the time the data was inserted into the CDC data tables to the time the data was processed by the Debezium worker thread.
      • Skipping that query to avoid cross-data-center latency. Mind that Debezium makes a query to the database for each record it processes, which was also a bottleneck in our read throughput.
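      As a sketch, enabling the option in a Kafka Connect connector configuration might look like the following (all values besides the new option are illustrative placeholders):

```json
{
  "name": "sqlserver-source",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "sqlserver.example.com",
    "database.port": "1433",
    "database.user": "debezium",
    "database.password": "******",
    "database.dbname": "testDB",
    "database.server.name": "server1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.testDB",
    "database.skip.lsn.timestamp.query": "true"
  }
}
```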

      Once we implemented this, we managed to process up to 20K TPS.

      Versions used:

      • SQL Server 2016 SP1
      • Debezium 1.1.0.Final
      • Confluent distribution 5.4.0

      I will create a pull request for this development so you can review it. Feel free to suggest any changes or improvements.

      It would be really nice to have in place a mechanism to skip or ignore this query.

      Thanks in advance




            Assignee: Unassigned
            Reporter: Juan Antonio Pedraza (jantpedraza)
            Votes: 1
            Watchers: 4