diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index d798fdbb2..bd0bfea20 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -389,21 +389,15 @@ else if (e.getMessage().matches("(?s)ERROR: requested WAL segment .* has already @Override public void read(ReplicationMessageProcessor processor) throws SQLException, InterruptedException { ByteBuffer read = stream.read(); - final long lastReceiveLsn = stream.getLastReceiveLSN().asLong(); - LOGGER.trace("Streaming requested from LSN {}, received LSN {}", startingLsn, lastReceiveLsn); - if (messageDecoder.shouldMessageBeSkipped(read, lastReceiveLsn, startingLsn, skipFirstFlushRecord)) { - return; - } + deserializeMessages(read, processor); } @Override public boolean readPending(ReplicationMessageProcessor processor) throws SQLException, InterruptedException { ByteBuffer read = stream.readPending(); - final long lastReceiveLsn = stream.getLastReceiveLSN().asLong(); - LOGGER.trace("Streaming requested from LSN {}, received LSN {}", startingLsn, lastReceiveLsn); - if (read == null || messageDecoder.shouldMessageBeSkipped(read, lastReceiveLsn, startingLsn, skipFirstFlushRecord)) { + if (read == null) { return false; }