Debezium / DBZ-5030

Unable to use multiple Debezium transformations together?

Details

    • Bug
    • Resolution: Not a Bug
    • Major
    • None
    • 1.9.0.Final
    • None
    • False
    • None
    • False

    Description

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      I use debezium-connector-mysql with Debezium 1.9.0.

      What is the connector configuration ?

      {
         "tasks.max":"1",
         "connector.class":"io.debezium.connector.mysql.MySqlConnector",
         "database.hostname":"DB_HOST",
         "database.port":"3306",
         "database.user":"root",
         "database.password":"debezium",
         "database.server.id":"438567",
         "database.server.name":"contact_db",
         "database.include.list":"DB_NAME",
         "database.serverTimezone":"Europe/Paris",
         "database.connectionTimeZone":"Europe/Paris",
         "database.history.kafka.bootstrap.servers":"KAFKA_HOST:9092",
         "database.history.kafka.topic":"contact_db.schema-changes",
         "topic.creation.default.replication.factor":1,
         "topic.creation.default.partitions":1,
         "topic.creation.default.cleanup.policy":"compact",
         "include.schema.changes":"true",
         "snapshot.mode":"schema_only",
         "table.include.list":"DB_NAME\\.(.*)",
         "snapshot.locking.mode":"none",
         "transforms":"Reroute",
         "database.history.store.only.captured.tables.ddl":"true",
         "transforms.Reroute.type":"io.debezium.transforms.ByLogicalTableRouter",
         "transforms.Reroute.topic.regex":"contact_db\\.DB_NAME\\.(.*)",
         "transforms.Reroute.topic.replacement":"contact.debezium.changes",
         "transforms.Reroute.key.field.regex":"contact_db\\.DB_NAME\\.(.*)",
         "transforms.Reroute.key.field.replacement":"$1",
         "transforms.Reroute.key.field.name":"universe",
         "key.converter":"org.apache.kafka.connect.json.JsonConverter",
         "key.converter.schemas.enable":"false",
         "value.converter":"org.apache.kafka.connect.json.JsonConverter",
         "value.converter.schemas.enable":"false",
         "column.exclude.list":".*\\.last_.*,.*\\.nouverts.*,.*\\.nclicks.*,.*\\.nenvois,.*\\.nbounces,.*\\.nbounces_sms,.*\\.nclickssms,.*\\.nx,.*\\.nsms,.*\\.nclicksms,.*\\.ntransfo,.*\\.npurchases,.*\\.sommeca",
         "transforms":"filter",
         "transforms.filter.type":"io.debezium.transforms.Filter",
         "transforms.filter.language":"jsr223.groovy",
         "transforms.filter.condition":"value.op == 'u'",
         "transforms.filter.topic.regex":"contact.debezium.changes"
      }
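
      One thing stands out in the configuration above: the "transforms" key appears twice, first as "Reroute" and later as "filter". A JSON object keeps only one value per key after parsing (typically the last one), so the connector would most likely be created with "transforms":"filter" only, and the Reroute properties would then be ignored. This matches the behaviour described below, and is offered as a probable explanation rather than a confirmed diagnosis. A minimal sketch of the transform section with both transforms declared in one comma-separated list follows. It assumes the filter should run after the rerouting, because its "topic.regex" refers to the rerouted topic name and transforms are applied in the order they are listed; all other connector properties stay as they are.

      ```json
      {
         "transforms":"Reroute,filter",
         "transforms.Reroute.type":"io.debezium.transforms.ByLogicalTableRouter",
         "transforms.Reroute.topic.regex":"contact_db\\.DB_NAME\\.(.*)",
         "transforms.Reroute.topic.replacement":"contact.debezium.changes",
         "transforms.Reroute.key.field.regex":"contact_db\\.DB_NAME\\.(.*)",
         "transforms.Reroute.key.field.replacement":"$1",
         "transforms.Reroute.key.field.name":"universe",
         "transforms.filter.type":"io.debezium.transforms.Filter",
         "transforms.filter.language":"jsr223.groovy",
         "transforms.filter.condition":"value.op == 'u'",
         "transforms.filter.topic.regex":"contact.debezium.changes"
      }
      ```

      If the filter were listed first instead, its "topic.regex" would be compared against the original contact_db.DB_NAME.{table} topic names and would not match.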

       

      What is the captured database version and mode of deployment?

      I used a Docker container on my local machine, running the debezium/example-mysql image, so according to its Dockerfile it's MySQL 8.0 (https://hub.docker.com/r/debezium/example-mysql).

      What behaviour do you expect?

      I expect all changes in the tables of 'DB_NAME' to be forwarded to the Kafka topic "contact.debezium.changes", with my filter applied as well (so I should only see the updates).

      What behaviour do you see?

      When I do not use the Message filtering feature of Debezium, everything works as expected: topic routing works, and the changes from all the tables in the database 'DB_NAME' arrive in the topic "contact.debezium.changes". BUT whenever I add the Message filtering to my connector config, the topic routing stops working. I end up with multiple topics named contact_db.DB_NAME.{the_name_of_the_mysql_table}.

      Do you see the same behaviour using the latest released Debezium version?

      I believe I'm currently using the latest version.

      Do you have the connector logs, ideally from start till finish?

      Because I ran all these tests using Docker containers, I had a docker-compose setup running, so here you can find all the logs of MySQL, Zookeeper, Kafka & Kafka Connect => docker-logs

      How to reproduce the issue using our tutorial deployment?

      I believe the best way to reproduce the issue would be to use https://github.com/debezium/debezium-examples/blob/main/tutorial/docker-compose-mysql.yaml (this is what I used). Then after everything is up you can follow the steps:

      1 : Create a database in the MySQL server named {DB_NAME}
      2 : Create multiple tables in this database with fields (I had specific fields for my usage but I don't think it should matter to reproduce the problem)
      3 : Create the connector in Kafka Connect using the configuration above, replacing DB_NAME with the actual name of your database and leaving out the Filter transform properties:
      "transforms":"filter",
      "transforms.filter.type":"io.debezium.transforms.Filter",
      "transforms.filter.language":"jsr223.groovy",
      "transforms.filter.condition":"value.op == 'u'",
      "transforms.filter.topic.regex":"contact.debezium.changes"
      4 : Test out your setup! Your MySQL queries should produce results in a Kafka topic named "contact.debezium.changes"
      5 : Now update your Kafka connector (I used the Kafka Connect REST API to do it) by adding the Filter transform properties to the config (you'll need to follow the prerequisites of Message filtering by adding the Debezium scripting jar + Groovy jars to your Kafka Connect)
      6 : Test again by running multiple MySQL queries. Your changes end up in different Kafka topics named contact_db.{DB_NAME}.{name_of_mysql_table}, and the filtering isn't applied either, since I restricted the filter to the topic name "contact.debezium.changes"

      I expected to be able to use both of these transformations together.

      For a little context about what I'm trying to do: I have a lot of MySQL tables, and I want all the changes in a single Kafka topic. I have also excluded some columns that I'm not interested in. The problem is that Debezium still produces Kafka records for updates that only touch the excluded fields (I believe there's an open issue for this as a potential enhancement, but it's old and hasn't been touched in a long time), so I figured that I would handle it by filtering out update messages that don't contain any changes. This is how I ended up trying to use both topic routing & message filtering at the same time.
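
      The goal described above (dropping updates that carry no visible change) could be sketched as a filter condition like the one below. This is an untested assumption, not a confirmed recipe. It relies on the Filter transform retaining records for which the condition evaluates to true, on Groovy's == / != falling back to equals() when comparing the "before" and "after" structs, and on those structs containing only the non-excluded columns. The condition shown in the report, "value.op == 'u'", would instead keep every update and drop everything else.

      ```json
      {
         "transforms.filter.type":"io.debezium.transforms.Filter",
         "transforms.filter.language":"jsr223.groovy",
         "transforms.filter.condition":"value.op != 'u' || value.before != value.after",
         "transforms.filter.topic.regex":"contact.debezium.changes"
      }
      ```

      With this condition, inserts, deletes and snapshot reads pass through unchanged, and an update is kept only when its "before" and "after" images differ.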

          People

            Unassigned Unassigned
            ghacout Gauthier Hacout (Inactive)