Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-2451

MySQL connector does not send DML events to Kafka topics.

    XMLWordPrintable

Details

    • Bug
    • Resolution: Not a Bug
    • Major
    • None
    • 1.2.1.Final
    • mysql-connector
    • None

    Description

      Embedded Debezium with the
      MySQL connector did not create the per-table Kafka topics for DML events (format: serverName.databaseName.tableName).

      Also, it did not send events to the Kafka topics, although the event logging in the notifying() callback works.

       

      [build.gradle]

      // Build plugins: Spring Boot, Spring dependency management, and Kotlin (JVM + Spring support).
      plugins {
          id("org.springframework.boot") version "2.3.3.RELEASE"
          id("io.spring.dependency-management") version "1.0.9.RELEASE"
          kotlin("jvm") version "1.4.0"
          kotlin("plugin.spring") version "1.4.0"
      }

      // Project coordinates.
      group = "io.cdc"
      version = "1.0.0-SNAPSHOT"

      repositories {
          mavenCentral()
      }

      dependencies {
          // Kotlin standard library and Spring Boot starters.
          implementation(kotlin("stdlib-jdk8"))
          implementation("org.springframework.boot:spring-boot-starter-web")
          implementation("org.springframework.boot:spring-boot-starter-actuator")

          // Embedded Debezium engine and its MySQL connector.
          implementation("io.debezium:debezium-embedded:1.2.1.Final")
          implementation("io.debezium:debezium-connector-mysql:1.2.1.Final")

          // Logging facade and Jackson support for Kotlin.
          implementation("io.github.microutils:kotlin-logging:1.8.3")
          implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.11.2")

          // Spring Boot test starter without the JUnit vintage engine.
          testImplementation("org.springframework.boot:spring-boot-starter-test") {
              exclude(group = "org.junit.vintage", module = "junit-vintage-engine")
          }
      }

      tasks {
          // Run every Test task on the JUnit Platform.
          withType<Test> {
              useJUnitPlatform()
          }

          // Compile main and test Kotlin sources for JVM 1.8 bytecode.
          compileKotlin {
              kotlinOptions {
                  jvmTarget = "1.8"
              }
          }
          compileTestKotlin {
              kotlinOptions {
                  jvmTarget = "1.8"
              }
          }
      }

       

      [DebeziumConfig]

      /**
       * Spring configuration that starts an embedded Debezium engine for the MySQL connector
       * once the bean has been constructed, and stops it again when the bean is destroyed.
       *
       * The change-event handler registered in [engine] only logs each event; nothing in this
       * class publishes change events to Kafka topics. The Kafka-related settings in
       * [getProperties] configure offset storage and the database history topic.
       */
      @Configuration
      class DebeziumConfig {

          /** Single worker thread on which the engine is run. */
          private val executor = Executors.newSingleThreadExecutor()

          /** The engine started by [init]; `null` until [init] has run. */
          private var runningEngine: DebeziumEngine<ChangeEvent<String, String>>? = null

          /**
           * Builds the engine and hands it to the worker thread.
           *
           * References to both the engine and the executor are retained so that [shutdown]
           * can release them; previously both were created anonymously and never stopped.
           */
          @PostConstruct
          fun init() {
              val created = engine()
              runningEngine = created
              executor.execute(created)
          }

          /**
           * Closes the running engine (if any) and shuts the worker executor down.
           *
           * The executor is shut down even if closing the engine throws.
           */
          @javax.annotation.PreDestroy
          fun shutdown() {
              try {
                  runningEngine?.close()
              } finally {
                  executor.shutdown()
              }
          }

          /**
           * Creates a JSON-format Debezium engine configured from [getProperties].
           *
           * @return a new engine whose handler logs every received change event.
           */
          fun engine(): DebeziumEngine<ChangeEvent<String, String>> =
              DebeziumEngine.create(Json::class.java)
                  .using(getProperties())
                  .notifying { event -> logger.info { "DebeziumEngine : $event" } }
                  .build()

          /**
           * Engine and connector settings, exposed as a Spring bean.
           *
           * @return the properties passed to the engine builder in [engine].
           */
          @Bean
          fun getProperties(): Properties = Properties().apply {
              setProperty("name", "engine")
              setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector")

              // Offset storage backed by a Kafka topic.
              setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore")
              setProperty("offset.storage.topic", "cdc.offset")
              setProperty("offset.storage.partitions", "1")
              setProperty("offset.storage.replication.factor", "3")

              /* begin connector properties */
              setProperty("database.hostname", "localhost")
              setProperty("database.port", "3306")
              setProperty("database.user", "root")
              setProperty("database.password", "")
              setProperty("database.server.name", "CDC")
              setProperty("database.whitelist", "test_db")
              setProperty("table.whitelist", "test_db.test_table")

              setProperty("bootstrap.servers", "local:9092")

              // Database history topic settings.
              setProperty("database.history.kafka.bootstrap.servers", "local:9092")
              setProperty("database.history.kafka.topic", "cdc.database.history")
              setProperty("include.schema.changes", "false")

              setProperty("converter.schemas.enable", "false")
              setProperty("key.converter.schemas.enable", "false")
              setProperty("value.converter.schemas.enable", "false")

              /** Prevent Table lock **/
              setProperty("snapshot.locking.mode", "none")
          }

          companion object : KLogging()
      }
      

       

      Attachments

        Activity

          People

            Unassigned Unassigned
            pkgonan 민규 김 (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: