  Debezium / DBZ-8752

Add support for event key routing in RabbitMQ sink


    • Type: Feature Request
    • Resolution: Done
    • Priority: Major
    • Fix Version/s: 3.1.0.CR1
    • Affects Version/s: 3.0.7.Final
    • Component/s: debezium-server
    • Labels: None

      In order to make your issue reports as actionable as possible, please provide the following information, depending on the issue type.

      Bug report

      For bug reports, provide this information, please:

      What Debezium connector do you use and what version?

      Debezium Server 3.0.7.Final with MySQL source connector and RabbitMQ sink

      What is the connector configuration?

       

      # Sink connector config - RabbitMQ
      debezium.sink.type=rabbitmq
      debezium.sink.rabbitmq.connection.host=rabbitmq
      debezium.sink.rabbitmq.connection.port=5672
      debezium.sink.rabbitmq.connection.username=guest
      debezium.sink.rabbitmq.connection.password=guest
      debezium.sink.rabbitmq.connection.virtual.host=/
      debezium.sink.rabbitmq.ackTimeout=3000
      debezium.sink.rabbitmq.delivery.mode=2
      
      # Source connector config - MySQL
      debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
      debezium.source.database.hostname=mysql
      debezium.source.database.dbname=producer
      debezium.source.database.port=8779
      debezium.source.database.user=root
      debezium.source.database.password=root
      debezium.source.table.include.list=producer.outbox
      debezium.source.database.server.id=184054
      debezium.source.offset.storage.file.filename=data/offsets.dat
      debezium.source.offset.flush.interval.ms=0
      debezium.source.topic.prefix=load.test
      debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
      debezium.source.schema.history.internal.file.filename=data/schistory.dat
      debezium.source.snapshot.mode=initial
      
      # Store schema changes only for the captured database (debezium.source.database.dbname)
      debezium.source.schema.history.internal.store.only.captured.databases.ddl=true
      
      # Store schema changes for all non-system tables in that database, not only the captured ones
      debezium.source.schema.history.internal.store.only.captured.tables.ddl=false
      
      # Format config
      debezium.format.key=json
      debezium.format.value=json
      
      # Transformations
      debezium.transforms=filter,outbox
      
      ## FILTER CONFIG
      
      # Filter events that are going to be sent to the sink
      debezium.transforms.filter.type=io.debezium.transforms.Filter
      
      # Use Groovy as the language for the filter condition
      debezium.transforms.filter.language=jsr223.groovy
      
      # We only want c (create) events (INSERT)
      debezium.transforms.filter.condition=value.schema().field("op") && value.getString("op") == "c"
      
      ## OUTBOX CONFIG
      
      # Outbox transformation
      debezium.transforms.outbox.type=io.debezium.transforms.outbox.EventRouter
      
      # Outbox table column that holds the destination exchange (overrides the default "aggregatetype" column)
      debezium.transforms.outbox.route.by.field=exchange
      
      # Route to just the value of debezium.transforms.outbox.route.by.field without any prefix
      debezium.transforms.outbox.route.topic.regex=(?<routedByValue>.*)
      debezium.transforms.outbox.route.topic.replacement=$${routedByValue}
      
      # Table unique identifier
      debezium.transforms.outbox.table.field.event.id=uuid
      
      # Event key (routing key for RabbitMQ)
      debezium.transforms.outbox.table.field.event.key=aggregateid
      
      # Event timestamp
      debezium.transforms.outbox.table.field.event.timestamp=produced_timestamp
      
      # Event payload
      debezium.transforms.outbox.table.field.event.payload=payload
      
      # Expand the escaped JSON string in the payload column, e.g. [{\"id\": 1 becomes [{"id": 1
      debezium.transforms.outbox.table.expand.json.payload=true
      
      # Ignore null values during expansion
      debezium.transforms.outbox.table.json.payload.null.behavior=ignore
      
      # Additional fields sent as headers: event_type, and produced_timestamp (under the alias "timestamp")
      debezium.transforms.outbox.table.fields.additional.placement=event_type:header,produced_timestamp:header:timestamp
      
      # Error if there is no event_type
      debezium.transforms.outbox.table.fields.additional.error.on.missing=true
      
      # Do not emit a tombstone event when the payload is null
      debezium.transforms.outbox.route.tombstone.on.empty.payload=false
      
      # On an UPDATE to the outbox table, log an error and continue consuming
      debezium.transforms.outbox.table.op.invalid.behavior=error
      
      # Quarkus configuration
      quarkus.log.level=TRACE
      quarkus.log.console.json=false
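      The route.topic.regex / route.topic.replacement pair above is meant to keep the value of the exchange column unchanged. The sketch below reproduces that effect with plain java.util.regex. It is an approximation: the class and method names are hypothetical, it assumes the transform does a whole-value match followed by a replacement, and it assumes the doubled "$" in $${routedByValue} is only an escape so that Quarkus does not expand ${routedByValue} as a configuration expression (meaning the regex engine sees ${routedByValue}).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo: reproduces the effect of
// route.topic.regex=(?<routedByValue>.*) and route.topic.replacement=${routedByValue}
// with plain java.util.regex. This is not the EventRouter source code.
class TopicRouteDemo {

    static String route(String routedByValue, String regex, String replacement) {
        Matcher matcher = Pattern.compile(regex).matcher(routedByValue);
        // Only rewrite when the whole value matches; otherwise keep it unchanged
        return matcher.matches() ? matcher.replaceFirst(replacement) : routedByValue;
    }

    public static void main(String[] args) {
        String regex = "(?<routedByValue>.*)";
        // Single "$": assumed to be what remains after Quarkus unescapes "$${...}"
        System.out.println(route("debezium.exchange", regex, "${routedByValue}"));
        // A prefixed replacement, for comparison only
        System.out.println(route("debezium.exchange", regex, "outbox.event.${routedByValue}"));
    }
}
```

      Run as-is, it prints debezium.exchange and then outbox.event.debezium.exchange, which shows why the configuration above routes each event to an exchange named exactly after the column value.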

       

      What is the captured database version and mode of deployment?

      (E.g. on-premises, with a specific cloud provider, etc.)

      MySQL 8.0.41, on-premises with Docker

      What behavior do you expect?

      I am using the RabbitMQ sink, the MySQL source connector and the outbox event router.

      From what I see in the docs (https://debezium.io/documentation/reference/stable/transformations/outbox-event-router.html#outbox-event-router-property-table-field-event-key), you can configure the router to read from your outbox table, telling it which columns hold the key and the payload of the event (or rely on the defaults, aggregateid and payload). So I expect that key to be used as the RabbitMQ routing key when no static routing key is configured.
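      To make that expectation concrete, here is a deliberately simplified model of how the configuration above maps one outbox row to a routed event. It is not the EventRouter implementation: the record type, method name and row values are hypothetical (the values are reconstructed from the log excerpt below), and the column names are the ones configured in this issue.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the outbox mapping configured in this issue.
// The real EventRouter operates on Kafka Connect records, not on plain maps.
class OutboxMappingDemo {

    record RoutedEvent(String destination, String key, String value, Map<String, Object> headers) {}

    static RoutedEvent route(Map<String, Object> row) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("id", row.get("uuid"));                      // table.field.event.id=uuid
        headers.put("event_type", row.get("event_type"));        // additional field placed as a header
        headers.put("timestamp", row.get("produced_timestamp")); // header under the alias "timestamp"
        return new RoutedEvent(
                (String) row.get("exchange"),    // route.by.field=exchange (regex keeps the value as-is)
                (String) row.get("aggregateid"), // table.field.event.key=aggregateid
                (String) row.get("payload"),     // table.field.event.payload=payload
                headers);
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of(
                "uuid", "44e5a4bf-9198-4123-8163-f871fcd06146",
                "exchange", "debezium.exchange",
                "aggregateid", "test-routing",
                "event_type", "com.zertiban.MyEventType",
                "produced_timestamp", 1740846017749037L,
                "payload", "{\"name\": \"this is the payload\"}");
        RoutedEvent event = route(row);
        // The RabbitMQ sink uses event.destination() as the exchange but ignores event.key()
        System.out.println(event.destination() + " / " + event.key());
    }
}
```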

      What behavior do you see?

      The event is sent to the correct exchange (taken from the static configuration or from the outbox table column value), but the routing key it uses is the static value of debezium.sink.rabbitmq.routingKey, or the topic name if debezium.sink.rabbitmq.routingKeyFromTopicName is set to true. If I set routingKeyFromTopicName to false and leave debezium.sink.rabbitmq.routingKey unset (it is optional), I expect the value of the column configured by table.field.event.key (the aggregateid column by default) to be used as the routing key.

      Currently:

      Looking at the source code (https://github.com/debezium/debezium-server/blob/main/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamChangeConsumer.java#L138), the sink takes the routing key from that static configuration property or, if debezium.sink.rabbitmq.routingKeyFromTopicName is set to true, from the topic name (i.e. the exchange name).

      When debezium.sink.rabbitmq.routingKey is not defined and debezium.sink.rabbitmq.routingKeyFromTopicName is false, the routing key is just an empty string.
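      The selection rule described above can be reproduced in isolation. This sketch is modeled on the cited line as implied by the proposal in this issue (routingKey.orElse(routingKeyFromTopicName ? streamNameMapper.map(record.destination()) : "")), with a plain function standing in for streamNameMapper; the class and method names are hypothetical.

```java
import java.util.Optional;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the current routing-key selection in the RabbitMQ sink.
// streamNameMapper is replaced by a plain String -> String function.
class CurrentRoutingKeyDemo {

    static String currentRoutingKey(Optional<String> routingKey, boolean routingKeyFromTopicName,
                                    String destination, UnaryOperator<String> streamNameMapper) {
        // Precedence: static routingKey, then (optionally) the mapped topic name, else ""
        return routingKey.orElse(routingKeyFromTopicName ? streamNameMapper.apply(destination) : "");
    }

    public static void main(String[] args) {
        UnaryOperator<String> identity = UnaryOperator.identity();
        String topic = "debezium.exchange";
        System.out.println("[" + currentRoutingKey(Optional.of("static-key"), true, topic, identity) + "]");
        System.out.println("[" + currentRoutingKey(Optional.empty(), true, topic, identity) + "]");
        // The case reported here: the event key ("test-routing") is never consulted
        System.out.println("[" + currentRoutingKey(Optional.empty(), false, topic, identity) + "]");
    }
}
```

      It prints [static-key], [debezium.exchange] and [], the last line being the reported case.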

      Do you see the same behaviour using the latest released Debezium version?

      (Ideally, also verify with latest Alpha/Beta/CR version)

      Yes

      Do you have the connector logs, ideally from start till finish?

      (You might be asked later to provide DEBUG/TRACE level log)

      When the sink receives an event, it logs the following record:

      TRACE [io.deb.ser.rab.RabbitMqStreamChangeConsumer] (pool-11-thread-1) Received event 'EmbeddedEngineChangeEvent [key={"schema":{"type":"string","optional":true},"payload":"test-routing"}, value={"schema":{"type":"struct","fields":[{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"payload"},"payload":{"name":"this is the payload"}}, sourceRecord=SourceRecord{sourcePartition={server=load.test}, sourceOffset={ts_sec=1740846017, file=binlog.000002, pos=3782, row=1, server_id=1, event=2}} ConnectRecord{topic='debezium.exchange', kafkaPartition=null, key=test-routing, keySchema=Schema{STRING}, value=Struct{name=this is the payload}, valueSchema=Schema{payload:STRUCT}, timestamp=1740846017749, headers=ConnectHeaders(headers=[ConnectHeader(key=id, value=44e5a4bf-9198-4123-8163-f871fcd06146, schema=Schema{STRING}), ConnectHeader(key=event_type, value=com.zertiban.MyEventType, schema=Schema{STRING}), ConnectHeader(key=timestamp, value=1740846017749037, schema=Schema{io.debezium.time.MicroTimestamp:INT64})])}]' 

      As you can see, the key is "test-routing", the topic is "debezium.exchange" and the record carries three headers (id, event_type and timestamp). The topic is used as the exchange name, the value as the message payload and the headers as message headers, but the key is ignored.

      How to reproduce the issue using our tutorial deployment?

      I can reproduce the issue with the attached docker-compose file and the Debezium Server configuration above.

      docker-compose-1.yml

      application.properties

      Feature request or enhancement

      For feature requests or enhancements, provide this information, please:

      Which use case/requirement will be addressed by the proposed feature?

      <Your answer>

      Implementation ideas (optional)

       
      https://github.com/debezium/debezium-server/blob/main/debezium-server-rabbitmq/src/main/java/io/debezium/server/rabbitmq/RabbitMqStreamChangeConsumer.java#L138
       

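      final var routingKeyName = routingKey
              .orElse(routingKeyFromTopicName ? streamNameMapper.map(record.destination())
                      // Replaced "" by the event key (converted with getString); null keys still fall back to ""
                      : record.key() != null ? getString(record.key()) : "");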
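      One detail the one-line proposal glosses over: the event key reaches the sink as an untyped Object, and its form depends on debezium.format.key. With json, the log above shows the serialized key still carries the schema envelope ({"schema":...,"payload":"test-routing"}) rather than the bare test-routing. The standalone sketch below approximates the proposed precedence (static routingKey, then the topic name if enabled, then the event key, else ""), converting String or UTF-8 byte[] keys. The class and method names are hypothetical, and how best to unwrap a JSON key is left open.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical standalone version of the proposed precedence:
// static routingKey > topic name (if enabled) > event key > "".
class ProposedRoutingKeyDemo {

    // Converts the untyped event key; null keys yield ""
    static String keyAsString(Object key) {
        if (key == null) {
            return "";
        }
        if (key instanceof String s) {
            return s;
        }
        if (key instanceof byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unsupported key type: " + key.getClass().getName());
    }

    static String proposedRoutingKey(Optional<String> routingKey, boolean routingKeyFromTopicName,
                                     String topicName, Object eventKey) {
        // orElseGet defers the key conversion until it is actually needed
        return routingKey.orElseGet(() -> routingKeyFromTopicName ? topicName : keyAsString(eventKey));
    }

    public static void main(String[] args) {
        String topic = "debezium.exchange";
        System.out.println(proposedRoutingKey(Optional.empty(), false, topic, "test-routing"));
        System.out.println(proposedRoutingKey(Optional.empty(), false, topic,
                "test-routing".getBytes(StandardCharsets.UTF_8)));
        // A JSON key serialized with its schema envelope, as in the log above
        System.out.println(proposedRoutingKey(Optional.empty(), false, topic,
                "{\"schema\":{\"type\":\"string\",\"optional\":true},\"payload\":\"test-routing\"}"));
    }
}
```

      The first two calls print test-routing; the third prints the entire JSON envelope, which suggests the key format would also need attention before the key is usable as a routing key.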

        1. application.properties
          3 kB
          Victor Castaño
        2. docker-compose-1.yml
          1 kB
          Victor Castaño

              Assignee: Unassigned
              Reporter: Victor Castaño (victorcastanogutierrez)
              Votes: 0
              Watchers: 2
