Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-4939

MySQL connector increment snapshot failed parse datetime column lenth when connector set "snapshot.fetch.size": 20000

    XMLWordPrintable

Details

    • Hide
      • I already make initial snapshot and monitor some tables, the connector work normally.
      • One day, i want to add another table to monitor, i use signal increment snapshot, it will generate a error:
      [2022-04-02 10:28:49,630] ERROR Producer failure (io.debezium.pipeline.ErrorHandler)
      io.debezium.DebeziumException: Error processing binlog event
              at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
              at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:860)
              at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
              at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
              at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
              at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
              at java.base/java.lang.Thread.run(Thread.java:834)
      Caused by: java.lang.RuntimeException: Invalid length when read MySQL DATETIME value. BIN_LEN_DATETIME is 19
              at io.debezium.connector.mysql.MysqlBinaryProtocolFieldReader.readTimestampField(MysqlBinaryProtocolFieldReader.java:105)
              at io.debezium.connector.mysql.AbstractMysqlFieldReader.readField(AbstractMysqlFieldReader.java:50)
              at io.debezium.connector.mysql.MySqlConnection.getColumnValue(MySqlConnection.java:595)
              at io.debezium.jdbc.JdbcConnection.rowToArray(JdbcConnection.java:1503)
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.lambda$readChunk$2(AbstractIncrementalSnapshotChangeEventSource.java:278)
              at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:645)
              at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:513)
              at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:274)
              at io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotChangeEventSource.processTransactionCommittedEvent(MySqlReadOnlyIncrementalSnapshotChangeEventSource.java:175)
              at io.debezium.pipeline.EventDispatcher.dispatchTransactionCommittedEvent(EventDispatcher.java:277)
              at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleTransactionCompletion(MySqlStreamingChangeEventSource.java:602)
              at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$23(MySqlStreamingChangeEventSource.java:851)
              at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:349)
              ... 6 more
      [2022-04-02 10:28:49,630] INFO Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored. (io.debezium.connector.mysql.MySqlStreamingChangeEventSource)

      the code show that

      // MysqlBinaryProtocolFieldReader
      protected Object readTimestampField(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException {
        Blob b = rs.getBlob(columnIndex); 
        if (b == null)
        { 
            return null; // Don't continue parsing timestamp field if it is null 
        }
        else if (b.length() == 0) {
            LOGGER.warn("Encountered a zero length blob for column index {}", columnIndex); 
            return null;
        } 
        // if hour, minutes, seconds and micro_seconds are all 0, length is 4; if micro_seconds is 0, length is 7; otherwise length is 11 
        if (b.length() != NativeConstants.BIN_LEN_DATE && b.length() != NativeConstants.BIN_LEN_TIMESTAMP_NO_FRAC
             && b.length() != NativeConstants.BIN_LEN_TIMESTAMP_WITH_MICROS)
        { 
             throw new RuntimeException(String.format("Invalid length when read MySQL DATETIME value. BIN_LEN_DATETIME is %d", b.length())); 
         }
        ...
      }
      • the source code clearly shows that `b.length()` is 19.  this should not happen, because datetime string length is 19, but it byte lenth is not 19.
      • SO!!!  I guess this code may be parse column value length from binlog is error, may be use the datetime string length, but the datetime column schema  type length is 0.

      Another information:

      # connector some config 
      "snapshot.mode": "when_needed""snapshot.delay.ms": "5000", 
      "snapshot.locking.mode": "minimal""snapshot.fetch.size": 20000, 
      "read.only": "true""signal.kafka.topic": "debezium_signals_topic.usercenterdb.incremental""signal.kafka.bootstrap.servers": "xxx""signal.kafka.poll.timeout.ms": "1000""incremental.snapshot.allow.schema.changes": " true""incremental.snapshot.chunk.size": "102400",

       

      IMPORTANT!!!

      • when remove `snapshot.fetch.size` config from connector setting, it normally execute incremental snapshot finish, as the issue reporter says.

       

       

      Show
      I already make initial snapshot and monitor some tables, the connector work normally. One day, i want to add another table to monitor, i use signal increment snapshot, it will generate a error: [2022-04-02 10:28:49,630] ERROR Producer failure (io.debezium.pipeline.ErrorHandler) io.debezium.DebeziumException: Error processing binlog event         at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)         at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:860)         at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)         at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)         at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)         at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)         at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.RuntimeException: Invalid length when read MySQL DATETIME value. BIN_LEN_DATETIME is 19         at io.debezium.connector.mysql.MysqlBinaryProtocolFieldReader.readTimestampField(MysqlBinaryProtocolFieldReader.java:105)         at io.debezium.connector.mysql.AbstractMysqlFieldReader.readField(AbstractMysqlFieldReader.java:50)         at io.debezium.connector.mysql.MySqlConnection.getColumnValue(MySqlConnection.java:595)         at io.debezium.jdbc.JdbcConnection.rowToArray(JdbcConnection.java:1503)         at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.lambda$readChunk$2(AbstractIncrementalSnapshotChangeEventSource.java:278)         at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:645)         at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:513)         at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:274)         at io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotChangeEventSource.processTransactionCommittedEvent(MySqlReadOnlyIncrementalSnapshotChangeEventSource.java:175)         at io.debezium.pipeline.EventDispatcher.dispatchTransactionCommittedEvent(EventDispatcher.java:277)         at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleTransactionCompletion(MySqlStreamingChangeEventSource.java:602)         at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$23(MySqlStreamingChangeEventSource.java:851)         at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:349)         ... 6 more [2022-04-02 10:28:49,630] INFO Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored. (io.debezium.connector.mysql.MySqlStreamingChangeEventSource) the code show that // MysqlBinaryProtocolFieldReader protected Object readTimestampField(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException { Blob b = rs.getBlob(columnIndex); if (b == null ) { return null ; // Don't continue parsing timestamp field if it is null } else if (b.length() == 0) { LOGGER.warn( "Encountered a zero length blob for column index {}" , columnIndex); return null ; } // if hour, minutes, seconds and micro_seconds are all 0, length is 4; if micro_seconds is 0, length is 7; otherwise length is 11 if (b.length() != NativeConstants.BIN_LEN_DATE && b.length() != NativeConstants.BIN_LEN_TIMESTAMP_NO_FRAC && b.length() != NativeConstants.BIN_LEN_TIMESTAMP_WITH_MICROS) { throw new RuntimeException( String .format( "Invalid length when read MySQL DATETIME value. BIN_LEN_DATETIME is %d" , b.length())); } ... } the source code clearly shows that `b.length()` is 19.  this should not happen, because datetime string length is 19, but it byte lenth is not 19. SO!!!  I guess this code may be parse column value length from binlog is error, may be use the datetime string length, but the datetime column schema  type length is 0. Another information: # connector some config "snapshot.mode" : "when_needed" ,  "snapshot.delay.ms" : "5000" , "snapshot.locking.mode" : "minimal" ,  "snapshot.fetch.size" : 20000, "read.only" : " true " ,  "signal.kafka.topic" : "debezium_signals_topic.usercenterdb.incremental" ,  "signal.kafka.bootstrap.servers" : "xxx" ,  "signal.kafka.poll.timeout.ms" : "1000" ,  "incremental.snapshot.allow.schema.changes" : " true " ,  "incremental.snapshot.chunk.size" : "102400" ,   IMPORTANT!!! when remove `snapshot.fetch.size` config from connector setting, it normally execute incremental snapshot finish, as the issue reporter says.    

    Description

      MySQL connector increment snapshot failed parse datetime column length when connector set "snapshot.fetch.size": 20000 

      Attachments

        Issue Links

          Activity

            People

              vjuranek@redhat.com Vojtech Juranek
              season-1 Season P
              Votes:
              0 Vote for this issue
              Watchers:
              6 Start watching this issue

              Dates

                Created:
                Updated:
                Resolved: