diff --git a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java index 8c7f35a1f..ccb8c0009 100644 --- a/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java +++ b/debezium-core/src/main/java/io/debezium/relational/history/KafkaDatabaseHistory.java @@ -490,11 +490,8 @@ public void initializeStorage() { try (AdminClient admin = AdminClient.create(this.producerConfig.asProperties())) { - // Find default replication factor - final short replicationFactor = getDefaultTopicReplicationFactor(admin); - // Create topic - final NewTopic topic = new NewTopic(topicName, PARTITION_COUNT, replicationFactor); + final NewTopic topic = new NewTopic(topicName, PARTITION_COUNT); topic.configs(Collect.hashMapOf(CLEANUP_POLICY_NAME, CLEANUP_POLICY_VALUE, RETENTION_MS_NAME, Long.toString(RETENTION_MS_MAX), RETENTION_BYTES_NAME, Long.toString(UNLIMITED_VALUE))); admin.createTopics(Collections.singleton(topic)); @@ -506,50 +503,6 @@ public void initializeStorage() { } } - private short getDefaultTopicReplicationFactor(AdminClient admin) throws Exception { - try { - Config brokerConfig = getKafkaBrokerConfig(admin); - String defaultReplicationFactorValue = brokerConfig.get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value(); - - // Ensure that the default replication factor property was returned by the Admin Client - if (defaultReplicationFactorValue != null) { - return Short.parseShort(defaultReplicationFactorValue); - } - } - catch (ExecutionException ex) { - // ignore UnsupportedVersionException, e.g. due to older broker version - if (!(ex.getCause() instanceof UnsupportedVersionException)) { - throw ex; - } - } - - // Otherwise warn that no property was obtained and default it to 1 - users can increase this later if desired - LOGGER.warn( - "Unable to obtain the default replication factor from the brokers at {}. Setting value to {} instead.", - producerConfig.getString(BOOTSTRAP_SERVERS), - DEFAULT_TOPIC_REPLICATION_FACTOR); - - return DEFAULT_TOPIC_REPLICATION_FACTOR; - } - - private Config getKafkaBrokerConfig(AdminClient admin) throws Exception { - final Collection nodes = admin.describeCluster().nodes().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); - if (nodes.isEmpty()) { - throw new ConnectException("No brokers available to obtain default settings"); - } - - String nodeId = nodes.iterator().next().idString(); - Set resources = Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, nodeId)); - final Map configs = admin.describeConfigs(resources).all().get( - KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); - - if (configs.isEmpty()) { - throw new ConnectException("No configs have been received"); - } - - return configs.values().iterator().next(); - } - private static Validator forKafka(final Validator validator) { return (config, field, problems) -> { final String history = config.getString(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY);