  1. Debezium
  2. DBZ-5486

MySqlErrorHandler should handle SocketException

      What Debezium connector do you use and what version?

      v1.8.0.Final

      What is the connector configuration?

      MySQL

      What is the captured database version and mode of deployment?

      Database version: MySQL 5.7

      Deployment mode: Standalone

      What behaviour do you expect?

      With `connect.keep.alive` set to true, I expect the task to stay alive when the connection that reads the MySQL binlog is closed by:

      java.net.SocketException: Connection reset
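
For context, the setting referred to above could be sketched as follows. This is only an illustration: the class and method names are hypothetical, and the other properties a real MySQL connector needs (host, credentials, server id, and so on) are left out.

```java
import java.util.Properties;

// Hypothetical holder for the two properties relevant to this report.
class KeepAliveConfigSketch {

    static Properties connectorProperties() {
        Properties props = new Properties();
        // Debezium MySQL connector class.
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        // The option this report is about: keep the binlog connection alive.
        props.setProperty("connect.keep.alive", "true");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(connectorProperties().getProperty("connect.keep.alive"));
    }
}
```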

       

      What behaviour do you see?

      The task does not restart when the socket fails with `Connection reset`; instead it stops with the error below:

       

      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
          at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) 

       

       

      Do you see the same behaviour using the latest released Debezium version?

      I don't see any code on the main branch that deals with this problem.

      Do you have the connector logs, ideally from start till finish?

       

      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
          at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
          at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1217)
          at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:980)
          at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
          at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
          at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: io.debezium.DebeziumException: Connection reset
          at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1172)
          ... 5 more
      Caused by: java.net.SocketException: Connection reset
          at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
          at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
          at com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream.read(BufferedSocketInputStream.java:51)
          at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readWithinBlockBoundaries(ByteArrayInputStream.java:226)
          at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.read(ByteArrayInputStream.java:207)
          at com.github.shyiko.mysql.binlog.io.ByteArrayInputStream.readInteger(ByteArrayInputStream.java:52)
          at com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:35)
          at com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer.deserialize(EventHeaderV4Deserializer.java:27)
          at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:221)
          at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230)
          at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:952)
          ... 3 more

       

      Implementation ideas (optional)

      In `/src/main/java/io/debezium/connector/mysql/MySqlErrorHandler.java`, change the `isRetriable` function to:

          @Override    
          protected boolean isRetriable(Throwable throwable) {
              if (throwable instanceof SQLException) {
                  final SQLException sql = (SQLException) throwable;
                  return SQL_CODE_TOO_MANY_CONNECTIONS.equals(sql.getSQLState());
              }
              else if (throwable instanceof ServerException) {
                  final ServerException sql = (ServerException) throwable;
                  return SQL_CODE_TOO_MANY_CONNECTIONS.equals(sql.getSqlState());
              }
              else if (throwable instanceof SocketException) {
                  return true;
              }
              else if (throwable instanceof DebeziumException && throwable.getCause() != null) {
                  return isRetriable(throwable.getCause());
              }
              return false;
          }
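
To see how the proposed check would classify the exception chain from the log above, here is a minimal stand-alone sketch. It is not the Debezium class: `RetriableSketch` is a hypothetical name, `RuntimeException` stands in for `DebeziumException`, the `ServerException` branch is omitted because it needs the binlog client library, and the SQL state value `"08004"` is an assumption.

```java
import java.net.SocketException;
import java.sql.SQLException;

// Hypothetical stand-alone sketch; not Debezium's MySqlErrorHandler.
class RetriableSketch {

    // Stand-in for SQL_CODE_TOO_MANY_CONNECTIONS; the value is an assumption.
    static final String TOO_MANY_CONNECTIONS = "08004";

    static boolean isRetriable(Throwable throwable) {
        if (throwable instanceof SQLException) {
            return TOO_MANY_CONNECTIONS.equals(((SQLException) throwable).getSQLState());
        }
        if (throwable instanceof SocketException) {
            // Proposed addition: treat socket failures as retriable.
            return true;
        }
        // RuntimeException stands in for DebeziumException: unwrap and re-check the cause.
        if (throwable instanceof RuntimeException && throwable.getCause() != null) {
            return isRetriable(throwable.getCause());
        }
        return false;
    }

    public static void main(String[] args) {
        // Mirrors the logged chain: wrapper exception caused by SocketException.
        Throwable wrapped = new RuntimeException("Connection reset",
                new SocketException("Connection reset"));
        System.out.println(isRetriable(wrapped));                       // prints "true"
        System.out.println(isRetriable(new RuntimeException("other"))); // prints "false"
    }
}
```

The recursion matters here because the `SocketException` never reaches the handler directly; it arrives wrapped, so the check only helps if the cause chain is followed.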

              vjuranek@redhat.com Vojtech Juranek
              jingtaozhang18 jingtao zhang (Inactive)