Type: Bug
Resolution: Unresolved
Priority: Major
Bug report
Context:
When an exception (such as TimeoutException) occurs in PubSubChangeConsumer.handleBatch, the exception is rethrown and the application stops. When the application is restarted, it resumes from the last committed offset (checkpoint).
However, in this case data loss can occur.
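Explanation:
Because of the finally block in handleBatch, the offsets of all records that were already marked as processed are always flushed. This happens even when the application hits an exception while waiting for the messages to be published. Some of those records may never have been delivered to PubSub. After a restart, however, the connector resumes after them, so they are lost.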
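To make the failure mode concrete, here is a minimal, self-contained Java simulation of the ordering described above. It does not use the real Debezium or Google Cloud classes. `CurrentOrderingDemo`, `FakeCommitter` (a hypothetical stand-in for Debezium's `RecordCommitter`) and `publishWithoutNetwork` (a hypothetical publisher whose futures never complete, mimicking the disconnected network) are illustrative names only. `CompletableFuture` plays the role of the Pub/Sub `ApiFuture`, and `IllegalStateException` stands in for whatever exception the connector actually rethrows. The sketch also assumes that marking the batch as finished flushes every offset marked so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CurrentOrderingDemo {

    // Hypothetical stand-in for Debezium's RecordCommitter: it only tracks
    // which records would have their offsets flushed.
    static final class FakeCommitter {
        final List<String> pending = new ArrayList<>();
        final List<String> flushed = new ArrayList<>();

        void markProcessed(String record) {
            pending.add(record);
        }

        // Assumption: finishing a batch flushes every offset marked so far.
        void markBatchFinished() {
            flushed.addAll(pending);
            pending.clear();
        }
    }

    // Hypothetical publisher whose futures never complete, mimicking a lost network connection.
    static CompletableFuture<String> publishWithoutNetwork(String record) {
        return new CompletableFuture<>();
    }

    // Ordering as described in the report: mark each record while publishing,
    // then mark the batch as finished in a finally block.
    static void handleBatch(List<String> records, FakeCommitter committer, long timeoutMs) {
        List<CompletableFuture<String>> deliveries = new ArrayList<>();
        for (String record : records) {
            deliveries.add(publishWithoutNetwork(record));
            committer.markProcessed(record); // marked before delivery is confirmed
        }
        try {
            CompletableFuture.allOf(deliveries.toArray(new CompletableFuture[0]))
                    .get(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
        finally {
            committer.markBatchFinished(); // runs even though the wait above failed
        }
    }

    public static void main(String[] args) {
        FakeCommitter committer = new FakeCommitter();
        try {
            handleBatch(List.of("r1", "r2", "r3"), committer, 300);
        }
        catch (IllegalStateException e) {
            System.out.println("Batch failed: " + e.getCause());
        }
        // None of the messages was delivered, yet all three offsets were flushed.
        System.out.println("Flushed offsets: " + committer.flushed);
    }
}
```

Running `main` first reports the timeout. It then lists all three records as flushed, even though none of them was delivered, so a restart would skip them.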
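Proposed fix:
Move the committer calls that mark each record as processed (committer.markProcessed(record)) and that mark the batch as finished (committer.markBatchFinished()). They should run only after all messages have been published to PubSub successfully.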
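Below is a minimal sketch of the proposed ordering. It uses the same kind of hypothetical stand-ins: `ProposedOrderingDemo` and `FakeCommitter` are illustrative names, `FakeCommitter` replaces `RecordCommitter`, and `CompletableFuture` replaces `ApiFuture`. A pluggable `publish` function replaces a real Pub/Sub `Publisher`. Records are marked as processed, and the batch is marked as finished, only after every delivery future has completed within the timeout. On failure, nothing is committed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

public class ProposedOrderingDemo {

    // Hypothetical stand-in for Debezium's RecordCommitter.
    static final class FakeCommitter {
        final List<String> pending = new ArrayList<>();
        final List<String> flushed = new ArrayList<>();

        void markProcessed(String record) {
            pending.add(record);
        }

        // Assumption: finishing a batch flushes every offset marked so far.
        void markBatchFinished() {
            flushed.addAll(pending);
            pending.clear();
        }
    }

    // Publishes the batch and commits offsets only after every delivery has succeeded.
    static void handleBatch(List<String> records,
                            Function<String, CompletableFuture<String>> publish,
                            FakeCommitter committer,
                            long timeoutMs) {
        List<CompletableFuture<String>> deliveries = new ArrayList<>();
        for (String record : records) {
            deliveries.add(publish.apply(record)); // nothing is marked as processed yet
        }
        try {
            CompletableFuture.allOf(deliveries.toArray(new CompletableFuture[0]))
                    .get(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        catch (ExecutionException | TimeoutException e) {
            // No offsets are committed, so the whole batch is read again after a restart.
            throw new IllegalStateException(e);
        }
        // Reached only when every message was acknowledged within the timeout.
        for (String record : records) {
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
    }

    public static void main(String[] args) {
        FakeCommitter committer = new FakeCommitter();
        try {
            // Simulated outage: the delivery futures never complete.
            handleBatch(List.of("r1", "r2"), r -> new CompletableFuture<>(), committer, 300);
        }
        catch (IllegalStateException e) {
            System.out.println("Batch failed, flushed offsets: " + committer.flushed);
        }
        // Simulated healthy publisher: every future completes with a message id.
        handleBatch(List.of("r1", "r2"), r -> CompletableFuture.completedFuture("id-" + r), committer, 300);
        System.out.println("After retry, flushed offsets: " + committer.flushed);
    }
}
```

With this ordering, a failed batch leaves the committed offsets unchanged, so after a restart the connector reads the whole batch again. The trade-off is that messages delivered before the failure may be published a second time. That gives at-least-once delivery for the batch instead of losing records.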
What Debezium connector do you use and what version?
2.4 to 2.6
What is the connector configuration?
debezium.sink.type=pubsub
What is the captured database version and mode of deployment?
MySQL, PostgreSQL (not relevant to this issue)
What behaviour do you expect?
No data is lost when the application restarts after encountering an exception such as TimeoutException.
What behaviour do you see?
Data loss.
Do you see the same behaviour using the latest released Debezium version?
Yes
Do you have the connector logs, ideally from start till finish?
The full log is not really needed. Below is the log from when the application encountered the exception. After that, I restarted the application and found that some messages from the failed batch were missing.
2024-05-03 05:03:00,298 INFO [io.deb.pip.sig.SignalProcessor] (pool-6-thread-1) SignalProcessor stopped
2024-05-03 05:03:00,299 INFO [io.deb.jdb.JdbcConnection] (pool-21-thread-1) Connection gracefully closed
2024-05-03 05:03:00,300 INFO [io.deb.jdb.JdbcConnection] (pool-22-thread-1) Connection gracefully closed
2024-05-03 05:03:00,302 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-6-thread-1) Stopped FileOffsetBackingStore
2024-05-03 05:03:00,305 ERROR [io.deb.ser.ConnectorLifecycle] (pool-6-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: java.util.concurrent.TimeoutException: Waited 300 milliseconds (plus 202994 nanoseconds delay) for ListFuture@249bfbd[status=PENDING, info=[futures=[com.google.api.core.AbstractApiFuture$InternalSettableFuture@41bf6df2[status=SUCCESS, result=[java.lang.String@59f68e71]], com.google.api.core.AbstractApiFuture$InternalSettableFuture@74ae9cd3[status=SUCCESS, result=[java.lang.String@3b5561ac]]
.....
com.google.api.core.AbstractApiFuture$InternalSettableFuture@7a693573[status=SUCCESS, result=[java.lang.String@6295b4d6]]]]]
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:531)
at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.get(AbstractFuture.java:119)
at com.google.common.util.concurrent.ForwardingFuture.get(ForwardingFuture.java:75)
at io.debezium.server.pubsub.PubSubChangeConsumer.handleBatch(PubSubChangeConsumer.java:244)
... 8 more
2024-05-03 05:03:00,334 INFO [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2024-05-03 05:03:00,335 INFO [io.deb.emb.EmbeddedEngine] (main) Stopping the embedded engine
How to reproduce the issue using our tutorial deployment?
- Set up the "example-postgres" database, the Debezium PostgreSQL connector, and debezium-server-pubsub.
- While the application is running, disconnect the internet connection (this causes a TimeoutException) and insert some new records into the database. The issue then occurs: the messages cannot be published to PubSub, but their offsets are still flushed to persistent storage.