Bug
Resolution: Done
Major
1.1.2.Final, 1.2.0.Final
None
- Run the PostgreSQL connector v1.1.2
- Have some large transactions that produce a lot of WAL
- Have a lot of small transactions running at the same time
- Kill the connector while it is processing such a large transaction
Hello everyone,
thanks for all your work on Debezium. It is truly an amazing tool. Thank you!
We are running the Debezium PostgreSQL connector 1.1.2 against an RDS database. The other day we restarted Kafka Connect and something strange happened. After the connector was restarted (following an unclean shutdown), it started to log messages like this:
Streaming requested from LSN 88198654860672 but received LSN 88198094438920 that is same or smaller so skipping the message
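The log line implies a simple skip rule: a message whose LSN is the same as or smaller than the requested starting LSN is dropped. The sketch below models only that rule. The class and method names are hypothetical, and the real `shouldMessageBeSkipped()` also takes the `skipFirstFlushRecord` flag into account, which this sketch ignores. Because LSNs are byte positions in the WAL, the difference between the two values in the log line (560,421,752 bytes, roughly 560 MB) indicates how much already-processed WAL lay between this message and the requested start position.

```java
public class LsnSkipCheck {
    // Simplified skip rule taken from the log message: the received LSN is
    // "same or smaller" than the requested one. Assumption: the real
    // shouldMessageBeSkipped() also consults skipFirstFlushRecord, which this
    // sketch ignores. LSNs are unsigned 64-bit values, hence compareUnsigned.
    static boolean isSkipped(long requestedLsn, long receivedLsn) {
        return Long.compareUnsigned(receivedLsn, requestedLsn) <= 0;
    }

    public static void main(String[] args) {
        long requested = 88198654860672L; // "Streaming requested from LSN ..."
        long received = 88198094438920L;  // "... but received LSN ..."
        System.out.println("skipped: " + isSkipped(requested, received));
        // LSNs are byte offsets into the WAL, so this is the distance in bytes
        // between the received message and the requested start position.
        System.out.println("bytes before requested LSN: " + (requested - received));
    }
}
```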
I am unsure why this happened, but the effects are bad:
- The replication slot is not advanced, so the PostgreSQL server retains its WAL files (and a restart while catching up loses all progress made so far)
- The connector is not producing new messages, so no changes arrive at downstream systems
- Catching up is extremely slow, so at times the gap even grew
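The connector logs LSNs as plain 64-bit integers, whereas PostgreSQL displays them in its `X/Y` text form, for example in the `restart_lsn` and `confirmed_flush_lsn` columns of `pg_replication_slots`. To check whether the slot is actually advancing, it helps to convert between the two. The following is a minimal sketch with hypothetical class and method names. The text form consists of the high and low 32 bits printed in hexadecimal, separated by a slash.

```java
public class LsnFormat {
    // Formats a 64-bit LSN the way PostgreSQL prints it: high 32 bits and
    // low 32 bits in hexadecimal, separated by a slash.
    static String toPgString(long lsn) {
        return String.format("%X/%X", lsn >>> 32, lsn & 0xFFFFFFFFL);
    }

    // Parses the "X/Y" text form back into a 64-bit value.
    static long fromPgString(String text) {
        String[] parts = text.split("/");
        return (Long.parseLong(parts[0], 16) << 32) | Long.parseLong(parts[1], 16);
    }

    public static void main(String[] args) {
        System.out.println(toPgString(88198654860672L)); // 5037/597E1D80
        System.out.println(toPgString(88198094438920L)); // 5037/3816C208
    }
}
```

For the two LSNs in the log line above, the converter yields `5037/597E1D80` (requested) and `5037/3816C208` (received). These values can be compared directly with what `pg_replication_slots` reports.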
Because the connector was not producing any production-relevant data during this time, we replaced it and created a new snapshot.
After reading through the source code, I think I have pinpointed the problem to this method:
@Override
public boolean readPending(ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
    ByteBuffer read = stream.readPending();
    final long lastReceiveLsn = stream.getLastReceiveLSN().asLong();
    LOGGER.trace("Streaming requested from LSN {}, received LSN {}", startingLsn, lastReceiveLsn);
    if (read == null || messageDecoder.shouldMessageBeSkipped(read, lastReceiveLsn, startingLsn, skipFirstFlushRecord)) {
        return false;
    }
    deserializeMessages(read, processor);
    return true;
}
If I am reading this correctly, a skipped message is treated the same as "no pending messages". As a result, the replication slot is not advanced, and the connector waits for the poll interval before it reads the next message.
I think the method should look like this:
@Override
public boolean readPending(ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
    ByteBuffer read = stream.readPending();
    final long lastReceiveLsn = stream.getLastReceiveLSN().asLong();
    LOGGER.trace("Streaming requested from LSN {}, received LSN {}", startingLsn, lastReceiveLsn);
    if (read == null) {
        // nothing pending: the caller may wait for the poll interval
        return false;
    }
    if (messageDecoder.shouldMessageBeSkipped(read, lastReceiveLsn, startingLsn, skipFirstFlushRecord)) {
        // already-processed message: return true so the caller keeps reading instead of waiting
        return true;
    }
    deserializeMessages(read, processor);
    return true;
}
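The practical difference between the two versions can be illustrated with a small self-contained simulation. Everything in it is a hypothetical simplification. The stream is a queue of LSNs, and the skip rule is the "same or smaller" check from the log message. The caller is assumed, following the reading above, to wait one poll interval whenever `readPending` returns `false`. The model covers only this waiting behaviour, not slot advancement.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ReadPendingModel {
    // Hypothetical stand-in for the replication stream: each entry is the LSN
    // of one pending message; an empty queue plays the role of
    // stream.readPending() returning null.
    private final Deque<Long> pending = new ArrayDeque<>();
    private final long startingLsn;
    int processed = 0;

    ReadPendingModel(long startingLsn, long firstLsn, long lastLsn) {
        this.startingLsn = startingLsn;
        for (long lsn = firstLsn; lsn <= lastLsn; lsn++) {
            pending.add(lsn);
        }
    }

    // Simplified skip rule: "same or smaller" than the requested LSN.
    private boolean shouldBeSkipped(long lsn) {
        return lsn <= startingLsn;
    }

    // Mirrors the original method: a skipped message is reported as
    // "nothing pending" (false).
    boolean readPendingOriginal() {
        Long lsn = pending.poll();
        if (lsn == null || shouldBeSkipped(lsn)) {
            return false;
        }
        processed++;
        return true;
    }

    // Mirrors the proposed method: only an empty stream returns false.
    boolean readPendingPatched() {
        Long lsn = pending.poll();
        if (lsn == null) {
            return false;
        }
        if (shouldBeSkipped(lsn)) {
            return true;
        }
        processed++;
        return true;
    }

    // Simplified caller loop: counts one poll-interval wait each time
    // readPending reports nothing pending, until the queue is drained.
    int drain(boolean patched) {
        int waits = 0;
        while (!pending.isEmpty()) {
            boolean gotMessage = patched ? readPendingPatched() : readPendingOriginal();
            if (!gotMessage) {
                waits++;
            }
        }
        return waits;
    }

    public static void main(String[] args) {
        // 1,000 already-processed messages (LSN <= 1000) followed by 5 new ones.
        ReadPendingModel original = new ReadPendingModel(1000, 1, 1005);
        ReadPendingModel patched = new ReadPendingModel(1000, 1, 1005);
        System.out.println("original waits: " + original.drain(false) + ", processed: " + original.processed);
        System.out.println("patched waits: " + patched.drain(true) + ", processed: " + patched.processed);
    }
}
```

With 1,000 already-processed messages ahead of 5 new ones, the original contract incurs 1,000 waits and the patched one none. Both versions end up processing the same 5 messages. At an assumed poll interval of 500 ms, those 1,000 waits would add up to roughly 500 seconds of idle time.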
(Patch is attached)
Thank you for considering and reading this.