Debezium / DBZ-5138

Restarting mysql connector task fails with: java.lang.RuntimeException: Unable to register the MBean


Details


      Steps:

      1. Start a Debezium instance.
      2. Create a connector task.
      3. Restart the task using POST /connectors/<id>/tasks/0/restart (see the sketch below).

      Reproducibility: 80%
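
      For reference, step 3 can also be driven programmatically against the Kafka Connect REST API. A minimal sketch in Java, assuming the worker listens on localhost:8083 and using the connector name "job_1341" from the configuration below (both are deployment-specific):

      import java.net.URI;
      import java.net.http.HttpClient;
      import java.net.http.HttpRequest;
      import java.net.http.HttpResponse;

      public class RestartTask {
          public static void main(String[] args) throws Exception {
              // Task id 0 is the only task here because tasks.max=1.
              URI uri = URI.create(
                      "http://localhost:8083/connectors/job_1341/tasks/0/restart");
              HttpRequest request = HttpRequest.newBuilder(uri)
                      .POST(HttpRequest.BodyPublishers.noBody())
                      .build();
              HttpResponse<String> response = HttpClient.newHttpClient()
                      .send(request, HttpResponse.BodyHandlers.ofString());
              // The task restart endpoint returns 204 No Content on success.
              System.out.println(response.statusCode() + " " + response.body());
          }
      }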


    Description

      Bug report

      What Debezium connector do you use and what version?

      Docker image debezium/connect:1.9.2.Final

      What is the connector configuration?

      {
          "connector.class": "io.debezium.connector.mysql.MySqlConnector",
          "snapshot.locking.mode": "none",
          "transforms.outbox.type": "com.pipedrive.Outbox",
          "database.history.consumer.sasl.jaas.config": "${file:/app/file.properties:kafka.jaas}",
          "transforms": "removeCompanyId,route,PartitionByDatabase,heartbeats,outbox",
          "database.history.producer.compression.type": "none",
          "transforms.heartbeats.regex": "__debezium\\-heartbeat\\.db_.*",
          "transforms.removeCompanyId.blacklist": "company_id",
          "include.schema.changes": "false",
          "database.history.producer.linger.ms": "0",
          "database.history.kafka.recovery.poll.interval.ms": "10000",
          "poll.interval.ms": "100",
          "database.initial.statements": "SET SESSION wait_timeout=3000",
          "database.history.kafka.consumer.max.poll.records": "10000",
          "transforms.route.topic.replacement": "db.$3",
          "errors.log.enable": "true",
          "database.history.producer.sasl.mechanism": "SCRAM-SHA-256",
          "database.user": "debezium",
          "database.history.producer.acks": "1",
          "transforms.route.key.field.regex": "([^.]+)\\.company_([0-9]+)\\.([^.]+)",
          "database.history.kafka.bootstrap.servers": "10.222.24.73:9092,10.222.26.142:9092,10.222.20.49:9092,10.222.20.46:9092,10.222.20.15:9092,10.222.20.66:9092,10.222.20.35:9092,10.222.20.52:9092,10.222.20.43:9092,10.222.20.29:9092,10.222.20.59:9092",
          "internal.database.history.ddl.filter": ".*_mem_.*",
          "heartbeat.interval.ms": "300000",
          "inconsistent.schema.handling.mode": "fail",
          "transforms.route.key.field.replacement": "$2",
          "gtid.new.channel.position": "earliest",
          "transforms.heartbeats.type": "org.apache.kafka.connect.transforms.RegexRouter",
          "log4j.logger.io.debezium.connector.mysql": "DEBUG, stdout",
          "ddl.parser.mode": "antlr",
          "database.password": "${file:/app/file.properties:mysql.password}",
          "transforms.PartitionByDatabase.type": "com.pipedrive.PartitionByDatabase",
          "name": "job_1341",
          "database.history.store.only.monitored.tables.ddl": "true",
          "errors.tolerance": "none",
          "max.batch.size": "4096",
          "database.history.consumer.sasl.mechanism": "SCRAM-SHA-256",
          "snapshot.mode": "schema_only",
          "connect.timeout.ms": "120000",
          "max.queue.size": "5120",
          "transforms.route.key.field.name": "company_id",
          "tasks.max": "1",
          "database.history.kafka.topic": "debezium-history-v17-company-db1341",
          "database.history.kafka.security.protocol": "SASL_PLAINTEXT",
          "database.history.consumer.security.protocol": "SASL_PLAINTEXT",
          "transforms.outbox.whitelist": "db.events_blackhole",
          "database.history.kafka.recovery.attempts": "10000",
          "log4j.additivity.io.debezium.connector.mysql": "false",
          "table.whitelist": <tables regex>,
          "tombstones.on.delete": "false",
          "transforms.route.type": "io.debezium.transforms.ByLogicalTableRouter",
          "transforms.route.topic.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
          "transforms.PartitionByDatabase.partitions": "32",
          "database.history.skip.unparseable.ddl": "true",
          "database.history.kafka.sasl.mechanism": "SCRAM-SHA-256",
          "database.history.producer.sasl.jaas.config": "${file:/app/file.properties:kafka.jaas}",
          "database.history.kafka.sasl.jaas.config": "${file:/app/file.properties:kafka.jaas}",
          "database.server.id": "134100",
          "database.history.producer.security.protocol": "SASL_PLAINTEXT",
          "database.server.name": "db_1341",
          "database.port": "3306",
          "transforms.heartbeats.replacement": "debezium-heartbeats",
          "database.hostname": "10.222.25.94",
          "transforms.removeCompanyId.type": "org.apache.kafka.connect.transforms.ReplaceField$Key"
      } 
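
      The ${file:...} placeholders above are resolved at runtime by Kafka's FileConfigProvider, which the worker enables below (config.providers=file, config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider). A minimal sketch of the resolution, assuming a readable /app/file.properties:

      import java.util.Map;
      import java.util.Set;
      import org.apache.kafka.common.config.ConfigData;
      import org.apache.kafka.common.config.provider.FileConfigProvider;

      public class ResolvePlaceholders {
          public static void main(String[] args) throws Exception {
              try (FileConfigProvider provider = new FileConfigProvider()) {
                  provider.configure(Map.of());
                  // Equivalent of ${file:/app/file.properties:mysql.password} and
                  // ${file:/app/file.properties:kafka.jaas} in the config above.
                  ConfigData data = provider.get("/app/file.properties",
                          Set.of("mysql.password", "kafka.jaas"));
                  // The secret values never appear in the stored connector config.
                  System.out.println(data.data().keySet());
              }
          }
      }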
      The Kafka Connect worker configuration, as set from environment variables by the container entrypoint:

      --- Setting property from CONNECT_PRODUCER_COMPRESSION_TYPE: producer.compression.type=snappy
      --- Setting property from CONNECT_REST_ADVERTISED_PORT: rest.advertised.port=8083
      --- Setting property from CONNECT_HEARTBEAT_INTERVAL_MS: heartbeat.interval.ms=10000
      --- Setting property from CONNECT_OFFSET_STORAGE_PARTITIONS: offset.storage.partitions=1
      --- Setting property from CONNECT_SESSION_TIMEOUT_MS: session.timeout.ms=150000
      --- Setting property from CONNECT_PRODUCER_ACKS: producer.acks=all
      --- Setting property from CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: config.storage.replication.factor=3
      --- Setting property from CONNECT_OFFSET_STORAGE_TOPIC: offset.storage.topic=debezium-offsets-db1341
      --- Setting property from CONNECT_SASL_JAAS_CONFIG: sasl.jaas.config=[hidden]
      --- Setting property from CONNECT_PRODUCER_BUFFER_MEMORY: producer.buffer.memory=36700160
      --- Setting property from CONNECT_STATUS_STORAGE_PARTITIONS: status.storage.partitions=1
      --- Setting property from CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: offset.storage.replication.factor=3
      --- Setting property from CONNECT_KEY_CONVERTER: key.converter=org.apache.kafka.connect.json.JsonConverter
      --- Setting property from CONNECT_CONFIG_STORAGE_TOPIC: config.storage.topic=debezium-configs-db1341
      --- Setting property from CONNECT_PRODUCER_MAX_REQUEST_SIZE: producer.max.request.size=36700160
      --- Setting property from CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: value.converter.schemas.enable=false
      --- Setting property from CONNECT_GROUP_ID: group.id=debezium-group-db1341
      --- Setting property from CONNECT_REST_ADVERTISED_HOST_NAME: rest.advertised.host.name=10.235.2.140
      --- Setting property from CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: connector.client.config.override.policy=All
      --- Setting property from CONNECT_REST_HOST_NAME: rest.host.name=10.235.2.140
      --- Setting property from CONNECT_PRODUCER_SASL_MECHANISM: producer.sasl.mechanism=SCRAM-SHA-256
      --- Setting property from CONNECT_PRODUCER_LINGER_MS: producer.linger.ms=50
      --- Setting property from CONNECT_PRODUCER_SECURITY_PROTOCOL: producer.security.protocol=SASL_PLAINTEXT
      --- Setting property from CONNECT_PRODUCER_SASL_JAAS_CONFIG: producer.sasl.jaas.config=[hidden]
      --- Setting property from CONNECT_VALUE_CONVERTER: value.converter=org.apache.kafka.connect.json.JsonConverter
      --- Setting property from CONNECT_CONSUMER_SASL_JAAS_CONFIG: consumer.sasl.jaas.config=[hidden]
      --- Setting property from CONNECT_CONFIG_PROVIDERS_FILE_CLASS: config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
      --- Setting property from CONNECT_SECURITY_PROTOCOL: security.protocol=SASL_PLAINTEXT
      --- Setting property from CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: key.converter.schemas.enable=false
      --- Setting property from CONNECT_REST_PORT: rest.port=8083
      --- Setting property from CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: status.storage.replication.factor=3
      --- Setting property from CONNECT_STATUS_STORAGE_TOPIC: status.storage.topic=debezium-status-db1341
      --- Setting property from CONNECT_CONFIG_PROVIDERS: config.providers=file
      --- Setting property from CONNECT_OFFSET_FLUSH_TIMEOUT_MS: offset.flush.timeout.ms=5000
      --- Setting property from CONNECT_CONSUMER_SASL_MECHANISM: consumer.sasl.mechanism=SCRAM-SHA-256
      --- Setting property from CONNECT_MAX_POLL_INTERVAL_MS: max.poll.interval.ms=150000
      --- Setting property from CONNECT_PLUGIN_PATH: plugin.path=/kafka/connect
      --- Setting property from CONNECT_CONSUMER_SECURITY_PROTOCOL: consumer.security.protocol=SASL_PLAINTEXT
      --- Setting property from CONNECT_OFFSET_FLUSH_INTERVAL_MS: offset.flush.interval.ms=30000
      --- Setting property from CONNECT_BOOTSTRAP_SERVERS: bootstrap.servers=10.222.24.73:9092,10.222.26.142:9092,10.222.20.49:9092,10.222.20.46:9092,10.222.20.15:9092,10.222.20.66:9092,10.222.20.35:9092,10.222.20.52:9092,10.222.20.43:9092,10.222.20.29:9092,10.222.20.59:9092
      --- Setting property from CONNECT_TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS: task.shutdown.graceful.timeout.ms=30000
      --- Setting property from CONNECT_SASL_MECHANISM: sasl.mechanism=SCRAM-SHA-256 
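
      The "--- Setting property ..." lines come from the debezium/connect image's entrypoint, which translates CONNECT_* environment variables into worker properties. A sketch of the apparent mapping (an illustration, not the image's actual script):

      import java.util.Locale;

      public class EnvToProperty {
          // CONNECT_OFFSET_FLUSH_INTERVAL_MS -> offset.flush.interval.ms:
          // drop the CONNECT_ prefix, lowercase, underscores become dots.
          static String toProperty(String envVar) {
              return envVar.substring("CONNECT_".length())
                           .toLowerCase(Locale.ROOT)
                           .replace('_', '.');
          }

          public static void main(String[] args) {
              System.out.println(toProperty("CONNECT_OFFSET_FLUSH_INTERVAL_MS"));
          }
      }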


      What are the captured database version and mode of deployment?

      On-premises, in a Docker container.

      What behaviour do you expect?

      The connector task is successfully restarted when calling:

      POST /connectors/<connector_id>/tasks/<task_id>/restart

      What behaviour do you see?

      The following error occurs when calling the connector task restart endpoint:

      2022-05-17 07:04:30,834 ERROR || Graceful stop of task job_1243-0 failed. [org.apache.kafka.connect.runtime.Worker]
      2022-05-17 07:04:30,954 ERROR || WorkerSourceTask{id=job_1243-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
      java.lang.RuntimeException: Unable to register the MBean 'debezium.mysql:type=connector-metrics,context=schema-history,server=db_1243'
      at io.debezium.metrics.Metrics.register(Metrics.java:77)
      at io.debezium.relational.history.DatabaseHistoryMetrics.started(DatabaseHistoryMetrics.java:95)
      at io.debezium.relational.history.AbstractDatabaseHistory.start(AbstractDatabaseHistory.java:82)
      at io.debezium.relational.history.KafkaDatabaseHistory.start(KafkaDatabaseHistory.java:261)
      at io.debezium.relational.HistorizedRelationalDatabaseSchema.<init>(HistorizedRelationalDatabaseSchema.java:42)
      at io.debezium.connector.mysql.MySqlDatabaseSchema.<init>(MySqlDatabaseSchema.java:93)
      at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:94)
      at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)
      at org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:225)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
      at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
      at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
      at java.base/java.lang.Thread.run(Thread.java:829)
      Caused by: javax.management.InstanceAlreadyExistsException: debezium.mysql:type=connector-metrics,context=schema-history,server=db_1243
      at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
      at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
      at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
      at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
      at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
      at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
      at io.debezium.metrics.Metrics.register(Metrics.java:73)
      ... 15 more 

      After that, the task may remain in either the FAILED or the RUNNING state. Even when it still reports RUNNING, no events are processed. The status is obtained from the GET /connectors/<connector_id>/status endpoint.
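
      The root cause visible in the stack trace is a JMX name collision: the restarted task registers its schema-history metrics MBean while the previous registration under the same ObjectName is still present (the graceful stop that would have unregistered it failed). The failure mode can be reproduced in isolation; a minimal sketch, with the DemoMBean/Demo classes made up for illustration:

      import java.lang.management.ManagementFactory;
      import javax.management.MBeanServer;
      import javax.management.ObjectName;

      public class MBeanCollision {
          public interface DemoMBean { int getValue(); }
          public static class Demo implements DemoMBean {
              public int getValue() { return 42; }
          }

          public static void main(String[] args) throws Exception {
              MBeanServer server = ManagementFactory.getPlatformMBeanServer();
              ObjectName name = new ObjectName(
                      "debezium.mysql:type=connector-metrics,context=schema-history,server=db_1243");
              server.registerMBean(new Demo(), name);  // registration by the original task
              // Without an intervening server.unregisterMBean(name), i.e. the cleanup
              // a successful task stop would perform, re-registration fails:
              server.registerMBean(new Demo(), name);  // throws InstanceAlreadyExistsException
          }
      }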

      Do you see the same behaviour using the latest released Debezium version?

      Yes; 1.9.2.Final is the latest version. The issue does not reproduce with 1.7.0.Final.

      Do you have the connector logs, ideally from start till finish?

      Yes.

      How to reproduce the issue using our tutorial deployment?

      1. Start a Debezium instance.
      2. Create a connector task.
      3. Restart the task using POST /connectors/<id>/tasks/0/restart (see the sketch under Steps above).

            People

              jpechane Jiri Pechanec
              vkassenbaev Valeriy Kassenbaev (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              3 Start watching this issue
