Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5295

Lob type data is inconsistent between source and sink, after modifying the primary key


      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      Lob-type data is inconsistent between source and sink, after modifying the primary key

      What Debezium connector do you use and what version?



      What is the captured database version and mode of depoyment?

      (E.g. on-premises, with a specific cloud provider, etc.)


      What behaviour do you see?

      After modifying the primary key in source DB, i got the  "__debezium_unavailable_value" from lob-type column in sink db

      Do you see the same behaviour using the latest relesead Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)



      How to reproduce the issue using our tutorial deployment?

      just modifying  the primary key in source DB


      Implementation ideas (optional)

      the logic is flawed in  the function named "emitUpdateRecord"  declared in  RelationalChangeRecordEmitter.java

      ###########PK update -> emit as delete and re-insert with new key

      protected void emitUpdateRecord(Receiver<P> receiver, TableSchema tableSchema)
              throws InterruptedException {
          Object[] oldColumnValues = getOldColumnValues();
          Object[] newColumnValues = getNewColumnValues();
          Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
          Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
          Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
          Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
          if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
              LOGGER.warn("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
          // some configurations does not provide old values in case of updates
          // in this case we handle all updates as regular ones
          if (oldKey == null || Objects.equals(oldKey, newKey)) {
              Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
              receiver.changeRecord(getPartition(), tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), null);
          // PK update -> emit as delete and re-insert with new key
          else {
              ConnectHeaders headers = new ConnectHeaders();
              headers.add(PK_UPDATE_NEWKEY_FIELD, newKey, tableSchema.keySchema());
              Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
              receiver.changeRecord(getPartition(), tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
              headers = new ConnectHeaders();
              headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());
              envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
              receiver.changeRecord(getPartition(), tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);

      This will change the update-event to two new events which include delete & create event.

      The create event caused the error.

            ccranfor@redhat.com Chris Cranford
            udontknow C J (Inactive)
            0 Vote for this issue
            6 Start watching this issue