Uploaded image for project: 'Debezium'
  1. Debezium
  2. DBZ-98

SnapshotReader should insert into schema-change.inventory with many records, not a merged ddl field.

XMLWordPrintable

    • Icon: Enhancement Enhancement
    • Resolution: Duplicate
    • Icon: Major Major
    • None
    • 0.2.4, 0.3
    • mysql-connector
    • None

      In binlog mode, the records in schema-change.inventory Kafka topic each only has one ddl sql; But in Snapshot mode, one record has many ddl sqls.

      Our team wrote a sink-connector-schemaregistry-debezium to parse schema-change.inventory, and our code class binds the parsing logic with a single SinkRecord, and it's not easy to refactor it to support a record with lots of ddl.

      It would be much easier to fix it in SnapshotReader.

                  // Now process all of our tables for each database ...
                  for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
                      String dbName = entry.getKey();
                      // First drop, create, and then use the named database ...
                      ddlStatements.add("DROP DATABASE IF EXISTS " + dbName);
                      ddlStatements.add("CREATE DATABASE " + dbName);
                      ddlStatements.add("USE " + dbName);
                      for (TableId tableId : entry.getValue()) {
                          sql.set("SHOW CREATE TABLE " + tableId);
                          mysql.query(sql.get(), rs -> {
                              if (rs.next()) ddlStatements.add(rs.getString(2)); // CREATE TABLE statement
                          });
                      }
                  }
                  // Finally, apply the DDL statements to the schema and then update the record maker...
                  logger.debug("Step 6b: applying DROP and CREATE statements to connector's table model");
                  String ddlStatementsStr = String.join(";" + System.lineSeparator(), ddlStatements);
                  schema.applyDdl(source, null, ddlStatementsStr, this::enqueueSchemaChanges);
                  context.makeRecord().regenerate();
      
      

              Unassigned Unassigned
              mvj3_jira David Chen (Inactive)
              Votes:
              0 Vote for this issue
              Watchers:
              2 Start watching this issue

                Created:
                Updated:
                Resolved: