-
Bug
-
Resolution: Done
-
Major
-
0.6.2
-
None
I have java.lang.NullPointerException for paused connector after the restart of Kafka Connect application:
[2017-12-19T08:41:38,237][ERROR][category=org.apache.kafka.connect.runtime.WorkerTask] Task is being killed and will not recover until manually restarted [2017-12-19T08:41:38,699][ERROR][category=org.apache.kafka.connect.runtime.WorkerSourceTask] Exception thrown while calling task.commit() java.lang.NullPointerException at io.debezium.connector.postgresql.PostgresConnectorTask.commit(PostgresConnectorTask.java:154) at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:383) at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:330) at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:108) at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:45) at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:82) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Root cause:
WorkerTask instantiates the task, but it doesn’t start the task if the task has PAUSED state:
org.apache.kafka.connect.runtime.WorkerTask.java
private void doRun() throws InterruptedException { try { synchronized (this) { if (stopping) return; if (targetState == TargetState.PAUSED) { onPause(); if (!awaitUnpause()) return; } statusListener.onStartup(id); } execute(); // here is performed the task starting } catch (Throwable t) { log.error("Task {} threw an uncaught and unrecoverable exception", id, t); log.error("Task is being killed and will not recover until manually restarted"); throw t; } finally { doClose(); } }
So PostgresConnectorTask doesn’t initialize producer variable and commit() method invocation can throw java.lang.NullPointerException:
io.debezium.connector.postgresql.PostgresConnectorTask.java
@Override public void commit() throws InterruptedException { producer.commit(); }