LOB-type data is inconsistent between source and sink after modifying the primary key


      What behaviour do you see?

      After modifying the primary key of a row in the source DB, I get the value "__debezium_unavailable_value" in the LOB-type column in the sink DB.

      How to reproduce the issue using our tutorial deployment?

      Modify the primary key of a row in the source DB.


      Implementation ideas (optional)

      The logic is flawed in the function "emitUpdateRecord", declared in RelationalChangeRecordEmitter.java.

      PK update -> emit as delete and re-insert with new key:

      protected void emitUpdateRecord(Receiver<P> receiver, TableSchema tableSchema)
              throws InterruptedException {
          Object[] oldColumnValues = getOldColumnValues();
          Object[] newColumnValues = getNewColumnValues();
          Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
          Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
          Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
          Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
          if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
              LOGGER.warn("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
              return;
          }
          // some configurations does not provide old values in case of updates
          // in this case we handle all updates as regular ones
          if (oldKey == null || Objects.equals(oldKey, newKey)) {
              Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
              receiver.changeRecord(getPartition(), tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), null);
          }
          // PK update -> emit as delete and re-insert with new key
          else {
              ConnectHeaders headers = new ConnectHeaders();
              headers.add(PK_UPDATE_NEWKEY_FIELD, newKey, tableSchema.keySchema());
              Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
              receiver.changeRecord(getPartition(), tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
              headers = new ConnectHeaders();
              headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());
              envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
              receiver.changeRecord(getPartition(), tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
          }
      }

      When the primary key changes, this code replaces the single update event with two events: a delete event for the old key and a create event for the new key.

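      The create event causes the error. Its value is built from newColumnValues, so a LOB column that the update did not touch presumably still carries the "__debezium_unavailable_value" placeholder, and the sink inserts that placeholder as the column value of the new row.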
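
One possible direction, shown only as a minimal sketch and not as Debezium code: before the create event is built, fill placeholder entries in the new column values from the old column values at the same index. The class `PkUpdateLobSketch` and the helper `fillUnavailable` are hypothetical names introduced for illustration. The sketch also assumes that the placeholder arrives as a plain string and that the old values actually contain the LOB data, which may not hold for every connector or configuration.

```java
import java.util.Arrays;

public class PkUpdateLobSketch {

    // Placeholder string observed in the sink, as reported above.
    static final String UNAVAILABLE = "__debezium_unavailable_value";

    /**
     * Hypothetical helper (not part of Debezium): returns a copy of newValues in which
     * every placeholder entry is replaced by the old value at the same column index.
     */
    static Object[] fillUnavailable(Object[] oldValues, Object[] newValues) {
        if (newValues == null) {
            return null;
        }
        Object[] merged = Arrays.copyOf(newValues, newValues.length);
        if (oldValues == null) {
            return merged; // no old values to fall back on
        }
        for (int i = 0; i < merged.length && i < oldValues.length; i++) {
            if (UNAVAILABLE.equals(merged[i])) {
                merged[i] = oldValues[i];
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // Columns: id (primary key), name, doc (LOB column)
        Object[] oldRow = { 1, "alice", "large document text" };
        Object[] newRow = { 2, "alice", UNAVAILABLE }; // only the key changed

        Object[] createRow = fillUnavailable(oldRow, newRow);
        System.out.println(Arrays.toString(createRow));
        // prints [2, alice, large document text]
    }
}
```

In emitUpdateRecord, the equivalent step would sit in the PK-update branch, before newValue is passed to create(...). If the old values do not contain the LOB data either, the placeholder stays in place and the sink would need to handle it some other way.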

