Details
Bug
Resolution: Unresolved
Major
1.3.1.Final
Description
We are integrating embedded Debezium into our application. One important feature of Debezium is that if it crashes or is stopped, then on restart it reads the offset from the offsets.dat file and continues processing new records instead of starting from the beginning. We would like this feature to work in our application, but it does not.
We have the following configuration in our property file:
name=engine
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=offsets.dat
This is the standard configuration suggested by the README of debezium-embedded.
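One thing worth ruling out with this configuration: offsets.dat is a relative path, so it resolves against the working directory of the process, which may differ between runs or be recreated on redeploy. The sketch below builds the same settings in code but pins the offset file to an absolute path. The class name EmbeddedEngineConfig and the "data" directory are hypothetical and used only for illustration; the property keys are the ones already named in this report.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper, not part of Debezium: builds the engine properties in code.
final class EmbeddedEngineConfig {

    // Returns the same settings as the property file, with an absolute offset file path.
    static Properties build(Path offsetDir) {
        Properties props = new Properties();
        props.setProperty("name", "engine");
        props.setProperty("connector.class",
                "io.debezium.connector.sqlserver.SqlServerConnector");
        props.setProperty("offset.storage",
                "org.apache.kafka.connect.storage.FileOffsetBackingStore");

        // Resolve against a fixed directory so every restart reads the same file.
        Path offsetFile = offsetDir.toAbsolutePath().normalize().resolve("offsets.dat");
        props.setProperty("offset.storage.file.filename", offsetFile.toString());

        // 60 seconds is the default mentioned in this report; set explicitly for clarity.
        props.setProperty("offset.flush.interval.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        // "data" is an assumed directory name.
        Properties props = build(Paths.get("data"));
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Printing the resolved path at startup makes it easy to compare the file written by the first run with the file read by the second.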
We also implemented DebeziumEngine in the standard way in our code, purely for testing purposes:
@PostConstruct
public void init() {
    // Create the engine with this configuration ...
    DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
            .using(debeziumProperties)
            .notifying(record -> {
                System.out.println(record.toString());
            })
            .build();

    // Run the engine asynchronously on a single background thread
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);
}
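The snippet above never stops the engine, so how the engine is shut down depends on how the process exits. As a minimal sketch of a clean stop, assuming (as we understand the API) that DebeziumEngine is both a Runnable and a Closeable, the wrapper below closes the engine and then waits for its thread to finish. EngineLifecycle is a hypothetical class written against those two JDK interfaces only, so it compiles without Debezium on the classpath.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: starts an engine on one thread and stops it cleanly.
final class EngineLifecycle<E extends Runnable & Closeable> {

    private final E engine;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    EngineLifecycle(E engine) {
        this.engine = engine;
    }

    // Runs the engine asynchronously, as in the init() method above.
    void start() {
        executor.execute(engine);
    }

    // Asks the engine to stop, then waits for its thread to end.
    // Returns false if the thread is still running after the timeout.
    boolean stop(long timeoutSeconds) throws IOException, InterruptedException {
        engine.close();
        executor.shutdown();
        return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }
}
```

In an application that uses @PostConstruct, stop(...) would typically be called from a matching @PreDestroy method. Whether a clean stop changes the behaviour reported here is untested; it only removes one variable from the comparison.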
When we ran our application, we could see that Debezium connected to our SQL Server instance and processed records from the desired database table. We also confirmed that offsets.dat was created when the application was run for the first time, and that it was flushed every 60 seconds, which is the default value of offset.flush.interval.ms.
During our testing, every time the application was stopped and restarted, it started processing from the very first record in the database table. The logs showed that org.apache.kafka.connect.storage.FileOffsetBackingStore was called to load offsets.dat, but the stored offset did not appear to be used.
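To check what the file actually contains between runs, it can be dumped with plain JDK classes. This is a diagnostic sketch under one assumption: that FileOffsetBackingStore persists the offsets as a Java-serialized Map of byte[] keys to byte[] values whose bytes are UTF-8 text. That matches our reading of the Kafka Connect source but is not confirmed by anything in this report. OffsetFileDump is a hypothetical class name.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical diagnostic tool: prints the entries stored in an offsets file.
final class OffsetFileDump {

    // Assumes the file holds a serialized Map<byte[], byte[]> (see the note above).
    static Map<String, String> read(Path file) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            Object content = in.readObject();
            if (!(content instanceof Map)) {
                throw new IOException("Unexpected file content: " + content);
            }
            Map<String, String> result = new LinkedHashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) content).entrySet()) {
                result.put(text(entry.getKey()), text(entry.getValue()));
            }
            return result;
        }
    }

    // Decodes byte arrays as UTF-8 text; anything else falls back to String.valueOf.
    private static String text(Object value) {
        return value instanceof byte[]
                ? new String((byte[]) value, StandardCharsets.UTF_8)
                : String.valueOf(value);
    }

    public static void main(String[] args) throws Exception {
        Path file = Paths.get(args.length > 0 ? args[0] : "offsets.dat");
        read(file).forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

An empty map, or a file whose entries do not change between flushes, would point at the write side; populated entries that are ignored on restart would point at the load side.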
We configured and implemented embedded Debezium in the way suggested by Debezium, so we believe there is a bug that causes either offsets.dat not to be flushed properly while Debezium is running, or offsets.dat not to be loaded correctly when Debezium is restarted.
Thanks.