Debezium / DBZ-3829

Support for configuring binlog file and position to start CDC from


    • Type: Feature Request
    • Resolution: Unresolved
    • Priority: Blocker
    • 2.7-plan
    • 1.5.0.Final
    • mysql-connector
    • None
    • False
    • False
    • Undefined

      Please add support for starting streaming from a binlog file and position supplied as configuration parameters.

      Some findings from trying to set the binlog file and position, in memory or in the offset file, as the point to start reading from:
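
A minimal sketch of what such configuration could look like, assuming two property names invented here for illustration: `binlog.start.filename` and `binlog.start.position`. The connector does not define these properties, and `BinlogStartConfig` is a hypothetical helper. The sketch only parses and validates the pair with the standard library.

```java
import java.util.Optional;
import java.util.Properties;

/** Hypothetical sketch: reads a requested binlog start point from connector-style properties. */
class BinlogStartConfig {
    // Hypothetical property names; they are NOT defined by the Debezium MySQL connector.
    static final String START_FILE = "binlog.start.filename";
    static final String START_POS = "binlog.start.position";

    /** Immutable binlog coordinates: file name plus byte position within that file. */
    static final class BinlogPosition {
        final String file;
        final long position;

        BinlogPosition(String file, long position) {
            this.file = file;
            this.position = position;
        }

        @Override
        public String toString() {
            return file + ":" + position;
        }
    }

    /** Returns the configured start point, or empty when neither property is set. */
    static Optional<BinlogPosition> parse(Properties props) {
        String file = props.getProperty(START_FILE);
        String pos = props.getProperty(START_POS);
        if (file == null && pos == null) {
            return Optional.empty();
        }
        if (file == null || pos == null) {
            throw new IllegalArgumentException(
                    START_FILE + " and " + START_POS + " must be set together");
        }
        long position = Long.parseLong(pos.trim());
        if (position < 0) {
            throw new IllegalArgumentException("binlog position must not be negative");
        }
        return Optional.of(new BinlogPosition(file.trim(), position));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Values taken from the offset that appears in the stack trace of this issue.
        props.setProperty(START_FILE, "mysql-bin.000009");
        props.setProperty(START_POS, "107");
        System.out.println(parse(props).get());
    }
}
```

Requiring both values together avoids an ambiguous start point, since a position is only meaningful relative to a specific binlog file.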

      Issue 1: 

      When CDC starts from a supplied offset, the snapshotting task assumes that a snapshot has already been completed.

      I set an offset in memory and started Debezium with snapshot.mode=schema_only.

      The logs show that the snapshotting task skips the schema snapshot, so the dbhistory file stays empty. Log message:

      MySqlSnapshotChangeEventSource.getSnapshottingTask: A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.

      Streaming from the supplied offset then fails with:

      MySqlStreamingChangeEventSource.informAboutUnknownTableIfRequired: Encountered change event {<event>} at offset {<supplied_offset>} for table {xyz} whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case
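
The behaviour reported in Issue 1 can be modelled roughly as follows. This is a simplified illustration derived only from the log message quoted above, not the connector's source code: `SnapshotDecision`, `Mode`, `Task`, and `decide` are hypothetical names.

```java
/** Simplified, hypothetical model of the snapshot decision described in Issue 1. */
class SnapshotDecision {
    enum Mode { INITIAL, SCHEMA_ONLY, NEVER }

    /** What the snapshot phase will capture. */
    static final class Task {
        final boolean snapshotSchema;
        final boolean snapshotData;

        Task(boolean snapshotSchema, boolean snapshotData) {
            this.snapshotSchema = snapshotSchema;
            this.snapshotData = snapshotData;
        }

        @Override
        public String toString() {
            return "schema=" + snapshotSchema + ", data=" + snapshotData;
        }
    }

    /**
     * Models the reported behaviour: once an offset marking a completed snapshot
     * is found, neither schema nor data is snapshotted, whatever the mode says.
     */
    static Task decide(Mode mode, boolean completedOffsetFound) {
        if (completedOffsetFound) {
            return new Task(false, false);
        }
        switch (mode) {
            case INITIAL:
                return new Task(true, true);
            case SCHEMA_ONLY:
                return new Task(true, false);
            default:
                return new Task(false, false);
        }
    }

    public static void main(String[] args) {
        // Fresh start: schema_only captures the schema but no rows.
        System.out.println(decide(Mode.SCHEMA_ONLY, false));
        // Manually supplied offset: the schema capture is skipped as well,
        // which is why the dbhistory file stays empty.
        System.out.println(decide(Mode.SCHEMA_ONLY, true));
    }
}
```

In this model a manually supplied offset is indistinguishable from one written after a real snapshot, which matches the empty dbhistory and the subsequent "schema isn't known" error.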

      Issue 2:

      I tried changing the code so that dbhistory creation is not skipped when a previous offset is present.
      Unfortunately, the dbhistory then captures the schema as of the current binlog position, not as of the offset set in memory or in the file.

      As a result, if the schema changed between the supplied offset and the current binlog position, we get an error like:

       [java] 2021-Aug-05 23:35:12.226 [WARN] (pool-2-thread-1) c.a.a.r.s.c.m.MysqlConnector.lambda$completionCallback$1: Stacktrace: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
           [java]  at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
           [java]  at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
           [java]  at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
           [java]  at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
           [java]  at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
           [java]  at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
           [java]  at java.base/java.lang.Thread.run(Thread.java:829)
           [java] Caused by: io.debezium.DebeziumException: Error processing binlog event
           [java]  ... 6 more
           [java] Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {binlog_file_name=mysql-bin.000009, transaction_id=file=mysql-bin.000009,pos=107, ts_sec=1618814086, binlog_file_offset=107, row=1, server_id=1, event=2}
           [java]  at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:239)
           [java]  at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$3(MySqlStreamingChangeEventSource.java:682)
           [java]  at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:731)
           [java]  at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:681)
           [java]  at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
           [java]  ... 5 more
           [java] Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
           [java]  at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
           [java]  at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
           [java]  at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
           [java]  at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
           [java]  at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
           [java]  at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209)
           [java]  ... 9 more

      Hence, dbhistory needs to contain the schema as it was at the supplied offset.
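
To illustrate the requirement, here is a minimal sketch of a schema lookup keyed by binlog coordinates. It is not a description of how Debezium's database history is implemented; `SchemaAtOffset`, `Coord`, and the example table columns are hypothetical, and the sketch assumes binlog file names end in a numeric suffix such as `mysql-bin.000009`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: column list of one table as of a given binlog coordinate. */
class SchemaAtOffset {
    /** Orders binlog coordinates by numeric file suffix, then by position. */
    static final class Coord implements Comparable<Coord> {
        final long fileIndex;
        final long position;

        private Coord(long fileIndex, long position) {
            this.fileIndex = fileIndex;
            this.position = position;
        }

        /** Assumes a file name with a numeric suffix, e.g. "mysql-bin.000009". */
        static Coord of(String file, long position) {
            long index = Long.parseLong(file.substring(file.lastIndexOf('.') + 1));
            return new Coord(index, position);
        }

        @Override
        public int compareTo(Coord other) {
            int byFile = Long.compare(fileIndex, other.fileIndex);
            return byFile != 0 ? byFile : Long.compare(position, other.position);
        }
    }

    // Each entry holds the column list that became valid at that coordinate.
    private final TreeMap<Coord, List<String>> versions = new TreeMap<>();

    void record(Coord validFrom, List<String> columns) {
        versions.put(validFrom, columns);
    }

    /** Returns the columns in effect at the given coordinate. */
    List<String> columnsAt(Coord at) {
        Map.Entry<Coord, List<String>> entry = versions.floorEntry(at);
        if (entry == null) {
            throw new IllegalStateException("no schema known at or before the requested offset");
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        SchemaAtOffset history = new SchemaAtOffset();
        // Hypothetical table: a column is added some time after the supplied offset.
        history.record(Coord.of("mysql-bin.000009", 4), Arrays.asList("id", "name"));
        history.record(Coord.of("mysql-bin.000012", 500), Arrays.asList("id", "name", "email"));
        // Rows written at mysql-bin.000009:107 carry two values, so they must be
        // decoded with the two-column schema, not the current three-column one.
        System.out.println(history.columnsAt(Coord.of("mysql-bin.000009", 107)));
    }
}
```

Decoding an old row event against the newest column list is the situation in which a "Data row is smaller than a column index" error would be expected, since the row has fewer values than the schema has columns.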

              anmohant Anisha Mohanty
              mansivora Mansi Vora (Inactive)