Debezium / DBZ-2632

Allow closing of hung JDBC connection


    • Type: Enhancement
    • Resolution: Done
    • Priority: Major
    • 1.4.0.Alpha1
    • 1.3.0.Final
    • core-library
    • None
    • False
    • False
    • Undefined

      I have the following SMT (single message transform), written only to reproduce the problem:

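      import java.util.Map;

      import org.apache.kafka.common.config.ConfigDef;
      import org.apache.kafka.connect.connector.ConnectRecord;
      import org.apache.kafka.connect.errors.DataException;
      import org.apache.kafka.connect.transforms.Transformation;

      // Test-only SMT: fails on every record it is asked to transform.
      public class Rogue<R extends ConnectRecord<R>> implements Transformation<R> {

        @Override
        public R apply(R record) {
          throw new DataException("some bad stuff happened");
        }

        @Override
        public ConfigDef config() {
          return new ConfigDef();
        }

        @Override
        public void configure(Map<String, ?> configs) { }

        @Override
        public void close() { }

      }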

      The transform throws an exception for every record, which results in the following log entries:

      {
          "@timestamp": "2020-09-30T16:23:47.125Z",
          "source_host": "353bd2dc0aa4",
          "file": "WorkerTask.java",
          "method": "doRun",
          "level": "ERROR",
          "line_number": "188",
          "thread_name": "task-thread-source-icecap-staging-coredb-0",
          "@version": 1,
          "logger_name": "org.apache.kafka.connect.runtime.WorkerTask",
          "message": "WorkerSourceTask{id=source-icecap-staging-coredb-0} Task is being killed and will not recover until manually restarted",
          "class": "org.apache.kafka.connect.runtime.WorkerTask",
          "mdc": {
              "connector.context": "[source-icecap-staging-coredb|task-0] "
          }
      }
      {
          "exception": {
              "stacktrace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
      \tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)
      \tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)
      \tat org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
      \tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)
      \tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
      \tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
      \tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
      \tat java.base\/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      \tat java.base\/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      \tat java.base\/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      \tat java.base\/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      \tat java.base\/java.lang.Thread.run(Thread.java:834)
      Caused by: org.apache.kafka.connect.errors.DataException: some bad stuff happened
      \tat com.strava.data.timber.kafka.connect.smt.Rogue.apply(Rogue.java:14)
      \tat org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
      \tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
      \tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
      \t... 11 more",
              "exception_class": "org.apache.kafka.connect.errors.ConnectException",
              "exception_message": "Tolerance exceeded in error handler"
          },
          "source_host": "41f5aba64925",
          "method": "doRun",
          "level": "ERROR",
          "message": "WorkerSourceTask{id=source-icecap-staging-coredb-0} Task threw an uncaught and unrecoverable exception",
          "mdc": {
              "connector.context": "[source-icecap-staging-coredb|task-0] "
          },
          "@timestamp": "2020-10-01T17:27:27.964Z",
          "file": "WorkerTask.java",
          "line_number": "187",
          "thread_name": "task-thread-source-icecap-staging-coredb-0",
          "@version": 1,
          "logger_name": "org.apache.kafka.connect.runtime.WorkerTask",
          "class": "org.apache.kafka.connect.runtime.WorkerTask"
      } 

      This is expected, but when I query the connector's status endpoint I get:

      {
          "name": "source-icecap-staging-coredb",
          "connector": {
              "state": "RUNNING",
              "worker_id": "localhost:8083"
          },
          "tasks": [{
              "id": 0,
              "state": "RUNNING",
              "worker_id": "localhost:8083"
          }],
          "type": "source"
      } 

      I'm expecting the connector and/or task to be in a FAILED state.
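
      The status check described above could be scripted roughly as follows. This is only a sketch: the class name ConnectorStatusCheck and its helper methods are hypothetical, the base URL would be the worker's REST address (localhost:8083 in the output above), and the regex-based parsing is a shortcut to avoid a JSON library rather than a robust parser.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka Connect or Debezium.
public class ConnectorStatusCheck {

    // Matches every "state": "<VALUE>" pair in the status document.
    private static final Pattern STATE =
            Pattern.compile("\"state\"\\s*:\\s*\"([A-Z_]+)\"");

    // Returns the connector state followed by each task state, in document order.
    public static List<String> states(String statusJson) {
        List<String> result = new ArrayList<>();
        Matcher matcher = STATE.matcher(statusJson);
        while (matcher.find()) {
            result.add(matcher.group(1));
        }
        return result;
    }

    // True if the connector or any of its tasks reports FAILED.
    public static boolean anyFailed(String statusJson) {
        return states(statusJson).contains("FAILED");
    }

    // Fetches GET {baseUrl}/connectors/{connector}/status from the Connect worker.
    public static String fetchStatus(String baseUrl, String connector) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest
                .newBuilder(URI.create(baseUrl + "/connectors/" + connector + "/status"))
                .GET()
                .build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }
}
```

      With the response shown above, anyFailed(...) returns false even though the task has already been killed, which is the mismatch this issue is about.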

      I'm using the Debezium MySQL connector, version 1.3.0.CR1.

      A thread dump revealed a potential deadlock that prevents the task thread from progressing when Kafka Connect calls the stop() method on the task:

      "task-thread-source-icecap-staging-coredb-0" #62 prio=5 os_prio=0 cpu=2628.13ms elapsed=29.57s tid=0x00007f2c10140000 nid=0x2ff runnable  [0x00007f2bb9beb000]
         java.lang.Thread.State: RUNNABLE
      	at java.net.SocketInputStream.socketRead0(java.base@11.0.8/Native Method)
      	at java.net.SocketInputStream.socketRead(java.base@11.0.8/SocketInputStream.java:115)
      	at java.net.SocketInputStream.read(java.base@11.0.8/SocketInputStream.java:168)
      	at java.net.SocketInputStream.read(java.base@11.0.8/SocketInputStream.java:140)
      	at com.mysql.cj.protocol.ReadAheadInputStream.fill(ReadAheadInputStream.java:107)
      	at com.mysql.cj.protocol.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:150)
      	at com.mysql.cj.protocol.ReadAheadInputStream.read(ReadAheadInputStream.java:180)
      	- locked <0x0000000081c7c148> (a com.mysql.cj.protocol.ReadAheadInputStream)
      	at java.io.FilterInputStream.read(java.base@11.0.8/FilterInputStream.java:133)
      	at com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
      	at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:108)
      	at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45)
      	at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:57)
      	at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41)
      	at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:61)
      	at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44)
      	at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75)
      	at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42)
      	at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1688)
      	at com.mysql.cj.protocol.a.result.ResultsetRowsStreaming.next(ResultsetRowsStreaming.java:202)
      	at com.mysql.cj.protocol.a.result.ResultsetRowsStreaming.close(ResultsetRowsStreaming.java:119)
      	- locked <0x0000000081c72908> (a com.mysql.cj.jdbc.ConnectionImpl)
      	at com.mysql.cj.jdbc.result.ResultSetImpl.realClose(ResultSetImpl.java:1882)
      	- locked <0x0000000081c72908> (a com.mysql.cj.jdbc.ConnectionImpl)
      	at com.mysql.cj.jdbc.result.ResultSetImpl.close(ResultSetImpl.java:529)
      	at com.mysql.cj.jdbc.StatementImpl.realClose(StatementImpl.java:1805)
      	at com.mysql.cj.jdbc.ConnectionImpl.closeAllOpenStatements(ConnectionImpl.java:744)
      	at com.mysql.cj.jdbc.ConnectionImpl.realClose(ConnectionImpl.java:1745)
      	at com.mysql.cj.jdbc.ConnectionImpl.close(ConnectionImpl.java:720)
      	- locked <0x0000000081c72908> (a com.mysql.cj.jdbc.ConnectionImpl)
      	at io.debezium.jdbc.JdbcConnection.close(JdbcConnection.java:913)
      	- locked <0x0000000081c61290> (a io.debezium.jdbc.JdbcConnection)
      	at io.debezium.connector.mysql.MySqlJdbcContext.shutdown(MySqlJdbcContext.java:151)
      	at io.debezium.connector.mysql.MySqlTaskContext.shutdown(MySqlTaskContext.java:259)
      	at io.debezium.connector.mysql.MySqlConnectorTask.completeReaders(MySqlConnectorTask.java:494)
      	at io.debezium.connector.mysql.MySqlConnectorTask$$Lambda$815/0x000000084074bc40.run(Unknown Source)
      	at io.debezium.connector.mysql.ChainedReader.readerCompletedPolling(ChainedReader.java:165)
      	- locked <0x0000000082161c68> (a io.debezium.connector.mysql.ChainedReader)
      	at io.debezium.connector.mysql.ChainedReader$$Lambda$814/0x000000084074b840.run(Unknown Source)
      	at io.debezium.connector.mysql.AbstractReader.cleanupResources(AbstractReader.java:309)
      	at io.debezium.connector.mysql.SnapshotReader.doStop(SnapshotReader.java:140)
      	at io.debezium.connector.mysql.AbstractReader.stop(AbstractReader.java:130)
      	at io.debezium.connector.mysql.ChainedReader.stop(ChainedReader.java:122)
      	- locked <0x0000000082161c68> (a io.debezium.connector.mysql.ChainedReader)
      	at io.debezium.connector.mysql.MySqlConnectorTask.doStop(MySqlConnectorTask.java:467)
      	at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:206)
      	at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:176)
      	at org.apache.kafka.connect.runtime.WorkerSourceTask.tryStop(WorkerSourceTask.java:220)
      	- locked <0x0000000081cbbd98> (a org.apache.kafka.connect.runtime.WorkerSourceTask)
      	at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:170)
      	at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
      	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
      	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
      	at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.8/Executors.java:515)
      	at java.util.concurrent.FutureTask.run(java.base@11.0.8/FutureTask.java:264)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.8/ThreadPoolExecutor.java:1128)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.8/ThreadPoolExecutor.java:628)
      	at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)
      
      "SourceTaskOffsetCommitter-1" #63 prio=5 os_prio=0 cpu=0.09ms elapsed=29.57s tid=0x00007f2c1014c000 nid=0x300 waiting on condition  [0x00007f2bb9aeb000]
         java.lang.Thread.State: TIMED_WAITING (parking)
      	at jdk.internal.misc.Unsafe.park(java.base@11.0.8/Native Method)
      	- parking to wait for  <0x0000000081a48c80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
      	at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.8/LockSupport.java:234)
      	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@11.0.8/AbstractQueuedSynchronizer.java:2123)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.8/ScheduledThreadPoolExecutor.java:1182)
      	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.8/ScheduledThreadPoolExecutor.java:899)
      	at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.8/ThreadPoolExecutor.java:1054)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.8/ThreadPoolExecutor.java:1114)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.8/ThreadPoolExecutor.java:628)
      	at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)
      
      "debezium-mysqlconnector-staging_coredb-snapshot" #66 prio=5 os_prio=0 cpu=1860.91ms elapsed=28.58s tid=0x00007f2c1447c800 nid=0x303 waiting for monitor entry  [0x00007f2bb98e9000]
         java.lang.Thread.State: BLOCKED (on object monitor)
      	at com.mysql.cj.jdbc.result.ResultSetImpl.checkColumnBounds(ResultSetImpl.java:467)
      	- waiting to lock <0x0000000081c72908> (a com.mysql.cj.jdbc.ConnectionImpl)
      	at com.mysql.cj.jdbc.result.ResultSetImpl.getTimestamp(ResultSetImpl.java:954)
      	at io.debezium.connector.mysql.SnapshotReader.readTimestampField(SnapshotReader.java:227)
      	at io.debezium.connector.mysql.SnapshotReader.readField(SnapshotReader.java:161)
      	at io.debezium.connector.mysql.SnapshotReader.lambda$execute$14(SnapshotReader.java:653)
      	at io.debezium.connector.mysql.SnapshotReader$$Lambda$934/0x00000008408bb840.accept(Unknown Source)
      	at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:537)
      	at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:643)
      	at io.debezium.connector.mysql.SnapshotReader$$Lambda$818/0x000000084074ac40.run(Unknown Source)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.8/ThreadPoolExecutor.java:1128)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.8/ThreadPoolExecutor.java:628)
      	at java.lang.Thread.run(java.base@11.0.8/Thread.java:834) 
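
      In the dump above, the task thread is stuck inside JdbcConnection.close(), reading from the socket while it holds the ConnectionImpl monitor that the snapshot thread is waiting for. One way to keep stop() from hanging would be to bound the close call with a timeout. The following is a minimal sketch of that idea, not the fix that was actually applied: the class TimedClose and the method closeWithTimeout are hypothetical names, and the resource is typed as AutoCloseable so the example runs without a database (java.sql.Connection is an AutoCloseable).

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Debezium.
public class TimedClose {

    // Closes the resource on a separate daemon thread and waits at most `timeout`.
    // Returns true if close() completed in time, false if it was abandoned.
    public static boolean closeWithTimeout(AutoCloseable resource, Duration timeout)
            throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, "jdbc-closer");
            // Daemon thread: a close() that never returns cannot keep the JVM alive.
            thread.setDaemon(true);
            return thread;
        });
        // The lambda returns a value, so it is submitted as a Callable and may throw.
        Future<?> pending = executor.submit(() -> {
            resource.close();
            return null;
        });
        try {
            pending.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // Interrupts the closer thread; a thread blocked in a plain socket read
            // may ignore the interrupt, so the caller simply stops waiting for it.
            pending.cancel(true);
            return false;
        } catch (ExecutionException e) {
            // Surface the original failure thrown by close(), if it is an Exception.
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

      A call such as TimedClose.closeWithTimeout(connection, Duration.ofSeconds(10)) would let the task continue its shutdown even if the driver never returns from close(). The standard JDBC method Connection.abort(Executor) is another option worth evaluating, since it is intended for terminating a connection without waiting for an orderly close.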

              Assignee: Unassigned
              Reporter: Arik Cohen (creactiviti) (Inactive)