Enhancement
Resolution: Done
Major
1.3.0.Final
I have the following SMT (for the sake of the test):
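    import java.util.Map;

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.errors.DataException;
    import org.apache.kafka.connect.transforms.Transformation;

    public class Rogue<R extends ConnectRecord<R>> implements Transformation<R> {

        // Fails on every record so that the task's error handling can be observed.
        @Override
        public R apply(R record) {
            throw new DataException("some bad stuff happened");
        }

        @Override
        public ConfigDef config() {
            return new ConfigDef();
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }

        @Override
        public void close() {
        }
    }

It throws an exception on every record, which results in the following log entries: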
    {
      "@timestamp": "2020-09-30T16:23:47.125Z",
      "source_host": "353bd2dc0aa4",
      "file": "WorkerTask.java",
      "method": "doRun",
      "level": "ERROR",
      "line_number": "188",
      "thread_name": "task-thread-source-icecap-staging-coredb-0",
      "@version": 1,
      "logger_name": "org.apache.kafka.connect.runtime.WorkerTask",
      "message": "WorkerSourceTask{id=source-icecap-staging-coredb-0} Task is being killed and will not recover until manually restarted",
      "class": "org.apache.kafka.connect.runtime.WorkerTask",
      "mdc": { "connector.context": "[source-icecap-staging-coredb|task-0] " }
    }

    {
      "exception": {
        "stacktrace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler \tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196) \tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122) \tat org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50) \tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339) \tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264) \tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) \tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) \tat java.base\/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) \tat java.base\/java.util.concurrent.FutureTask.run(FutureTask.java:264) \tat java.base\/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) \tat java.base\/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) \tat java.base\/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.kafka.connect.errors.DataException: some bad stuff happened \tat com.strava.data.timber.kafka.connect.smt.Rogue.apply(Rogue.java:14) \tat org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50) \tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146) \tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180) \t... 11 more",
        "exception_class": "org.apache.kafka.connect.errors.ConnectException",
        "exception_message": "Tolerance exceeded in error handler"
      },
      "source_host": "41f5aba64925",
      "method": "doRun",
      "level": "ERROR",
      "message": "WorkerSourceTask{id=source-icecap-staging-coredb-0} Task threw an uncaught and unrecoverable exception",
      "mdc": { "connector.context": "[source-icecap-staging-coredb|task-0] " },
      "@timestamp": "2020-10-01T17:27:27.964Z",
      "file": "WorkerTask.java",
      "line_number": "187",
      "thread_name": "task-thread-source-icecap-staging-coredb-0",
      "@version": 1,
      "logger_name": "org.apache.kafka.connect.runtime.WorkerTask",
      "class": "org.apache.kafka.connect.runtime.WorkerTask"
    }
This is expected, but when I query the connector's status endpoint I get:
    {
      "name": "source-icecap-staging-coredb",
      "connector": { "state": "RUNNING", "worker_id": "localhost:8083" },
      "tasks": [
        { "id": 0, "state": "RUNNING", "worker_id": "localhost:8083" }
      ],
      "type": "source"
    }
I'm expecting the connector and/or task to be in a FAILED state.
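For anyone reproducing this, the check against the status response can be sketched as below. This is only an illustration, not part of the connector: the class and method names (`ConnectStatusCheck`, `statusUri`, `states`, `anyFailed`) are made up for the example, and a regex stands in for a real JSON parser to keep the sketch dependency-free. The only external detail relied on is Kafka Connect's `/connectors/{name}/status` REST path.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for inspecting a Kafka Connect status response.
class ConnectStatusCheck {

    // Builds the status URI for a connector on the given worker (host:port).
    static URI statusUri(String workerHostPort, String connectorName) {
        return URI.create("http://" + workerHostPort + "/connectors/" + connectorName + "/status");
    }

    // Collects every "state" value in document order (connector first, then tasks).
    // Assumption: a regex is good enough for this flat response; real code should parse the JSON.
    static List<String> states(String statusJson) {
        List<String> result = new ArrayList<>();
        Matcher m = Pattern.compile("\"state\"\\s*:\\s*\"([A-Z_]+)\"").matcher(statusJson);
        while (m.find()) {
            result.add(m.group(1));
        }
        return result;
    }

    // True if the connector or any of its tasks reports FAILED.
    static boolean anyFailed(String statusJson) {
        return states(statusJson).contains("FAILED");
    }
}
```

Fed the response above, `states` yields two `RUNNING` entries and `anyFailed` returns `false`, which is exactly the mismatch with the log output.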
I'm using Debezium MySQL connector version 1.3.0.CR1.
A thread dump revealed a potential deadlock that prevents the task thread from progressing when Kafka Connect calls the stop() method on the task:
    "task-thread-source-icecap-staging-coredb-0" #62 prio=5 os_prio=0 cpu=2628.13ms elapsed=29.57s tid=0x00007f2c10140000 nid=0x2ff runnable [0x00007f2bb9beb000]
       java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(java.base@11.0.8/Native Method)
        at java.net.SocketInputStream.socketRead(java.base@11.0.8/SocketInputStream.java:115)
        at java.net.SocketInputStream.read(java.base@11.0.8/SocketInputStream.java:168)
        at java.net.SocketInputStream.read(java.base@11.0.8/SocketInputStream.java:140)
        at com.mysql.cj.protocol.ReadAheadInputStream.fill(ReadAheadInputStream.java:107)
        at com.mysql.cj.protocol.ReadAheadInputStream.readFromUnderlyingStreamIfNecessary(ReadAheadInputStream.java:150)
        at com.mysql.cj.protocol.ReadAheadInputStream.read(ReadAheadInputStream.java:180)
        - locked <0x0000000081c7c148> (a com.mysql.cj.protocol.ReadAheadInputStream)
        at java.io.FilterInputStream.read(java.base@11.0.8/FilterInputStream.java:133)
        at com.mysql.cj.protocol.FullReadInputStream.readFully(FullReadInputStream.java:64)
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:108)
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45)
        at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:57)
        at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41)
        at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:61)
        at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44)
        at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:75)
        at com.mysql.cj.protocol.a.ResultsetRowReader.read(ResultsetRowReader.java:42)
        at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1688)
        at com.mysql.cj.protocol.a.result.ResultsetRowsStreaming.next(ResultsetRowsStreaming.java:202)
        at com.mysql.cj.protocol.a.result.ResultsetRowsStreaming.close(ResultsetRowsStreaming.java:119)
        - locked <0x0000000081c72908> (a com.mysql.cj.jdbc.ConnectionImpl)
        at com.mysql.cj.jdbc.result.ResultSetImpl.realClose(ResultSetImpl.java:1882)
        - locked <0x0000000081c72908> (a com.mysql.cj.jdbc.ConnectionImpl)
        at com.mysql.cj.jdbc.result.ResultSetImpl.close(ResultSetImpl.java:529)
        at com.mysql.cj.jdbc.StatementImpl.realClose(StatementImpl.java:1805)
        at com.mysql.cj.jdbc.ConnectionImpl.closeAllOpenStatements(ConnectionImpl.java:744)
        at com.mysql.cj.jdbc.ConnectionImpl.realClose(ConnectionImpl.java:1745)
        at com.mysql.cj.jdbc.ConnectionImpl.close(ConnectionImpl.java:720)
        - locked <0x0000000081c72908> (a com.mysql.cj.jdbc.ConnectionImpl)
        at io.debezium.jdbc.JdbcConnection.close(JdbcConnection.java:913)
        - locked <0x0000000081c61290> (a io.debezium.jdbc.JdbcConnection)
        at io.debezium.connector.mysql.MySqlJdbcContext.shutdown(MySqlJdbcContext.java:151)
        at io.debezium.connector.mysql.MySqlTaskContext.shutdown(MySqlTaskContext.java:259)
        at io.debezium.connector.mysql.MySqlConnectorTask.completeReaders(MySqlConnectorTask.java:494)
        at io.debezium.connector.mysql.MySqlConnectorTask$$Lambda$815/0x000000084074bc40.run(Unknown Source)
        at io.debezium.connector.mysql.ChainedReader.readerCompletedPolling(ChainedReader.java:165)
        - locked <0x0000000082161c68> (a io.debezium.connector.mysql.ChainedReader)
        at io.debezium.connector.mysql.ChainedReader$$Lambda$814/0x000000084074b840.run(Unknown Source)
        at io.debezium.connector.mysql.AbstractReader.cleanupResources(AbstractReader.java:309)
        at io.debezium.connector.mysql.SnapshotReader.doStop(SnapshotReader.java:140)
        at io.debezium.connector.mysql.AbstractReader.stop(AbstractReader.java:130)
        at io.debezium.connector.mysql.ChainedReader.stop(ChainedReader.java:122)
        - locked <0x0000000082161c68> (a io.debezium.connector.mysql.ChainedReader)
        at io.debezium.connector.mysql.MySqlConnectorTask.doStop(MySqlConnectorTask.java:467)
        at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:206)
        at io.debezium.connector.common.BaseSourceTask.stop(BaseSourceTask.java:176)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.tryStop(WorkerSourceTask.java:220)
        - locked <0x0000000081cbbd98> (a org.apache.kafka.connect.runtime.WorkerSourceTask)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.close(WorkerSourceTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:164)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.util.concurrent.Executors$RunnableAdapter.call(java.base@11.0.8/Executors.java:515)
        at java.util.concurrent.FutureTask.run(java.base@11.0.8/FutureTask.java:264)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.8/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.8/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)

    "SourceTaskOffsetCommitter-1" #63 prio=5 os_prio=0 cpu=0.09ms elapsed=29.57s tid=0x00007f2c1014c000 nid=0x300 waiting on condition [0x00007f2bb9aeb000]
       java.lang.Thread.State: TIMED_WAITING (parking)
        at jdk.internal.misc.Unsafe.park(java.base@11.0.8/Native Method)
        - parking to wait for <0x0000000081a48c80> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(java.base@11.0.8/LockSupport.java:234)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@11.0.8/AbstractQueuedSynchronizer.java:2123)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.8/ScheduledThreadPoolExecutor.java:1182)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(java.base@11.0.8/ScheduledThreadPoolExecutor.java:899)
        at java.util.concurrent.ThreadPoolExecutor.getTask(java.base@11.0.8/ThreadPoolExecutor.java:1054)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.8/ThreadPoolExecutor.java:1114)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.8/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)

    "debezium-mysqlconnector-staging_coredb-snapshot" #66 prio=5 os_prio=0 cpu=1860.91ms elapsed=28.58s tid=0x00007f2c1447c800 nid=0x303 waiting for monitor entry [0x00007f2bb98e9000]
       java.lang.Thread.State: BLOCKED (on object monitor)
        at com.mysql.cj.jdbc.result.ResultSetImpl.checkColumnBounds(ResultSetImpl.java:467)
        - waiting to lock <0x0000000081c72908> (a com.mysql.cj.jdbc.ConnectionImpl)
        at com.mysql.cj.jdbc.result.ResultSetImpl.getTimestamp(ResultSetImpl.java:954)
        at io.debezium.connector.mysql.SnapshotReader.readTimestampField(SnapshotReader.java:227)
        at io.debezium.connector.mysql.SnapshotReader.readField(SnapshotReader.java:161)
        at io.debezium.connector.mysql.SnapshotReader.lambda$execute$14(SnapshotReader.java:653)
        at io.debezium.connector.mysql.SnapshotReader$$Lambda$934/0x00000008408bb840.accept(Unknown Source)
        at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:537)
        at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:643)
        at io.debezium.connector.mysql.SnapshotReader$$Lambda$818/0x000000084074ac40.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.8/ThreadPoolExecutor.java:1128)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.8/ThreadPoolExecutor.java:628)
        at java.lang.Thread.run(java.base@11.0.8/Thread.java:834)
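In the dump, the task thread holds the ConnectionImpl monitor <0x0000000081c72908> while it sits in a socket read inside close(), and the snapshot thread is BLOCKED waiting for that same monitor. The shape of that contention can be reproduced in isolation with the sketch below. It is a simplified stand-in, not Debezium or MySQL driver code: a plain `Object` plays the connection monitor, a `CountDownLatch` plays the socket read that has not yet returned, and all names (`MonitorContentionDemo`, `observeWaiterState`) are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: one thread holds a monitor while waiting on I/O, another blocks on that monitor.
class MonitorContentionDemo {

    // Returns the state observed for the "snapshot" thread while the "task" thread holds the monitor.
    static Thread.State observeWaiterState() {
        Object connection = new Object();                 // stands in for the ConnectionImpl monitor
        CountDownLatch held = new CountDownLatch(1);      // signals that the monitor has been acquired
        CountDownLatch release = new CountDownLatch(1);   // stands in for the socket read returning

        Thread task = new Thread(() -> {
            synchronized (connection) {                   // like ConnectionImpl.close() taking the lock
                held.countDown();
                try {
                    release.await();                      // like the read inside ResultsetRowsStreaming.close()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "task-thread");

        Thread snapshot = new Thread(() -> {
            synchronized (connection) {                   // like ResultSetImpl.checkColumnBounds()
                // would read a column here
            }
        }, "snapshot-thread");

        try {
            task.start();
            held.await();                                 // make sure the task thread owns the monitor first
            snapshot.start();

            // Poll (bounded) until the snapshot thread is parked on the monitor.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (snapshot.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            Thread.State observed = snapshot.getState();

            release.countDown();                          // let the "read" finish so both threads exit
            task.join();
            snapshot.join();
            return observed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while observing threads", e);
        }
    }
}
```

In the demo the latch is eventually released, so both threads finish. In the dump above nothing is known to release the task thread's read, which is why the snapshot thread stays BLOCKED for as long as the read does not return.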