Debezium / DBZ-7813

Unable To Exclude Column Using Configuration


      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      I am using the Debezium Informix connector; the jars are version 2.6.1.Final.

      What is the connector configuration?

      {
          "connector.class": "io.debezium.connector.informix.InformixConnector",
          "errors.log.include.messages": "true",
          "topic.creation.default.partitions": "1",
          "value.converter.schema.registry.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
          "key.converter.schema.registry.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
          "transforms": "unwrap,TSF1",
          "signal.enabled.channels": "source",
          "errors.deadletterqueue.context.headers.enable": "true",
          "queue.buffering.max.ms": "90000",
          "transforms.unwrap.drop.tombstones": "false",
          "signal.data.collection": "cards_1952.mcp.dbz_signal",
          "topic.creation.default.replication.factor": "1",
          "errors.deadletterqueue.topic.replication.factor": "1",
          "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
          "errors.log.enable": "true",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "database.user": "kafka",
          "database.dbname": "cards_1952",
          "topic.creation.default.compression.type": "lz4",
          "schema.history.internal.kafka.bootstrap.servers": ".....",
          "value.converter.schema.registry.url": "....",
          "transforms.TSF1.static.value": "150",
          "errors.max.retries": "-1",
          "errors.deadletterqueue.topic.name": "informix-gpdb-source-errors",
          "database.password": "******",
          "name": "new_source",
          "errors.tolerance": "all",
          "send.buffer.bytes": "1000000",
          "pk.mode": "primary_key",
          "snapshot.mode": "schema_only",
          "compression.type": "lz4",
          "incremental.snapshot.chunk.size": "50000",
          "tasks.max": "1",
          "retriable.restart.connector.wait.ms": "60000",
          "transforms.TSF1.static.field": "instance_id",
          "schema.history.internal.store.only.captured.tables.ddl": "true",
          "schema.history.internal.store.only.captured.databases.ddl": "true",
          "transforms.TSF1.type": "org.apache.kafka.connect.transforms.InsertField$Value",
          "tombstones.on.delete": "true",
          "topic.prefix": "....",
          "decimal.handling.mode": "double",
          "schema.history.internal.kafka.topic": "informixschemahistory",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "topic.creation.default.cleanup.policy": "compact",
          "time.precision.mode": "connect",
          "database.server.name": "....",
          "snapshot.isolation.mode": "read_committed",
          "database.port": "...",
          "debug.logs": "true",
          "column.exclude.list": "mcp.kafkatable.value1",
          "database.hostname": "....",
          "table.include.list": "cards_1952.mcp.kafkatable,cards_1952.mcp.dbz_signal",
          "key.converter.schema.registry.url": "...."
      } 
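
To make the intent of the "column.exclude.list" entry above easier to check, here is a minimal sketch of anchored, case-insensitive regular-expression matching against fully qualified column names. It is a simplified model, not the connector's code: the helper `is_excluded` and the column `id` are hypothetical, and whether the Informix connector qualifies column names with or without the database name is not asserted here and should be verified against the connector documentation.

```python
import re


def is_excluded(column_fqn: str, exclude_list: str) -> bool:
    """Return True if any comma-separated pattern matches the whole column name."""
    patterns = [p.strip() for p in exclude_list.split(",") if p.strip()]
    # fullmatch anchors the pattern to the entire name (assumed matching mode).
    return any(re.fullmatch(p, column_fqn, flags=re.IGNORECASE) for p in patterns)


exclude_list = "mcp.kafkatable.value1"  # value taken from the configuration above

# Candidate names; 'id' is a hypothetical column used only for contrast.
candidates = (
    "mcp.kafkatable.value1",
    "cards_1952.mcp.kafkatable.value1",
    "mcp.kafkatable.id",
)
for name in candidates:
    print(name, "->", is_excluded(name, exclude_list))
```

Under these assumptions, the pattern matches only the schema-qualified name. If the connector compared against database-qualified names instead, the same entry would not match, which is one thing worth ruling out when debugging.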

      What is the captured database version and mode of deployment?

      I am using the Informix versions listed below and deploying the Debezium connector through the Kafka Connect UI.

      Informix Dynamic Server
      14.10.FC10W1X2
      Informix JDBC Driver for Informix Dynamic Server
      4.50.JC10

      Kafka version: 7.4.1-ce

      What behaviour do you expect?

      After setting "column.exclude.list": "mcp.kafkatable.value1" in the Informix source connector configuration, I expected CDC to be enabled for the table "kafkatable". After an insert on this table, the connector should capture the change, register the table schema in the schema registry, and publish a message to the topic.
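
As an illustration of that expectation, the sketch below models the record value that the configured pipeline would be expected to publish: the excluded column is dropped, and the "TSF1" transform adds the static field "instance_id" with the value "150". This is a hedged model only. The function `expected_value` and the columns `id` and `value2` are hypothetical; only `value1`, `instance_id`, and `150` come from the report.

```python
def expected_value(row: dict, excluded: set, static_field: str, static_value: str) -> dict:
    """Model the flattened record value: drop excluded columns, then add the static field."""
    value = {col: val for col, val in row.items() if col not in excluded}
    value[static_field] = static_value
    return value


# Hypothetical inserted row; only 'value1' is a column name taken from the report.
row = {"id": 1, "value1": "secret", "value2": "kept"}

record = expected_value(row, {"value1"}, "instance_id", "150")
print(record)
```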

      What behaviour do you see?

      After setting "column.exclude.list": "mcp.kafkatable.value1" in the Informix source connector configuration, I executed an insert on the table kafkatable. The Kafka Connect logs report that the internal schema is out of sync with the real database schema. However, kafkatable is a new table on which CDC had never been enabled before, so no schema for this table existed in the schema registry.

      Do you have the connector logs, ideally from start till finish?

      [2024-04-24 12:23:55,035] ERROR [new_source|task-0] Error requesting a row value, row: 5, requested index: 5 at position 4 (io.debezium.relational.TableSchemaBuilder:259)
      [2024-04-24 12:24:04,774] INFO [gp sink|task-0] Received 0 records (io.confluent.connect.jdbc.sink.JdbcSinkTask:93)
      [2024-04-24 12:24:09,159] ERROR [new_source|task-0] Caught Exception (io.debezium.connector.informix.InformixStreamingChangeEventSource:201)
      org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {begin_lsn=472184409645080, commit_lsn=472184409645460, change_lsn=472184409645292}
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:322)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.handleOperation(InformixStreamingChangeEventSource.java:437)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.handleTransaction(InformixStreamingChangeEventSource.java:306)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.execute(InformixStreamingChangeEventSource.java:179)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.execute(InformixStreamingChangeEventSource.java:38)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:280)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:140)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
              at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:260)
              at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:289)
              at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:71)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:47)
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:271)
              ... 12 more
      [2024-04-24 12:24:09,160] ERROR [new_source|task-0] Producer failure (io.debezium.pipeline.ErrorHandler:52)
      org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {begin_lsn=472184409645080, commit_lsn=472184409645460, change_lsn=472184409645292}
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:322)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.handleOperation(InformixStreamingChangeEventSource.java:437)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.handleTransaction(InformixStreamingChangeEventSource.java:306)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.execute(InformixStreamingChangeEventSource.java:179)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.execute(InformixStreamingChangeEventSource.java:38)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:280)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:140)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
              at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:260)
              at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:289)
              at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:71)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:47)
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:271)
              ... 12 more
      [2024-04-24 12:24:09,161] INFO [new_source|task-0] Finished streaming (io.debezium.pipeline.ChangeEventSourceCoordinator:281)
      [2024-04-24 12:24:09,161] INFO [new_source|task-0] Connected metrics set to 'false' (io.debezium.pipeline.ChangeEventSourceCoordinator:433)
      [2024-04-24 12:24:09,310] ERROR [new_source|task-0] WorkerSourceTask{id=new_source-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:221)
      org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
              at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:67)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.execute(InformixStreamingChangeEventSource.java:202)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.execute(InformixStreamingChangeEventSource.java:38)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:280)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
              at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:140)
              at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
              at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
              at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
              at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
              at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {begin_lsn=472184409645080, commit_lsn=472184409645460, change_lsn=472184409645292}
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:322)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.handleOperation(InformixStreamingChangeEventSource.java:437)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.handleTransaction(InformixStreamingChangeEventSource.java:306)
              at io.debezium.connector.informix.InformixStreamingChangeEventSource.execute(InformixStreamingChangeEventSource.java:179)
              ... 9 more
      Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
              at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:260)
              at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:289)
              at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:71)
              at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:47)
              at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:271)
              ... 12 more
      [2024-04-24 12:24:09,311] INFO [new_source|task-0] Stopping down connector (io.debezium.connector.common.BaseSourceTask:398)
      [2024-04-24 12:24:09,312] INFO [new_source|task-0] SignalProcessor stopped (io.debezium.pipeline.signal.SignalProcessor:127)
      [2024-04-24 12:24:09,312] INFO [new_source|task-0] Debezium ServiceRegistry stopped. (io.debezium.service.DefaultServiceRegistry:105)
      [2024-04-24 12:24:09,315] INFO [new_source|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:952)
      [2024-04-24 12:24:09,315] INFO [new_source|task-0] Connection gracefully closed (io.debezium.jdbc.JdbcConnection:952)
      [2024-04-24 12:24:09,316] INFO [new_source|task-0] [Producer clientId=mcp_kafka_net_21-schemahistory] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1310)
      [2024-04-24 12:24:09,320] INFO [new_source|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:710)
      [2024-04-24 12:24:09,321] INFO [new_source|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:714)
      [2024-04-24 12:24:09,321] INFO [new_source|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:720)
      [2024-04-24 12:24:09,321] INFO [new_source|task-0] App info kafka.producer for mcp_kafka_net_21-schemahistory unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
      [2024-04-24 12:24:09,321] INFO [new_source|task-0] [Producer clientId=connector-producer-new_source-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1310)
      [2024-04-24 12:24:09,324] INFO [new_source|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:710)
      [2024-04-24 12:24:09,324] INFO [new_source|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:714)
      [2024-04-24 12:24:09,324] INFO [new_source|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:720)
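
The first ERROR line in the log above reports "row: 5, requested index: 5 at position 4". The sketch below is a simplified model of a bounds check that would produce those numbers. It is not Debezium's implementation, and the six-column layout is an assumption chosen only to reproduce the message; the real table layout is in the attached schema file and is not reproduced here.

```python
def validate_row(record_indexes, row, position):
    """Raise if the column index stored at `position` points past the end of `row`."""
    index = record_indexes[position]
    if index >= len(row):
        raise ValueError(
            f"row: {len(row)}, requested index: {index} at position {position}"
        )


# Assumed layout: a six-column table with the column at index 4 excluded.
# The captured columns keep their original table positions ...
record_indexes = [0, 1, 2, 3, 5]
# ... but the incoming data row carries only five values.
row = ["a", "b", "c", "d", "e"]

try:
    for position in range(len(record_indexes)):
        validate_row(record_indexes, row, position)
except ValueError as err:
    message = str(err)
    print(message)
```

In this model the failure comes from a mismatch between column positions computed from the full table and a data row that has fewer values, rather than from any schema stored in the schema registry.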
       

        1. kafka connect complete logs.out
          441 kB
        2. table schema.txt
          10 kB
        3. deatiled logs.out
          3.61 MB
        4. new kafka logs.out
          4.03 MB

              Assignee: Unassigned
              Reporter: Maaheen Yasin (maaheen11)