Debezium / DBZ-5295

LOB-type data is inconsistent between source and sink after modifying the primary key


      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      LOB-type data is inconsistent between source and sink after the primary key is modified.

      What Debezium connector do you use and what version?

      oracle-connector

       

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      oracle_19c

      What behaviour do you see?

      After modifying the primary key of a row in the source DB, the LOB-type column of that row in the sink DB contains the placeholder "__debezium_unavailable_value" instead of the actual LOB data.
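To confirm the symptom on the sink side, a check along the following lines could be used. This is only a sketch: `UnavailableValueCheck`, `isPlaceholder` and `placeholderColumns` are hypothetical names, not part of Debezium, and it assumes that text LOBs arrive as the placeholder string while binary LOBs arrive as its UTF-8 bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sink-side helper; not part of Debezium.
final class UnavailableValueCheck {

    // Placeholder string quoted in this report.
    static final String PLACEHOLDER = "__debezium_unavailable_value";
    private static final byte[] PLACEHOLDER_BYTES = PLACEHOLDER.getBytes(StandardCharsets.UTF_8);

    // True if the column value is the placeholder rather than real LOB data.
    // Assumption: text LOBs carry the string, binary LOBs carry its UTF-8 bytes.
    static boolean isPlaceholder(Object value) {
        if (value instanceof String) {
            return PLACEHOLDER.equals(value);
        }
        if (value instanceof byte[]) {
            return Arrays.equals(PLACEHOLDER_BYTES, (byte[]) value);
        }
        return false;
    }

    // Names of the columns in a row image whose value is the placeholder.
    static List<String> placeholderColumns(Map<String, Object> row) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (isPlaceholder(e.getValue())) {
                names.add(e.getKey());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Example row image as it might arrive at the sink after a PK change.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("ID", 2);
        row.put("DOC", PLACEHOLDER);
        System.out.println("columns holding the placeholder: " + placeholderColumns(row));
    }
}
```

A sink that finds such columns in a create event has no earlier row under the new key to fall back on, which is why the placeholder ends up stored as data.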

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      V2.0, V1.9

       

      How to reproduce the issue using our tutorial deployment?

      Just modify the primary key of a row in the source DB.

       

      Implementation ideas (optional)

      The logic in the function "emitUpdateRecord", declared in RelationalChangeRecordEmitter.java, is flawed:

      PK update -> emit as delete and re-insert with new key

      @Override
      protected void emitUpdateRecord(Receiver<P> receiver, TableSchema tableSchema)
              throws InterruptedException {
          Object[] oldColumnValues = getOldColumnValues();
          Object[] newColumnValues = getNewColumnValues();
      
          Struct oldKey = tableSchema.keyFromColumnData(oldColumnValues);
          Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
      
          Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
          Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
      
          if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
              LOGGER.warn("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
              return;
          }
          // some configurations does not provide old values in case of updates
          // in this case we handle all updates as regular ones
          if (oldKey == null || Objects.equals(oldKey, newKey)) {
              Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
              receiver.changeRecord(getPartition(), tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), null);
          }
          // PK update -> emit as delete and re-insert with new key
          else {
              ConnectHeaders headers = new ConnectHeaders();
              headers.add(PK_UPDATE_NEWKEY_FIELD, newKey, tableSchema.keySchema());
      
              Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
              receiver.changeRecord(getPartition(), tableSchema, Operation.DELETE, oldKey, envelope, getOffset(), headers);
      
              headers = new ConnectHeaders();
              headers.add(PK_UPDATE_OLDKEY_FIELD, oldKey, tableSchema.keySchema());
      
              envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
              receiver.changeRecord(getPartition(), tableSchema, Operation.CREATE, newKey, envelope, getOffset(), headers);
          }
      } 

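      When the primary key changes, this turns the update event into two new events: a delete event for the old key and a create event for the new key.

      The create event causes the error. It is built from the new column values of the update, so presumably the LOB column, which the update did not touch, still holds "__debezium_unavailable_value", and the sink inserts that placeholder as the row's data.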
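The effect can be reproduced in isolation with a simplified model of the branch above. This is a sketch under stated assumptions, not Debezium code: `PkUpdateSplitSketch`, `Event` and `emitUpdate` are hypothetical stand-ins, rows are plain maps, and the LOB column `DOC` is assumed to carry the placeholder because the update only changed the key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Simplified stand-in for the delete + create split; not Debezium's actual classes.
final class PkUpdateSplitSketch {

    // Placeholder string quoted in this report.
    static final String PLACEHOLDER = "__debezium_unavailable_value";

    // Hypothetical minimal event: operation code, key, and the row image it carries.
    static final class Event {
        final String op;
        final Object key;
        final Map<String, Object> row;

        Event(String op, Object key, Map<String, Object> row) {
            this.op = op;
            this.key = key;
            this.row = row;
        }
    }

    // Mirrors the branch structure of emitUpdateRecord: unchanged key -> one update,
    // changed key -> delete for the old key followed by create for the new key.
    static List<Event> emitUpdate(String keyColumn, Map<String, Object> oldRow, Map<String, Object> newRow) {
        Object oldKey = oldRow == null ? null : oldRow.get(keyColumn);
        Object newKey = newRow.get(keyColumn);
        List<Event> events = new ArrayList<>();
        if (oldKey == null || Objects.equals(oldKey, newKey)) {
            events.add(new Event("u", newKey, newRow));
        } else {
            events.add(new Event("d", oldKey, oldRow));
            // newRow is passed through unchanged, so a LOB column that was not
            // part of the update still holds the placeholder in the create event.
            events.add(new Event("c", newKey, newRow));
        }
        return events;
    }

    public static void main(String[] args) {
        Map<String, Object> oldRow = new LinkedHashMap<>();
        oldRow.put("ID", 1);
        oldRow.put("DOC", PLACEHOLDER); // assumption: LOB value not available for this update

        Map<String, Object> newRow = new LinkedHashMap<>();
        newRow.put("ID", 2);
        newRow.put("DOC", PLACEHOLDER);

        for (Event e : emitUpdate("ID", oldRow, newRow)) {
            System.out.println(e.op + " key=" + e.key + " row=" + e.row);
        }
    }
}
```

For a plain update a sink can leave a placeholder column untouched, because the existing row already has the real value. After the split, the row under the old key is deleted and the create event is the only source for the new row, so the placeholder is all the sink has to write.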

            ccranfor@redhat.com Chris Cranford
            udontknow C J (Inactive)