Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-1849

Redundant calls to refresh schema when using user defined types in PostgreSQL

XMLWordPrintable

    • Hide

      DDL:

      CREATE DOMAIN MONEY2 AS DECIMAL(12, 2);
      CREATE TABLE customer
      (
          id              UUID PRIMARY KEY,
          name            VARCHAR(64),
          balance         money2            NOT NULL DEFAULT 0
      );
      

      Plugin configuration:

      {
      connector.class: "io.debezium.connector.postgresql.PostgresConnector",
      max.queue.size: "131072",
      transforms.PartitionKeyExtractor.type: "com.behalf.core.transformer.PartitionKeyExtractor",
      slot.name: "customers_slot",
      tasks.max: "1",
      database.history.kafka.topic: "schema-changes.public",
      column.blacklist: "public.owner.email_confirmed_on,public.owner.identifier_type,public.owner.identifier,public.owner.email_confirmed,public.invitation.customer_name,public.invitation.merchant_name,public.invitation.customer_email,public.invitation.client_token,public.invitation.message",
      transforms: "route,TypeMapper,PartitionKeyExtractor",
      task.shutdown.graceful.timeout.ms: "30000",
      schema.refresh.mode: "columns_diff_exclude_unchanged_toast",
      table.whitelist: "public.owner,public.business_event,public.invitation,public.terms_optimization,public.customer",
      transforms.route.type: "org.apache.kafka.connect.transforms.RegexRouter",
      tombstones.on.delete: "false",
      transforms.route.regex: ".*",
      decimal.handling.mode: "double",
      poll.interval.ms: "100",
      transforms.TypeMapper.type: "com.behalf.core.transformer.TypeMapper",
      transforms.route.replacement: "data-stream.topic",
      database.user: "postgres",
      database.dbname: "customers",
      transforms.TypeMapper.dictionary: "{"business_name":"varchar","sf_id":"varchar","city":"varchar","enum_code":"varchar","last_name":"varchar","percent":"numeric","zip_code":"varchar","ssn":"varchar","partition_key_type":"varchar","phone_deprecated":"varchar","primitive_float":"float8","primitive_boolean":"bool","email_deprecated":"varchar","phone":"varchar","street":"varchar","tin":"varchar","primitive_integer":"int4","state":"varchar","first_name":"varchar","money2":"numeric","email":"varchar"}",
      database.history.kafka.bootstrap.servers: "kafka:9092",
      time.precision.mode: "connect",
      database.server.name: "customers",
      offset.flush.timeout.ms: "60000",
      database.port: "5432",
      plugin.name: "wal2json_streaming",
      schema.whitelist: "public",
      column.propagate.source.type: ".*",
      offset.flush.interval.ms: "15000",
      connector.version: "cae65ee",
      include.unknown.datatypes: "true",
      database.hostname: "10.200.9.58",
      database.password: "postgres",
      name: "connector-customers",
      max.batch.size: "32768",
      snapshot.mode: "never",
      slot.stream.params: "add-tables=public.customer"
      }
      
      

      Any change in this table will produce this message

      Show
      DDL: CREATE DOMAIN MONEY2 AS DECIMAL (12, 2); CREATE TABLE customer ( id UUID PRIMARY KEY , name VARCHAR (64), balance money2 NOT NULL DEFAULT 0 ); Plugin configuration: { connector.class: "io.debezium.connector.postgresql.PostgresConnector" , max.queue.size: "131072" , transforms.PartitionKeyExtractor.type: "com.behalf.core.transformer.PartitionKeyExtractor" , slot.name: "customers_slot" , tasks.max: "1" , database.history.kafka.topic: "schema-changes. public " , column.blacklist: " public .owner.email_confirmed_on, public .owner.identifier_type, public .owner.identifier, public .owner.email_confirmed, public .invitation.customer_name, public .invitation.merchant_name, public .invitation.customer_email, public .invitation.client_token, public .invitation.message" , transforms: "route,TypeMapper,PartitionKeyExtractor" , task.shutdown.graceful.timeout.ms: "30000" , schema.refresh.mode: "columns_diff_exclude_unchanged_toast" , table.whitelist: " public .owner, public .business_event, public .invitation, public .terms_optimization, public .customer" , transforms.route.type: "org.apache.kafka.connect.transforms.RegexRouter" , tombstones.on.delete: " false " , transforms.route.regex: ".*" , decimal.handling.mode: " double " , poll.interval.ms: "100" , transforms.TypeMapper.type: "com.behalf.core.transformer.TypeMapper" , transforms.route.replacement: "data-stream.topic" , database.user: "postgres" , database.dbname: "customers" , transforms.TypeMapper.dictionary: "{" business_name ":" varchar "," sf_id ":" varchar "," city ":" varchar "," enum_code ":" varchar "," last_name ":" varchar "," percent ":" numeric "," zip_code ":" varchar "," ssn ":" varchar "," partition_key_type ":" varchar "," phone_deprecated ":" varchar "," primitive_float ":" float8 "," primitive_boolean ":" bool "," email_deprecated ":" varchar "," phone ":" varchar "," street ":" varchar "," tin ":" varchar "," primitive_integer ":" int4 "," state ":" varchar "," first_name ":" varchar "," money2 ":" numeric "," email ":" varchar "}" , database.history.kafka.bootstrap.servers: "kafka:9092" , time.precision.mode: "connect" , database.server.name: "customers" , offset.flush.timeout.ms: "60000" , database.port: "5432" , plugin.name: "wal2json_streaming" , schema.whitelist: " public " , column.propagate.source.type: ".*" , offset.flush.interval.ms: "15000" , connector.version: "cae65ee" , include.unknown.datatypes: " true " , database.hostname: "10.200.9.58" , database.password: "postgres" , name: "connector-customers" , max.batch.size: "32768" , snapshot.mode: "never" , slot.stream.params: "add-tables= public .customer" } Any change in this table will produce this message

      When streaming a table that has some UDT (Postgres DOMAIN) we see that DB schema is refreshed for each message, which causes a performance issue.

      Example log (we get this for every message):

      [2020-03-04 13:10:21,196] INFO detected new type for column 'balance', old type was 1700 (money2), new type is 16497 (money2); refreshing table schema (io.debezium.connector.postgresql.PostgresChangeRecordEmitter)
      

      PG version 9.6.15, Wal2json

      Issue is not recreated with Pg10 and pgoutput plugin

              jpechane Jiri Pechanec
              amit.goldi Amit Goldstein (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              4 Start watching this issue

                Created:
                Updated:
                Resolved: