Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-5517

Data lost during connector restart when schema registry is not available

    XMLWordPrintable

Details

    • False
    • None
    • False
    • Hide

      compose.yaml:

      $ cat compose.yaml 
      version: '2'
      services:
        zookeeper:
          image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
          ports:
           - 2181:2181
           - 2888:2888
           - 3888:3888
        kafka:
          image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
          ports:
           - 9092:9092
          links:
           - zookeeper
          environment:
           - ZOOKEEPER_CONNECT=zookeeper:2181
        postgres:
          image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
          ports:
           - 6432:5432
          environment:
           - POSTGRES_USER=postgres
           - POSTGRES_PASSWORD=postgres
        schema-registry:
          image: confluentinc/cp-schema-registry:7.0.1
          ports:
           - 8181:8181
           - 8081:8081
          environment:
           - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
           - SCHEMA_REGISTRY_HOST_NAME=schema-registry
           - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
          links:
           - zookeeper
        connect:
          image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
          ports:
           - 8083:8083
          links:
           - kafka
           - postgres
          environment:
           - BOOTSTRAP_SERVERS=kafka:9092
           - GROUP_ID=1
           - CONFIG_STORAGE_TOPIC=my_connect_configs
           - OFFSET_STORAGE_TOPIC=my_connect_offsets
           - STATUS_STORAGE_TOPIC=my_connect_statuses
           - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
           - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
           - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
           - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
        kafka-ui:
          image: provectuslabs/kafka-ui
          container_name: kafka-ui
          ports:
            - "8080:8080"
          restart: always
          environment:
            - KAFKA_CLUSTERS_0_NAME=local
            - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
            - KAFKA_CLUSTERS_0_ZOOKEEPER=localhost:2181
            - KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://schema-registry:8081
            - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=connect
            - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=http://connect:8083
          links:
            - kafka
            - connect
            - schema-registry
      

      connector.json:

      {
        "name": "customers-connector",
        "config": {
          "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
          "tasks.max": "1",
          "database.hostname": "postgres",
          "database.port": "5432",
          "database.user": "postgres",
          "database.password": "postgres",
          "database.dbname" : "postgres",
          "database.server.name": "dbserver1",
          "schema.include.list": "inventory",
          "table.include.list": "inventory.customers",
          "publication.name": "debezium_publication",
          "slot.name": "debezium_replication_slot",
          "plugin.name": "pgoutput",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://schema-registry:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://schema-registry:8081"
        }
      }
      
      • DEBEZIUM_VERSION=1.9
      • docker-compose -f compose.yaml up
      • curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @connector.json
      • docker-compose -f compose.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'
      • insert into inventory.customers(first_name, last_name, email) values ('test','test',generate_series(1,10)::text);
      • part of the Docker compose is also Kafka-UI so you can easily check the records in the Kafka on localhost:8080
      • docker-compose -f compose.yaml stop schema-registry
      • run several times docker-compose -f compose.yaml restart connect and inserts into the DB insert into inventory.customers(first_name, last_name, email) values ('test','test',generate_series(11,20)::text);
      • docker-compose -f compose.yaml start schema-registry
      • docker-compose -f compose.yaml restart connect
      • usually one or more insert sequences are missing in the Kafka
      Show
      compose.yaml : $ cat compose.yaml version: '2' services: zookeeper: image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION} ports: - 2181:2181 - 2888:2888 - 3888:3888 kafka: image: quay.io/debezium/kafka:${DEBEZIUM_VERSION} ports: - 9092:9092 links: - zookeeper environment: - ZOOKEEPER_CONNECT=zookeeper:2181 postgres: image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION} ports: - 6432:5432 environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres schema-registry: image: confluentinc/cp-schema-registry:7.0.1 ports: - 8181:8181 - 8081:8081 environment: - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092 - SCHEMA_REGISTRY_HOST_NAME=schema-registry - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 links: - zookeeper connect: image: quay.io/debezium/connect:${DEBEZIUM_VERSION} ports: - 8083:8083 links: - kafka - postgres environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui ports: - "8080:8080" restart: always environment: - KAFKA_CLUSTERS_0_NAME=local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 - KAFKA_CLUSTERS_0_ZOOKEEPER=localhost:2181 - KAFKA_CLUSTERS_0_SCHEMAREGISTRY=http://schema-registry:8081 - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=connect - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=http://connect:8083 links: - kafka - connect - schema-registry connector.json : { "name": "customers-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres", "database.port": "5432", "database.user": "postgres", 
"database.password": "postgres", "database.dbname" : "postgres", "database.server.name": "dbserver1", "schema.include.list": "inventory", "table.include.list": "inventory.customers", "publication.name": "debezium_publication", "slot.name": "debezium_replication_slot", "plugin.name": "pgoutput", "key.converter": "io.confluent.connect.avro.AvroConverter", "key.converter.schema.registry.url": "http://schema-registry:8081", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081" } } DEBEZIUM_VERSION=1.9 docker-compose -f compose.yaml up curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @connector.json docker-compose f compose.yaml exec postgres env PGOPTIONS=" -search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres' insert into inventory.customers(first_name, last_name, email) values ('test','test',generate_series(1,10)::text); part of the Docker compose is also Kafka-UI so you can easily check the records in the Kafka on localhost:8080 docker-compose -f compose.yaml stop schema-registry run several times docker-compose -f compose.yaml restart connect and inserts into the DB insert into inventory.customers(first_name, last_name, email) values ('test','test',generate_series(11,20)::text); docker-compose -f compose.yaml start schema-registry docker-compose -f compose.yaml restart connect usually one or more insert sequences are missing in the Kafka

    Description

      When the Postgres connector is used with the Avro converter and a schema registry, it may lose data when the registry is not available and the connector is repeatedly restarted while data is being inserted into the DB.

      Originally reported on this Zulip topic.

      See reproducer below. I was able to reproduce it only when I ran a couple of Debezium connector restarts (usually 2 restarts were enough) while inserting the data into the DB.

      The connector does event filtering on startup.

      Attachments

        Activity

          People

            vjuranek@redhat.com Vojtech Juranek
            vjuranek@redhat.com Vojtech Juranek
            Votes:
            0 Vote for this issue
            Watchers:
            3 Start watching this issue

            Dates

              Created:
              Updated: