From c821449fb7c1e2c0f398eeb5cfc4a8abc210c0a4 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Mon, 21 Oct 2024 10:06:41 -0700 Subject: [PATCH] KAFKA-17794: Add some formatting safeguards for KIP-853 (#17504) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit KIP-853 adds support for dynamic KRaft quorums. This means that the quorum topology is no longer statically determined by the controller.quorum.voters configuration. Instead, it is contained in the storage directories of each controller and broker. Users of dynamic quorums must format at least one controller storage directory with either the --initial-controllers or --standalone flags. If they fail to do this, no quorum can be established. This PR changes the storage tool to warn about the case where a KIP-853 flag has not been supplied to format a KIP-853 controller. (Note that broker storage directories can continue to be formatted without a KIP-853 flag.) There are cases where we don't want to specify initial voters when formatting a controller. One example is where we format a single controller with --standalone, and then dynamically add 4 more controllers with no initial topology. In this case, we want the 4 later controllers to grab the quorum topology from the initial one. To support this case, this PR adds the --no-initial-controllers flag. Reviewers: José Armando García Sancio , Federico Valeri --- .../main/scala/kafka/tools/StorageTool.scala | 41 +++++++++---- .../unit/kafka/tools/StorageToolTest.scala | 61 +++++++++++++++++-- docs/ops.html | 4 +- .../kafka/metadata/storage/Formatter.java | 6 +- .../kafka/metadata/storage/FormatterTest.java | 6 +- tests/kafkatest/services/kafka/kafka.py | 9 ++- 6 files changed, 101 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 6dc4653961408..6215e35d38ea6 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.tools import kafka.server.KafkaConfig @@ -31,7 +30,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} import org.apache.kafka.metadata.storage.{Formatter, FormatterException} -import org.apache.kafka.raft.DynamicVoters +import org.apache.kafka.raft.{DynamicVoters, QuorumConfig} import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.config.ReplicationConfigs @@ -126,9 +125,20 @@ object StorageTool extends Logging { foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString))) } Option(namespace.getString("initial_controllers")). - foreach(v => formatter.setInitialVoters(DynamicVoters.parse(v))) + foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) if (namespace.getBoolean("standalone")) { - formatter.setInitialVoters(createStandaloneDynamicVoters(config)) + formatter.setInitialControllers(createStandaloneDynamicVoters(config)) + } + if (!namespace.getBoolean("no_initial_controllers")) { + if (config.processRoles.contains(ProcessRole.ControllerRole)) { + if (config.quorumVoters.isEmpty) { + if (!formatter.initialVoters().isPresent()) { + throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG + + " is not set on this controller, you must specify one of the following: " + + "--standalone, --initial-controllers, or --no-initial-controllers."); + } + } + } } Option(namespace.getList("add_scram")). foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]])) @@ -140,7 +150,7 @@ object StorageTool extends Logging { config: KafkaConfig ): DynamicVoters = { if (!config.processRoles.contains(ProcessRole.ControllerRole)) { - throw new TerseFailure("You cannot use --standalone on a broker node.") + throw new TerseFailure("You can only use --standalone on a controller.") } if (config.effectiveAdvertisedControllerListeners.isEmpty) { throw new RuntimeException("No controller listeners found.") @@ -191,13 +201,20 @@ object StorageTool extends Logging { help("The setting to use for a specific feature, in feature=level format. For example: `kraft.version=1`."). action(append()) val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup() - reconfigurableQuorumOptions.addArgument("--standalone", "-s"). - help("Used to initialize a single-node quorum controller quorum."). - action(storeTrue()) - reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I"). - help("The initial controllers, as a comma-separated list of id@hostname:port:directory. The same values must be used to format all nodes. For example:\n" + - "0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n"). - action(store()) + reconfigurableQuorumOptions.addArgument("--standalone", "-s") + .help("Used to initialize a controller as a single-node dynamic quorum.") + .action(storeTrue()) + + reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N") + .help("Used to initialize a server without a dynamic quorum topology.") + .action(storeTrue()) + + reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I") + .help("Used to initialize a server with a specific dynamic quorum topology. The argument " + + "is a comma-separated list of id@hostname:port:directory. The same values must be used to " + + "format all nodes. For example:\n0@example.com:8082:JEXY6aqzQY-32P5TStzaFg,1@example.com:8083:" + + "MvDxzVmcRsaTz33bUuRU6A,2@example.com:8084:07R5amHmR32VDA6jHkGbTA\n") + .action(store()) parser.parseArgs(args) } diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 7d8ac6860b85a..42ee07c50eac1 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -177,8 +177,9 @@ Found problem: defaultDynamicQuorumProperties.setProperty("process.roles", "controller") defaultDynamicQuorumProperties.setProperty("node.id", "0") defaultDynamicQuorumProperties.setProperty("controller.listener.names", "CONTROLLER") - defaultDynamicQuorumProperties.setProperty("controller.quorum.voters", "0@localhost:9093") - defaultDynamicQuorumProperties.setProperty("listeners", "CONTROLLER://127.0.0.1:9093") + defaultDynamicQuorumProperties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093") + defaultDynamicQuorumProperties.setProperty("listeners", "CONTROLLER://:9093") + defaultDynamicQuorumProperties.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093") defaultDynamicQuorumProperties.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true") defaultDynamicQuorumProperties.setProperty(ServerConfigs.UNSTABLE_FEATURE_VERSIONS_ENABLE_CONFIG , "true") @@ -378,7 +379,7 @@ Found problem: properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]("--release-version", "3.9-IV0", "--standalone") - assertEquals("You cannot use --standalone on a broker node.", + assertEquals("You can only use --standalone on a controller.", assertThrows(classOf[TerseFailure], () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage) } @@ -437,11 +438,61 @@ Found problem: "Failed to find content in output: " + stream.toString()) } + @ParameterizedTest + @ValueSource(strings = Array("controller", "broker,controller")) + def testFormatWithoutStaticQuorumFailsWithoutInitialControllersOnController(processRoles: String): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultDynamicQuorumProperties) + if (processRoles.contains("broker")) { + properties.setProperty("listeners", "PLAINTEXT://:9092,CONTROLLER://:9093") + properties.setProperty("advertised.listeners", "PLAINTEXT://127.0.0.1:9092,CONTROLLER://127.0.0.1:9093") + } + properties.setProperty("process.roles", processRoles) + properties.setProperty("log.dirs", availableDirs.mkString(",")) + assertEquals("Because controller.quorum.voters is not set on this controller, you must " + + "specify one of the following: --standalone, --initial-controllers, or " + + "--no-initial-controllers.", + assertThrows(classOf[TerseFailure], + () => runFormatCommand(new ByteArrayOutputStream(), properties, + Seq("--release-version", "3.9-IV0"))).getMessage) + } + @Test - def testBootstrapScramRecords(): Unit = { + def testFormatWithNoInitialControllersSucceedsOnController(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultDynamicQuorumProperties) + properties.setProperty("log.dirs", availableDirs.mkString(",")) + val stream = new ByteArrayOutputStream() + assertEquals(0, runFormatCommand(stream, properties, + Seq("--no-initial-controllers", "--release-version", "3.9-IV0"))) + assertTrue(stream.toString(). + contains("Formatting metadata directory %s".format(availableDirs.head)), + "Failed to find content in output: " + stream.toString()) + } + + @Test + def testFormatWithoutStaticQuorumSucceedsWithoutInitialControllersOnBroker(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() properties.putAll(defaultDynamicQuorumProperties) + properties.setProperty("listeners", "PLAINTEXT://:9092") + properties.setProperty("advertised.listeners", "PLAINTEXT://127.0.0.1:9092") + properties.setProperty("process.roles", "broker") + properties.setProperty("log.dirs", availableDirs.mkString(",")) + val stream = new ByteArrayOutputStream() + assertEquals(0, runFormatCommand(stream, properties, Seq("--release-version", "3.9-IV0"))) + assertTrue(stream.toString(). + contains("Formatting metadata directory %s".format(availableDirs.head)), + "Failed to find content in output: " + stream.toString()) + } + + @Test + def testBootstrapScramRecords(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultStaticQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]( @@ -468,7 +519,7 @@ Found problem: def testScramRecordsOldReleaseVersion(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() - properties.putAll(defaultDynamicQuorumProperties) + properties.putAll(defaultStaticQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]( diff --git a/docs/ops.html b/docs/ops.html index bdf3109e6a711..405137f7b7d16 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -3818,9 +3818,9 @@
In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the replica id, 3Db5QLSqSZieL3rJBUUegA is the replica directory id, controller-0 is the replica's host and 1234 is the replica's port.
Formatting Brokers and New Controllers
- When provisioning new broker and controller nodes that we want to add to an existing Kafka cluster, use the kafka-storage.sh format command without the --standalone or --initial-controllers flags. + When provisioning new broker and controller nodes that we want to add to an existing Kafka cluster, use the kafka-storage.sh format command with the --no-initial-controllers flag. -
$ bin/kafka-storage format --cluster-id <cluster-id> --config server.properties
+
$ bin/kafka-storage.sh format --cluster-id <cluster-id> --config server.properties --no-initial-controllers

Controller membership changes

diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java index 72995fb753e77..847285c744858 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java @@ -202,11 +202,15 @@ public Formatter setMetadataLogDirectory(String metadataLogDirectory) { return this; } - public Formatter setInitialVoters(DynamicVoters initialControllers) { + public Formatter setInitialControllers(DynamicVoters initialControllers) { this.initialControllers = Optional.of(initialControllers); return this; } + public Optional initialVoters() { + return initialControllers; + } + boolean hasDynamicQuorum() { if (initialControllers.isPresent()) { return true; diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index 6dc6aa7e66336..c5c32484ad5d3 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -372,7 +372,7 @@ public void testFormatWithInitialVoters(boolean specifyKRaftVersion) throws Exce formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); } formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setInitialVoters(DynamicVoters. + formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); formatter1.formatter.run(); assertEquals(Arrays.asList( @@ -403,7 +403,7 @@ public void testFormatWithInitialVotersFailsWithOlderKraftVersion() throws Excep FormatterContext formatter1 = testEnv.newFormatter(); formatter1.formatter.setFeatureLevel("kraft.version", (short) 0); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); - formatter1.formatter.setInitialVoters(DynamicVoters. + formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); assertTrue(formatter1.formatter.hasDynamicQuorum()); assertEquals("Cannot set kraft.version to 0 if KIP-853 configuration is present. " + @@ -433,7 +433,7 @@ public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex FormatterContext formatter1 = testEnv.newFormatter(); formatter1.formatter.setReleaseVersion(MetadataVersion.IBP_3_8_IV0); formatter1.formatter.setFeatureLevel("kraft.version", (short) 1); - formatter1.formatter.setInitialVoters(DynamicVoters. + formatter1.formatter.setInitialControllers(DynamicVoters. parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); formatter1.formatter.setUnstableFeatureVersionsEnabled(true); assertEquals("kraft.version could not be set to 1 because it depends on " + diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 51eab79db9671..a74667cc8300d 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -914,9 +914,12 @@ def start_node(self, node, timeout_sec=60, **kwargs): cmd = "%s format --ignore-formatted --config %s --cluster-id %s" % (kafka_storage_script, KafkaService.CONFIG_FILE, config_property.CLUSTER_ID) if self.dynamicRaftQuorum: cmd += " --feature kraft.version=1" - if not self.standalone_controller_bootstrapped and self.node_quorum_info.has_controller_role: - cmd += " --standalone" - self.standalone_controller_bootstrapped = True + if self.node_quorum_info.has_controller_role: + if self.standalone_controller_bootstrapped: + cmd += " --no-initial-controllers" + else: + cmd += " --standalone" + self.standalone_controller_bootstrapped = True self.logger.info("Running log directory format command...\n%s" % cmd) node.account.ssh(cmd)