Debezium / DBZ-6161

Ensure messages from several tables end in the same partition


    • Type: Enhancement
    • Resolution: Unresolved
    • Priority: Major
    • under-triaging
    • core-library

      Hello,

      I'm trying to configure the io.debezium.connector.postgresql.PostgresConnector connector to watch two tables so that messages with the same primary key values end up in the same partition.
      The goal is to have ordered messages: first the T1 message with key K1, then the T2 message with key K2, where K1 = K2.

      Example:

      {
          "name": "test_cdc",
          "config": {
              "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
              "slot.name": "debezium_developers_transfer_orders",
              "tasks.max": "1",
              "schema.include.list": "public",
              "table.include.list": "public.T1,public.T2",
              "message.key.columns": "public.T1:t1k1,t1k2;public.T2:t2k1,t2k2",
              "topic.prefix": "test",
              "database.hostname": "db",
              "database.password": "xxxxxx",
              "database.dbname": "postgres",
              "database.user": "postgres",
              "database.server.name": "local",
              "database.port": "5432",
              "plugin.name": "pgoutput",
              "key.converter": "org.apache.kafka.connect.storage.StringConverter",
              "key.converter.schemas.enable": "false",
              "value.converter": "org.apache.kafka.connect.json.JsonConverter",
              "value.converter.schemas.enable": "false",
              "skipped.operations": "t",
              "snapshot.mode": "never"
          }
      }
      Table T1
      ---------------
      | t1k1 | t1k2 |
      ---------------
      |  10  |  11  |
      |  10  |  12  |
      ---------------
      Table T2
      ---------------
      | t2k1 | t2k2 |
      ---------------
      |  10  |  11  |
      |  11  |  11  |
      ---------------

      There is a one-to-one relation between T1 and T2.
      The primary key values are the same, but the column names aren't.
      So, if I insert a row into T1 with key 10-11, then a row into T2 with the same key, and commit, I need the message for the T1 row to be published to the same partition as the message for the T2 row. The order must be the T1 message first, then the T2 message.

      I think Kafka uses both the column names and the values to compute the partition key.
      As the column names differ, the partition key differs too.
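
To illustrate the suspicion above, here is a minimal Python sketch. It assumes the key reaches Kafka as a string that contains the column names (the `Struct{name=value,...}` rendering below is an assumption about how a struct key looks once passed through a string converter), and it uses CRC32 as a stand-in hash. Kafka's default partitioner uses a different hash function, so the partition numbers printed here are illustrative only. The helper names `render_key` and `partition_for` are hypothetical.

```python
import zlib


def render_key(columns):
    """Render key columns as a string that includes the column names,
    e.g. Struct{t1k1=10,t1k2=11} (assumed format, for illustration)."""
    body = ",".join(f"{name}={value}" for name, value in columns.items())
    return "Struct{" + body + "}"


def partition_for(key, num_partitions):
    """Stand-in for a hash partitioner: hash the key bytes, then take the
    remainder. CRC32 is used here only for illustration; it is not the
    hash that Kafka's default partitioner uses."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


t1_key = render_key({"t1k1": 10, "t1k2": 11})
t2_key = render_key({"t2k1": 10, "t2k2": 11})

# Same values, different column names: the key bytes are not equal,
# so nothing guarantees that both rows map to the same partition.
print(t1_key, "->", partition_for(t1_key, 6))
print(t2_key, "->", partition_for(t2_key, 6))
```

Whatever hash is used, the two keys are different byte strings, so they can land in the same partition only by coincidence.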

      In my case, I only need the values to be taken into account when computing the partition key.

      One option would be to set aliases on the columns before building the key,
      e.g. t1k1 -> k1, t1k2 -> k2, t2k1 -> k1 and t2k2 -> k2.
      The property would then be "message.key.columns": "T1.k1,T1.k2;T2.k1,T2.k2" (the aliases are specified instead of the real column names).
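
The aliasing idea can be sketched in a few lines of Python. This is only an illustration of the proposal, not an existing Debezium feature: the `ALIASES` table and the `aliased_key` helper are hypothetical names introduced here.

```python
# Hypothetical alias table: (table, real column name) -> alias.
ALIASES = {
    ("public.T1", "t1k1"): "k1",
    ("public.T1", "t1k2"): "k2",
    ("public.T2", "t2k1"): "k1",
    ("public.T2", "t2k2"): "k2",
}


def aliased_key(table, row, key_columns):
    """Build the message key from the aliases instead of the real column names."""
    return {ALIASES[(table, col)]: row[col] for col in key_columns}


t1 = aliased_key("public.T1", {"t1k1": 10, "t1k2": 11}, ["t1k1", "t1k2"])
t2 = aliased_key("public.T2", {"t2k1": 10, "t2k2": 11}, ["t2k1", "t2k2"])

# Both rows now produce the key {"k1": 10, "k2": 11}.
print(t1 == t2)  # prints True
```

With identical key names and values, both keys serialize to the same bytes, so any deterministic hash of those bytes yields the same result for the T1 row and the T2 row.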

      Do you see another way to solve this?

            Assignee: Unassigned
            Reporter: Stephane Tondini (Inactive)