Debezium / DBZ-1963

PostgreSQL Sink connector with outbox event router and Avro uses wrong default io.confluent schema namespace


      Steps to Reproduce:

      Install Confluent Platform 5.4.1
      Install debezium-connector-postgres 1.1.0
      Create a PostgreSQL connector with the outbox SMT for a given table
      Check the schema created in the Avro schema registry


      I'm building a POC application using the event outbox pattern laid out in:
      https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/

      I'm using Confluent Platform 5.4.1 installed with the confluent CLI, and debezium-connector-postgres-1.1.0 installed with the confluent-hub CLI.

      I've followed these documentation pages to set up the connector with the outbox SMT:

      https://debezium.io/documentation/reference/1.1/configuration/outbox-event-router.html
      https://debezium.io/documentation/reference/1.1/connectors/postgresql.html

      I have the following connector configuration:

      {
        "name": "myapp_outbox_01",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "plugin.name": "pgoutput",
          "auto.register.schemas": "true",
          "publication.name": "debezium_outbox",
          "tasks.max": "1",
          "database.hostname": "127.0.0.1",
          "database.port": "5432",
          "database.user": "debezium",
          "database.password": "debezium",
          "database.dbname": "mydb",
          "database.server.name": "myapp_outbox_01",
          "schema.whitelist": "public",
          "table.whitelist": "public\\.outbox_outbox_event",
          "heartbeat.interval.ms": "60000",
          "heartbeat.action.query": "INSERT INTO outbox_debezium_heartbeat (id, heartbeat_date) VALUES (1, now()) ON CONFLICT (id) DO UPDATE SET heartbeat_date = EXCLUDED.heartbeat_date",
          "transforms": "outbox",
          "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
          "transforms.outbox.table.field.event.id": "uuid",
          "transforms.outbox.table.field.event.key": "aggregate_id",
          "transforms.outbox.table.field.event.type": "type",
          "transforms.outbox.table.field.event.timestamp": "created_date",
          "transforms.outbox.table.field.event.payload": "payload",
          "transforms.outbox.table.field.event.payload.id": "aggregate_id",
          "transforms.outbox.table.fields.additional.placement": "tenant_uuid:header,tenant_uuid:envelope",
          "transforms.outbox.route.by.field": "aggregate_type",
          "transforms.outbox.route.topic.regex": "(?<routedByValue>.*)",
          "transforms.outbox.route.topic.replacement": "myapp_outbox_01_${routedByValue}"
        }
      }
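
      For reference, this is roughly the outbox table the configuration reads from; the column names follow the transforms.outbox.* mappings above, while the exact types are my assumptions:

      -- Sketch of public.outbox_outbox_event; column names match the SMT config, types assumed.
      CREATE TABLE public.outbox_outbox_event (
          uuid           uuid PRIMARY KEY,      -- transforms.outbox.table.field.event.id
          aggregate_type varchar(255) NOT NULL, -- transforms.outbox.route.by.field
          aggregate_id   varchar(255) NOT NULL, -- event key and payload id
          type           varchar(255) NOT NULL, -- transforms.outbox.table.field.event.type
          created_date   timestamptz NOT NULL,  -- transforms.outbox.table.field.event.timestamp
          tenant_uuid    uuid,                  -- additional placement (header + envelope)
          payload        jsonb NOT NULL         -- transforms.outbox.table.field.event.payload
      );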
      

      The connector successfully picks up events from the outbox table in PostgreSQL and writes them to Kafka in the appropriate topic.
      I am able to consume and deserialize the events with the schema automatically registered by Debezium in the Avro schema registry.

      However, the Kafka Connect value schema looks like:

      {
        "type" : "record",
        "name" : "ConnectDefault",
        "namespace" : "io.confluent.connect.avro",
        "fields" : [ {
          "name" : "payload",
          "type" : {
            "type" : "string",
            "connect.version" : 1,
            "connect.name" : "io.debezium.data.Json"
          }
        }, {
          "name" : "eventType",
          "type" : "string"
        }, {
          "name" : "tenant_uuid",
          "type" : [ "null", {
            "type" : "string",
            "connect.version" : 1,
            "connect.name" : "io.debezium.data.Uuid"
          } ],
          "default" : null
        } ]
      }
      

      I don't understand why it's using the name ConnectDefault and the namespace io.confluent.connect.avro. Isn't it supposed to use the namespace specified by the database.server.name config, as stated in the documentation?

      The use of the default io.confluent namespace is rather problematic: when you generate Java classes on the consumer side using the avro-maven-plugin, the schemas for all outbox SMT topics share the same name and namespace, so a single Java class is generated instead of one per topic. Consequently, with the Kafka consumer config specific.avro.reader = true, deserialization breaks for every topic whose actual schema does not match the one ConnectDefault class that survived the collision. It is also problematic simply for the sake of using the name and namespace fields for what they were intended.
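
      To make the class collision concrete, here is a minimal consumer sketch; topic names like myapp_outbox_01_Order are hypothetical aggregate types, the consumer settings are assumptions, and ConnectDefault is the single class the avro-maven-plugin ends up generating:

      import java.time.Duration;
      import java.util.Arrays;
      import java.util.Properties;

      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;

      import io.confluent.connect.avro.ConnectDefault; // the one generated class all topics collide into

      public class OutboxConsumer {
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put("bootstrap.servers", "localhost:9092");
              props.put("group.id", "outbox-poc");
              props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
              props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
              props.put("schema.registry.url", "http://localhost:8081");
              // specific.avro.reader makes the deserializer resolve the generated class
              // by the writer schema's full name. Every outbox topic registers
              // io.confluent.connect.avro.ConnectDefault, so they all resolve to the
              // single ConnectDefault class that survived code generation.
              props.put("specific.avro.reader", "true");

              try (KafkaConsumer<String, ConnectDefault> consumer = new KafkaConsumer<>(props)) {
                  // Both topics deserialize into the same ConnectDefault class even though
                  // their payloads differ; any topic whose actual schema does not match
                  // the generated class fails at runtime.
                  consumer.subscribe(Arrays.asList("myapp_outbox_01_Order", "myapp_outbox_01_Customer"));
                  ConsumerRecords<String, ConnectDefault> records = consumer.poll(Duration.ofSeconds(5));
                  records.forEach(r -> System.out.println(r.value()));
              }
          }
      }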

      The expected outcome would be that Debezium registers schemas in the registry with a proper name and namespace taken from database.server.name and the aggregate type, or something similar, right?
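
      For contrast, what I would expect to see registered is something like the following, where the namespace comes from database.server.name and the name from the aggregate type (Order is a hypothetical aggregate type, and the exact naming scheme is an assumption):

      {
        "type" : "record",
        "name" : "Order",
        "namespace" : "myapp_outbox_01",
        "fields" : [ ... same payload, eventType and tenant_uuid fields as above ... ]
      }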

      Ref: https://debezium.io/documentation/reference/1.1/connectors/postgresql.html:

      > The logical name of the PostgreSQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used.

              Assignee: Chris Cranford (ccranfor@redhat.com)
              Reporter: V V (Inactive)