Bug
Resolution: Done
Major
0.6.2
None
I get a java.lang.NullPointerException for a paused connector after restarting the Kafka Connect application:
[2017-12-19T08:41:38,237][ERROR][category=org.apache.kafka.connect.runtime.WorkerTask] Task is being killed and will not recover until manually restarted
[2017-12-19T08:41:38,699][ERROR][category=org.apache.kafka.connect.runtime.WorkerSourceTask] Exception thrown while calling task.commit()
java.lang.NullPointerException
at io.debezium.connector.postgresql.PostgresConnectorTask.commit(PostgresConnectorTask.java:154)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitSourceTask(WorkerSourceTask.java:383)
at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:330)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:108)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:45)
at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:82)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Root cause:
WorkerTask instantiates the task, but it does not start the task while the task is in the PAUSED state:
org.apache.kafka.connect.runtime.WorkerTask.java
private void doRun() throws InterruptedException {
    try {
        synchronized (this) {
            if (stopping)
                return;
            if (targetState == TargetState.PAUSED) {
                onPause();
                if (!awaitUnpause())
                    return;
            }
            statusListener.onStartup(id);
        }
        execute(); // the task is started here
    } catch (Throwable t) {
        log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
        log.error("Task is being killed and will not recover until manually restarted");
        throw t;
    } finally {
        doClose();
    }
}
So PostgresConnectorTask never initializes its producer variable while the connector is paused, and an invocation of the commit() method throws java.lang.NullPointerException:
io.debezium.connector.postgresql.PostgresConnectorTask.java
@Override
public void commit() throws InterruptedException {
    producer.commit();
}
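The failure mode described above can be reproduced outside Kafka Connect with a small, self-contained sketch. This is only an illustration under stated assumptions, not the actual Debezium code or the fix that was applied: the names PausedTaskCommitSketch, Producer, UnguardedTask and GuardedTask are hypothetical stand-ins. It shows that calling commit() on a task whose start() never ran fails, and that one possible guard is to treat commit() as a no-op until the producer exists.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PausedTaskCommitSketch {

    /** Hypothetical stand-in for the producer object that start() would create. */
    static class Producer {
        private final AtomicInteger commits = new AtomicInteger();

        void commit() {
            commits.incrementAndGet();
        }

        int commitCount() {
            return commits.get();
        }
    }

    /** Mirrors the reported behaviour: commit() assumes start() has already run. */
    static class UnguardedTask {
        private Producer producer; // stays null while the connector is paused

        void start() {
            producer = new Producer();
        }

        void commit() {
            producer.commit(); // NullPointerException if start() was never called
        }
    }

    /** One possible guard (an assumption, not the project's fix): skip commit() before start(). */
    static class GuardedTask {
        // volatile because the offset committer calls commit() from another thread
        private volatile Producer producer;

        void start() {
            producer = new Producer();
        }

        void commit() {
            Producer current = producer; // read the field once
            if (current != null) {
                current.commit();
            }
        }

        boolean isStarted() {
            return producer != null;
        }

        int commitCount() {
            Producer current = producer;
            return current == null ? 0 : current.commitCount();
        }
    }

    public static void main(String[] args) {
        // Paused connector: the task is instantiated, start() is never called,
        // yet the periodic offset committer still invokes commit().
        UnguardedTask unguarded = new UnguardedTask();
        try {
            unguarded.commit();
        } catch (NullPointerException e) {
            System.out.println("unguarded commit() before start(): NullPointerException");
        }

        GuardedTask guarded = new GuardedTask();
        guarded.commit(); // no-op, the task has not been started yet
        guarded.start();  // the connector is resumed
        guarded.commit(); // now delegates to the producer
        System.out.println("guarded commits after start(): " + guarded.commitCount());
    }
}
```

The guard keeps the paused task quiet instead of logging an error on every offset commit interval. Whether skipping the commit is acceptable depends on what the real producer's commit() is responsible for, so this should be read as a starting point for discussion.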