Details
-
Bug
-
Resolution: Not a Bug
-
Major
-
None
-
1.9.0.Final
-
None
-
False
-
None
-
False
Description
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
I use the debezium-connector-mysql & I use Debezium 1.9.0
What is the connector configuration ?
{ "tasks.max":"1", "connector.class":"io.debezium.connector.mysql.MySqlConnector", "database.hostname":"DB_HOST", "database.port":"3306", "database.user":"root", "database.password":"debezium", "database.server.id":"438567", "database.server.name":"contact_db", "database.include.list":"DB_NAME", "database.serverTimezone":"Europe/Paris", "database.connectionTimeZone":"Europe/Paris", "database.history.kafka.bootstrap.servers":"KAFKA_HOST:9092", "database.history.kafka.topic":"contact_db.schema-changes", "topic.creation.default.replication.factor":1, "topic.creation.default.partitions":1, "topic.creation.default.cleanup.policy":"compact", "include.schema.changes":"true", "snapshot.mode":"schema_only", "table.include.list":"DB_NAME\\.(.*)", "snapshot.locking.mode":"none", "transforms":"Reroute", "database.history.store.only.captured.tables.ddl":"true", "transforms.Reroute.type":"io.debezium.transforms.ByLogicalTableRouter", "transforms.Reroute.topic.regex":"contact_db\\.DB_NAME\\.(.*)", "transforms.Reroute.topic.replacement":"contact.debezium.changes", "transforms.Reroute.key.field.regex":"contact_db\\.DB_NAME\\.(.*)", "transforms.Reroute.key.field.replacement":"$1", "transforms.Reroute.key.field.name":"universe", "key.converter":"org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable":"false", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "column.exclude.list":".*\\.last_.*,.*\\.nouverts.*,.*\\.nclicks.*,.*\\.nenvois,.*\\.nbounces,.*\\.nbounces_sms,.*\\.nclickssms,.*\\.nx,.*\\.nsms,.*\\.nclicksms,.*\\.ntransfo,.*\\.npurchases,.*\\.sommeca", "transforms":"filter", "transforms.filter.type":"io.debezium.transforms.Filter", "transforms.filter.language":"jsr223.groovy", "transforms.filter.condition":"value.op == 'u'", "transforms.filter.topic.regex":"contact.debezium.changes" }
What is the captured database version and mode of depoyment?
I used a docker container on my local machine. I used the debezium/example-mysql image so according to the Dockerfile it's MySQL 8.0 (https://hub.docker.com/r/debezium/example-mysql)
What behaviour do you expect?
I expect all the changes in the tables of the 'DB_NAME' to be forwarded to the Kafka topic "contact.debezium.changes" and also my filter to be applied (so I should only have the updates)
What behaviour do you see?
The problem is that when I do not use the Message filtering feature of debezium everything works as expected. Topic routing is working and everything of all the tables in the database 'DB_NAME' and in the topic "contact.debezium.changes". BUT whenever I add the Message filtering to my connector config than the topic routing stops working. I ended up having multiple topics named contact_db.DB_NAME.{the_name_of_the_mysql_table}
Do you see the same behaviour using the latest relesead Debezium version?
I believe i'm currently using the latest version
Do you have the connector logs, ideally from start till finish?
Because I did all theses tests using docker containers I add a docker-compose running so here you can find all the logs of the MySQL, Zookeeper, Kafka & Kafka connect => docker-logs
How to reproduce the issue using our tutorial deployment?
I believe the best way to reproduce the issue would be to use https://github.com/debezium/debezium-examples/blob/main/tutorial/docker-compose-mysql.yaml (this is what I used). Then after everything is up you can follow the steps:
1 : Create a database in the mysql server named {DB_NAME}
2 : Create multiple tables in this database with fields (I add specific fields for my usage but I don't think it should matter to reproduce the problem)
3 : Create the connector in Kafka connect, don't forget to replace the DB_NAME with the actual name of your database (without the transforms.Filter :
"transforms":"filter",
"transforms.filter.type":"io.debezium.transforms.Filter",
"transforms.filter.language":"jsr223.groovy",
"transforms.filter.condition":"value.op == 'u'",
"transforms.filter.topic.regex":"contact.debezium.changes")
4 : Test out your set up ! You MySQL queries should produces result in a Kafka topic named "contact.debezium.changes"
5 : Now update your Kafka connector (I used the Kafka connect REST API to do it) by adding to the config the transforms filter part (you'll need to follow the prequesites of Messaging filter by adding the scripting debezium jar + groovy jars in your Kafka connect)
6 : Test again by running multiple MySQL queries, your changes end up in different Kafka topics named contact_db.{DB_NAME}.{name_of_mysql_table} and also the filtering doesn't applied since I used a predicate on the topic name for "contact.debezium.changes"
I expected to be able to use both of theses transformations together
For a little context about what I'm trying to do : I have a lot of MySQL tables, I want all the changes in a single Kafka topic. But I also have excluded some columns that I'm not interested in, the problem is that debezium still produces kafka records for updates on the excluded fields (there's an open issue for this as a potential enhancement I believe but it's old and hasn't been touched in a long time), so I figured that I would do it by filtering update messages that don't have any changes. This is how I ended up trying to use both topic routing & message filtering at the same time.