Debezium / DBZ-8358

Filter for snapshot using signal doesn't seem to work

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      debezium-server version 3.0.0

      What is the connector configuration?

      # sink config
        debezium.sink.type=kafka
        debezium.sink.kafka.producer.bootstrap.servers=xxxxx.eu-west-1.aws.confluent.cloud:9092
        debezium.sink.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
        debezium.sink.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
        debezium.sink.kafka.producer.security.protocol=SASL_SSL
        debezium.sink.kafka.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxxxx' password='xxxxx';
        debezium.sink.kafka.producer.sasl.mechanism=PLAIN
        debezium.sink.kafka.producer.client.dns.lookup=use_all_dns_ips
        debezium.sink.kafka.producer.client=PLAIN
        debezium.sink.kafka.producer.client.id=vmCDC-SD
        debezium.sink.kafka.producer.compression.type=lz4
        debezium.sink.kafka.producer.creation.enable=false
      # source config

      debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
      debezium.source.database.hostname=vmCDC
      debezium.source.database.port=1433
      debezium.source.database.user=su_cdc
      debezium.source.database.password=xxxxx
      debezium.source.database.names=TESTDB_2024
      debezium.source.database.encrypt=false
      debezium.source.topic.prefix=mob-suppliers-2
      debezium.source.table.include.list=dbo.tcode, dbo.tpost, dbo.tbedrijfrekeningnummer

      #, dbo.tbedrijf, dbo.tbedrijfrekeningnummer, dbo.tbedrijfactiviteit
        debezium.source.offset.storage.file.filename=data/offsets.dat
        debezium.source.schema.history.internal.kafka.bootstrap.servers=xxxxx.eu-west-1.aws.confluent.cloud:9092
        debezium.source.schema.history.internal.kafka.sasl.mechanism=PLAIN
        debezium.source.schema.history.internal.kafka.topic=mob-suppliers-2-internal
        debezium.source.schema.history.internal.kafka.security.protocol=SASL_SSL
        debezium.source.schema.history.internal.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxxxx' password='xxxxx';
        debezium.source.schema.history.internal.consumer.security.protocol=SASL_SSL
        debezium.source.schema.history.internal.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxxxx' password='xxxxx';
        debezium.source.schema.history.internal.consumer.sasl.mechanism=PLAIN
        debezium.source.schema.history.internal.producer.security.protocol=SASL_SSL
        debezium.source.schema.history.internal.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxxxx' password='xxxxx';
        debezium.source.schema.history.internal.producer.sasl.mechanism=PLAIN
        debezium.source.schema.history.internal.store.only.captured.tables.ddl=true
        debezium.source.include.schema.changes=false
        debezium.source.topic.creation.default.compression.type=lz4
        debezium.source.topic.creation.enable=false
        debezium.source.provide.transaction.metadata=true
        debezium.source.snapshot.mode=schema_only
      # signals
        debezium.source.signal.enabled.channels=kafka,source
        debezium.source.signal.data.collection = TESTDB_2024.dbo.debezium_signal
        debezium.source.signal.kafka.topic=test-signal
        debezium.source.signal.kafka.bootstrap.servers=xxxxx.eu-west-1.aws.confluent.cloud:9092
        debezium.source.signal.kafka.sasl.mechanism=PLAIN
        debezium.source.signal.kafka.security.protocol=SASL_SSL
        debezium.source.signal.kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxxxx' password='xxxxx';
        debezium.source.signal.consumer.security.protocol=SASL_SSL
        debezium.source.signal.consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxxxx' password='xxxxx';
        debezium.source.signal.consumer.sasl.mechanism=PLAIN
        debezium.source.signal.consumer.client.id=test-client-id
        debezium.source.signal.consumer.group.id=test-group-id
        debezium.source.signal.consumer.auto.commit.offset=true
        debezium.source.signal.producer.security.protocol=SASL_SSL
        debezium.source.signal.producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='xxxxx' password='xxxxx';
        debezium.source.signal.producer.sasl.mechanism=PLAIN
      # format
        debezium.format.key=json
        debezium.format.key.schemas.enable=false
        debezium.format.value=json
        debezium.format.value.schemas.enable=false
        debezium.format.header=json
      # transformations
        debezium.transforms=beginFilter,endFilter,reroute,tenantInsert
        debezium.transforms.reroute.type=org.apache.kafka.connect.transforms.RegexRouter
        debezium.transforms.reroute.regex=mob-suppliers-2.([^.]).(.)
        debezium.transforms.reroute.replacement=mob-suppliers-2
        debezium.transforms.beginFilter.topic.regex=mob-suppliers-2.transaction
        debezium.transforms.beginFilter.type=io.debezium.transforms.Filter
        debezium.transforms.beginFilter.language=jsr223.groovy
        debezium.transforms.beginFilter.condition=value.status != 'BEGIN'
        debezium.transforms.endFilter.topic.regex=mob-suppliers-2.transaction
        debezium.transforms.endFilter.type=io.debezium.transforms.Filter
        debezium.transforms.endFilter.language=jsr223.groovy
        debezium.transforms.endFilter.condition=!(value.status == 'END' && value.event_count == 0)
        debezium.transforms.tenantInsert.type=org.apache.kafka.connect.transforms.InsertField$Value
        debezium.transforms.tenantInsert.static.field=tenantId
        debezium.transforms.tenantInsert.static.value=test
      # logging
        quarkus.log.console.json=false
        quarkus.log.level=INFO

        What is the captured database version and mode of deployment?

      Local SQL Server 2019

      What behavior do you expect?

      A filtered initial snapshot, containing only the rows that match the filter in the signal.

      What behavior do you see?

      A complete snapshot; the filter is not applied.

      Do you see the same behaviour using the latest released Debezium version?

      Yes

      Do you have the connector logs, ideally from start till finish?

      How to reproduce the issue using our tutorial deployment?

      I sent this message to our signal topic:

      mob-suppliers-2:{"type":"execute-snapshot", "data":{"type": "blocking", "data-collections": ["TESTDB_2024.dbo.tcode"], "additional-conditions": [{"data-collection": "TESTDB_2024.dbo.tcode", "filter": "short_description='HU'"}]}}
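
      To make the signal easier to reproduce, here is a minimal Python sketch that builds the same key and value. It assumes the text before the first colon in the message above is the Kafka record key (which, for the Kafka signal channel, should equal debezium.source.topic.prefix) and the rest is the JSON value. The helper name build_snapshot_signal is hypothetical and not part of Debezium; it only builds the strings and does not send anything to Kafka.

```python
import json


def build_snapshot_signal(topic_prefix, table, filter_expr, snapshot_type="blocking"):
    """Hypothetical helper: return (key, value) strings for an execute-snapshot signal.

    topic_prefix  -- assumed to be used as the Kafka record key
    table         -- fully qualified table name, e.g. TESTDB_2024.dbo.tcode
    filter_expr   -- SQL predicate placed in additional-conditions
    """
    payload = {
        "type": "execute-snapshot",
        "data": {
            "type": snapshot_type,
            "data-collections": [table],
            "additional-conditions": [
                {"data-collection": table, "filter": filter_expr}
            ],
        },
    }
    return topic_prefix, json.dumps(payload)


if __name__ == "__main__":
    key, value = build_snapshot_signal(
        "mob-suppliers-2", "TESTDB_2024.dbo.tcode", "short_description='HU'"
    )
    # Print in the same key:value layout used in this report.
    print(f"{key}:{value}")
```

      Building the value with json.dumps rather than by hand rules out quoting mistakes in the filter expression as a cause of the behaviour described here.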

              rh-ee-mvitale Mario Fiore Vitale
              david.demaesschalck@dobby.be David De Maesschalck