Details
-
Bug
-
Resolution: Not a Bug
-
Major
-
None
-
1.2.1.Final
-
None
Description
Embedded Debezium with
mysql-connector did not generate each kafka topic for DML event. (format : serverName.databaseName.tableName)
Also, it did not send event to kafka topic, but notifying() event logging is works.
[build.gradle]
plugins { id("org.springframework.boot") version "2.3.3.RELEASE" id("io.spring.dependency-management") version "1.0.9.RELEASE" kotlin("jvm") version "1.4.0" kotlin("plugin.spring") version "1.4.0" } group = "io.cdc" version = "1.0.0-SNAPSHOT" repositories { mavenCentral() } tasks.withType<Test> { useJUnitPlatform() } dependencies { implementation(kotlin("stdlib-jdk8")) implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-actuator") implementation("io.debezium:debezium-embedded:1.2.1.Final") implementation("io.debezium:debezium-connector-mysql:1.2.1.Final") implementation("io.github.microutils:kotlin-logging:1.8.3") implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.11.2") testImplementation("org.springframework.boot:spring-boot-starter-test") { exclude(group = "org.junit.vintage", module = "junit-vintage-engine") } } tasks { compileKotlin { kotlinOptions.jvmTarget = "1.8" } compileTestKotlin { kotlinOptions.jvmTarget = "1.8" } }
[DebeziumConfig]
@Configuration class DebeziumConfig { companion object: KLogging() @PostConstruct fun init() { Executors.newSingleThreadExecutor().execute(engine()) } fun engine(): DebeziumEngine<ChangeEvent<String, String>> { return DebeziumEngine.create(Json::class.java) .using(getProperties()) .notifying { event -> logger.info { "DebeziumEngine : $event" } } .build() } @Bean fun getProperties(): Properties { val properties = Properties() properties.setProperty("name", "engine") properties.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector") properties.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore") properties.setProperty("offset.storage.topic", "cdc.offset") properties.setProperty("offset.storage.partitions", "1") properties.setProperty("offset.storage.replication.factor", "3") /* begin connector properties */ properties.setProperty("database.hostname", "localhost") properties.setProperty("database.port", "3306") properties.setProperty("database.user", "root") properties.setProperty("database.password", "") properties.setProperty("database.server.name", "CDC") properties.setProperty("database.whitelist", "test_db") properties.setProperty("table.whitelist", "test_db.test_table") properties.setProperty("bootstrap.servers", "local:9092") properties.setProperty("database.history.kafka.bootstrap.servers", "local:9092") properties.setProperty("database.history.kafka.topic", "cdc.database.history") properties.setProperty("include.schema.changes", "false") properties.setProperty("converter.schemas.enable", "false") properties.setProperty("key.converter.schemas.enable", "false") properties.setProperty("value.converter.schemas.enable", "false") /** Prevent Table lock **/ properties.setProperty("snapshot.locking.mode", "none") return properties } }