-
Notifications
You must be signed in to change notification settings - Fork 14.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
KAFKA-17794: Add some formatting safeguards for KIP-853 (#17504)
KIP-853 adds support for dynamic KRaft quorums. This means that the quorum topology is no longer statically determined by the controller.quorum.voters configuration. Instead, it is contained in the storage directories of each controller and broker. Users of dynamic quorums must format at least one controller storage directory with either the --initial-controllers or --standalone flags. If they fail to do this, no quorum can be established. This PR changes the storage tool to warn about the case where a KIP-853 flag has not been supplied to format a KIP-853 controller. (Note that broker storage directories can continue to be formatted without a KIP-853 flag.) There are cases where we don't want to specify initial voters when formatting a controller. One example is where we format a single controller with --standalone, and then dynamically add 4 more controllers with no initial topology. In this case, we want the 4 later controllers to grab the quorum topology from the initial one. To support this case, this PR adds the --no-initial-controllers flag. Reviewers: José Armando García Sancio <[email protected]>, Federico Valeri <[email protected]>
- Loading branch information
Showing
6 changed files
with
101 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,6 @@ | |
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package kafka.tools | ||
|
||
import kafka.server.KafkaConfig | ||
|
@@ -31,7 +30,7 @@ import org.apache.kafka.common.utils.Utils | |
import org.apache.kafka.server.common.MetadataVersion | ||
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} | ||
import org.apache.kafka.metadata.storage.{Formatter, FormatterException} | ||
import org.apache.kafka.raft.DynamicVoters | ||
import org.apache.kafka.raft.{DynamicVoters, QuorumConfig} | ||
import org.apache.kafka.server.ProcessRole | ||
import org.apache.kafka.server.config.ReplicationConfigs | ||
|
||
|
@@ -126,9 +125,20 @@ object StorageTool extends Logging { | |
foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString))) | ||
} | ||
Option(namespace.getString("initial_controllers")). | ||
foreach(v => formatter.setInitialVoters(DynamicVoters.parse(v))) | ||
foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) | ||
if (namespace.getBoolean("standalone")) { | ||
formatter.setInitialVoters(createStandaloneDynamicVoters(config)) | ||
formatter.setInitialControllers(createStandaloneDynamicVoters(config)) | ||
} | ||
if (!namespace.getBoolean("no_initial_controllers")) { | ||
if (config.processRoles.contains(ProcessRole.ControllerRole)) { | ||
if (config.quorumVoters.isEmpty) { | ||
if (!formatter.initialVoters().isPresent()) { | ||
throw new TerseFailure("Because " + QuorumConfig.QUORUM_VOTERS_CONFIG + | ||
" is not set on this controller, you must specify one of the following: " + | ||
"--standalone, --initial-controllers, or --no-initial-controllers."); | ||
} | ||
} | ||
} | ||
} | ||
Option(namespace.getList("add_scram")). | ||
foreach(scramArgs => formatter.setScramArguments(scramArgs.asInstanceOf[util.List[String]])) | ||
|
@@ -140,7 +150,7 @@ object StorageTool extends Logging { | |
config: KafkaConfig | ||
): DynamicVoters = { | ||
if (!config.processRoles.contains(ProcessRole.ControllerRole)) { | ||
throw new TerseFailure("You cannot use --standalone on a broker node.") | ||
throw new TerseFailure("You can only use --standalone on a controller.") | ||
} | ||
if (config.effectiveAdvertisedControllerListeners.isEmpty) { | ||
throw new RuntimeException("No controller listeners found.") | ||
|
@@ -191,13 +201,20 @@ object StorageTool extends Logging { | |
help("The setting to use for a specific feature, in feature=level format. For example: `kraft.version=1`."). | ||
action(append()) | ||
val reconfigurableQuorumOptions = formatParser.addMutuallyExclusiveGroup() | ||
reconfigurableQuorumOptions.addArgument("--standalone", "-s"). | ||
help("Used to initialize a single-node quorum controller quorum."). | ||
action(storeTrue()) | ||
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I"). | ||
help("The initial controllers, as a comma-separated list of id@hostname:port:directory. The same values must be used to format all nodes. For example:\n" + | ||
"[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n"). | ||
action(store()) | ||
reconfigurableQuorumOptions.addArgument("--standalone", "-s") | ||
.help("Used to initialize a controller as a single-node dynamic quorum.") | ||
.action(storeTrue()) | ||
|
||
reconfigurableQuorumOptions.addArgument("--no-initial-controllers", "-N") | ||
.help("Used to initialize a server without a dynamic quorum topology.") | ||
.action(storeTrue()) | ||
|
||
reconfigurableQuorumOptions.addArgument("--initial-controllers", "-I") | ||
.help("Used to initialize a server with a specific dynamic quorum topology. The argument " + | ||
"is a comma-separated list of id@hostname:port:directory. The same values must be used to " + | ||
"format all nodes. For example:\n[email protected]:8082:JEXY6aqzQY-32P5TStzaFg,[email protected]:8083:" + | ||
"MvDxzVmcRsaTz33bUuRU6A,[email protected]:8084:07R5amHmR32VDA6jHkGbTA\n") | ||
.action(store()) | ||
parser.parseArgs(args) | ||
} | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters