- Bug
- Resolution: Done
- Critical
When I run mysql-binlog-connector with Debezium on Confluent, I found that sometimes, when Confluent starts a rebalance, BinaryLogClient can't disconnect successfully.
The jstack output is shown below:
"pool-3-thread-4" #51 prio=5 os_prio=0 tid=0x00007f6898066800 nid=0x40de waiting on condition [0x00007f68761d6000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000814981e0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireNanos(AbstractQueuedSynchronizer.java:934)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireNanos(AbstractQueuedSynchronizer.java:1247)
at java.util.concurrent.locks.ReentrantLock.tryLock(ReentrantLock.java:442)
at com.github.shyiko.mysql.binlog.BinaryLogClient.tryLockInterruptibly(BinaryLogClient.java:1165)
at com.github.shyiko.mysql.binlog.BinaryLogClient.terminateConnect(BinaryLogClient.java:1158)
at com.github.shyiko.mysql.binlog.BinaryLogClient.disconnect(BinaryLogClient.java:1132)
at io.debezium.connector.mysql.BinlogReader.doStop(BinlogReader.java:270)
at io.debezium.connector.mysql.AbstractReader.stop(AbstractReader.java:85)
at io.debezium.connector.mysql.ChainedReader.stop(ChainedReader.java:101)
- locked <0x0000000081499590> (a io.debezium.connector.mysql.ChainedReader)
at io.debezium.connector.mysql.MySqlConnectorTask.stop(MySqlConnectorTask.java:265)
- locked <0x0000000081499698> (a io.debezium.connector.mysql.MySqlConnectorTask)
at org.apache.kafka.connect.runtime.WorkerSourceTask.stop(WorkerSourceTask.java:147)
- locked <0x00000000811b30e8> (a org.apache.kafka.connect.runtime.WorkerSourceTask)
at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:471)
at org.apache.kafka.connect.runtime.Worker.stopAndAwaitTask(Worker.java:530)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:881)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.call(DistributedHerder.java:878)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I found that the terminateConnect() method can't acquire the connectLock, so it keeps trying to get the connectLock over and over. Is this a bug? Can anyone help me? Thanks a lot.
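To illustrate the pattern the trace seems to show, here is a minimal standalone sketch. It assumes that terminateConnect() retries a timed tryLock on connectLock until it succeeds; that is inferred from the stack trace, not copied from the library source. The class name LockRetryDemo and the method countFailedAttempts are hypothetical names used only for this illustration. A "holder" thread stands in for whichever thread currently owns the lock, and the calling thread keeps retrying until the holder releases it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of mysql-binlog-connector or Debezium.
public class LockRetryDemo {

    // Returns how many timed tryLock attempts timed out before the lock was acquired.
    static int countFailedAttempts(long holdMillis, long timeoutMillis) throws InterruptedException {
        ReentrantLock connectLock = new ReentrantLock();
        CountDownLatch held = new CountDownLatch(1);

        // Stand-in for the thread that owns the lock (assumed here to be a connecting thread).
        Thread holder = new Thread(() -> {
            connectLock.lock();
            try {
                held.countDown();          // signal that the lock is now held
                Thread.sleep(holdMillis);  // simulate work done while holding the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                connectLock.unlock();
            }
        }, "holder");
        holder.start();
        held.await(); // make sure the holder owns the lock before we start retrying

        // Stand-in for the stopping thread: retry a timed tryLock until it succeeds.
        // While the holder keeps the lock, each attempt parks in TIMED_WAITING and then times out.
        int failed = 0;
        while (!connectLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            failed++;
        }
        connectLock.unlock();
        holder.join();
        return failed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("failed attempts: " + countFailedAttempts(500, 50));
    }
}
```

If the holder never released the lock, the loop above would never exit, and a thread dump of the retrying thread would show it parked inside ReentrantLock.tryLock, as in the trace above. In that situation the useful next step is probably to find which thread owns <0x00000000814981e0> in the full jstack output and see what it is blocked on.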