Feature Request
Resolution: Done
In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.
Bug report
For bug reports, provide this information, please:
What Debezium connector do you use and what version?
Debezium Server 3.0.7.Final with MySQL source connector and RabbitMQ sink
What is the connector configuration?
# Sink connector config - RabbitMQ debezium.sink.type=rabbitmq debezium.sink.rabbitmq.connection.host=rabbitmq debezium.sink.rabbitmq.connection.port=5672 debezium.sink.rabbitmq.connection.username=guest debezium.sink.rabbitmq.connection.password=guest debezium.sink.rabbitmq.connection.virtual.host=/ debezium.sink.rabbitmq.ackTimeout=3000 debezium.sink.rabbitmq.delivery.mode=2 # Source connector config - MySQL debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector debezium.source.database.hostname=mysql debezium.source.database.dbname=producer debezium.source.database.port=8779 debezium.source.database.user=root debezium.source.database.password=root debezium.source.table.include.list=producer.outbox debezium.source.database.server.id=184054 debezium.source.offset.storage.file.filename=data/offsets.dat debezium.source.offset.flush.interval.ms=0 debezium.source.topic.prefix=load.test debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory debezium.source.schema.history.internal.file.filename=data/schistory.dat debezium.source.snapshot.mode=initial # Read schema changes only in debezium.source.database.dbname schema.history.internal.store.only.captured.databases.ddl=true # Read and store schema changes on all non-system tables in database schema.history.internal.store.only.captured.tables.ddl=false # Format config debezium.format.key=json debezium.format.value=json # Transformations debezium.transforms=filter,outbox ## FILTER CONFIG # Filter events that are going to be sent to the sink debezium.transforms.filter.type=io.debezium.transforms.Filter # Using Groovy as the langugage for defining the filter debezium.transforms.filter.language=jsr223.groovy # We only want c (create) events (INSERT) debezium.transforms.filter.condition=value.schema().field("op") && value.getString("op") == "c" ## OUTBOX CONFIG # Outbox transformation debezium.transforms.outbox.type=io.debezium.transforms.outbox.EventRouter # Outbox table column to get the destination exchange. Overrides "aggregatetype" debezium.transforms.outbox.route.by.field=exchange # Route to just the value of debezium.transforms.outbox.route.by.field without any prefix debezium.transforms.outbox.route.topic.regex=(?<routedByValue>.*) debezium.transforms.outbox.route.topic.replacement=$${routedByValue} # Table unique identifier debezium.transforms.outbox.table.field.event.id=uuid # Event key (routing key for RabbitMQ) debezium.transforms.outbox.table.field.event.key=aggregateid # Event timestamp debezium.transforms.outbox.table.field.event.timestamp=produced_timestamp # Event payload debezium.transforms.outbox.table.field.event.payload=payload # Expand json from this: [{\"id\": 1 to [{"id": 1 debezium.transforms.outbox.table.expand.json.payload=true # Ignore null values during expansion debezium.transforms.outbox.table.json.payload.null.behavior=ignore # Aditional properties to be sent in the event: event_type, produced_timestamp (as timestamp) debezium.transforms.outbox.table.fields.additional.placement=event_type:header,produced_timestamp:header:timestamp # Error if there is no event_type debezium.transforms.outbox.table.fields.additional.error.on.missing=true # Null payload does not emi tombstone event debezium.transforms.outbox.route.tombstone.on.empty.payload=false # If there is an UPDATE to outbox table, logs error and continues consuming debezium.transforms.outbox.table.op.invalid.behavior=error # Quarkus configuration quarkus.log.level=TRACE quarkus.log.console.json=false
What is the captured database version and mode of deployment?
(E.g. on-premises, with a specific cloud provider, etc.)
MySQL 8.0.41, on premise with Docker
What behavior do you expect?
I am using RabbitMQ sink, MySQL source connector and the Outbox event router.
For what I see on the docs (https://debezium.io/documentation/reference/stable/transformations/outbox-event-router.html#outbox-event-router-property-table-field-event-key) you can configure it to read from your outbox table telling it which column represents the key and the payload of the event (or delegate in the defaults, aggregateid and payload), so I expect that key to be sent as the event key, when no static key is configured.
What behavior do you see?
The event is being sent to the correct exchange (taken from static config or outbox table column value), but the routing key that it is using is the routing key specified in the configuration property debezium.sink.rabbitmq.routingKey (static), or the topic name if debezium.sink.rabbitmq.routingKeyFromTopicName is set to true. If I set said config variable to false and I don't set debezium.sink.rabbitmq.routingKey (which is optional), I expect the value of the column represented by table.field.event.key to be sent as the routing key (aggregateid column by default).
- If debezium.sink.rabbitmq.routingKey is set (static value) then it is used as routing key.
- If not set:
- If debezium.sink.rabbitmq.routingKeyFromTopicName is true then the topic name (exchange) is used as routing key
- if not set or set to false, empty string is used as the key for the event. Here I think it should be the value of the column defined by table.field.event.key instead of an empty string. Something similar to the exchange name behaviour.
Looking at the source code (https://github.com/debezium/debezium-server/blob/main/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamChangeConsumer.java#L138) it takes the routing key from said static configuration property, or, if you set debezium.sink.rabbitmq.routingKeyFromTopicName it uses the topic name (the exchange).
When not defining debezium.sink.rabbitmq.routingKey and setting debezium.sink.rabbitmq.routingKeyFromTopicName to false, the routing key is just an empty string
Do you see the same behaviour using the latest released Debezium version?
(Ideally, also verify with latest Alpha/Beta/CR version)
Do you have the connector logs, ideally from start till finish?
(You might be asked later to provide DEBUG/TRACE level log)
When it receives an event, this is the record that is logging:
TRACE [io.deb.ser.rab.RabbitMqStreamChangeConsumer] (pool-11-thread-1) Received event 'EmbeddedEngineChangeEvent [key={"schema":{"type":"string","optional":true},"payload":"test-routing"}, value={"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"payload"},"payload":{"name":"this is the payload"}}, sourceRecord=SourceRecord{sourcePartition={server=load.test}, sourceOffset={ts_sec=1740846017, file=binlog.000002, pos=3782, row=1, server_id=1, event=2}} ConnectRecord{topic='debezium.exchange', kafkaPartition=null, key=test-routing, keySchema=Schema{STRING}, value=Struct{name=this is the payload}, valueSchema=Schema{payload:STRUCT}, timestamp=1740846017749, headers=ConnectHeaders(headers=[ConnectHeader(key=id, value=44e5a4bf-9198-4123-8163-f871fcd06146, schema=Schema{STRING}), ConnectHeader(key=event_type, value=com.zertiban.MyEventType, schema=Schema{STRING}), ConnectHeader(key=timestamp, value=1740846017749037, schema=Schema{io.debezium.time.MicroTimestamp:INT64})])}]'
As you can see, the key is "test-routing", the topic is "debezium.exchange" and it has some headers. The topic is used as the exchange name, the payload as the event payload and the headers as event headers, but the key is just ignored and not used.
How to reproduce the issue using our tutorial deployment?
I am able to reproduce the issue with attached docker-compose and above debezium server configuration.
Feature request or enhancement
For feature requests or enhancements, provide this information, please:
Which use case/requirement will be addressed by the proposed feature?
<Your answer>
Implementation ideas (optional)
final var routingKeyName = routingKey .orElse(routingKeyFromTopicName ? streamNameMapper.map(record.destination()) : record.key()); // Replaced "" by record.key()