Skip to content

Commit

Permalink
BE: Implement a mechanism to skip SSL verification (#422)
Browse files Browse the repository at this point in the history
  • Loading branch information
Haarolean authored Dec 30, 2024
1 parent 0f0e2a9 commit d093752
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 46 deletions.
39 changes: 25 additions & 14 deletions api/src/main/java/io/kafbat/ui/config/ClustersProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,31 @@ public class ClustersProperties {
public static class Cluster {
String name;
String bootstrapServers;

TruststoreConfig ssl;

String schemaRegistry;
SchemaRegistryAuth schemaRegistryAuth;
KeystoreConfig schemaRegistrySsl;

String ksqldbServer;
KsqldbServerAuth ksqldbServerAuth;
KeystoreConfig ksqldbServerSsl;

List<ConnectCluster> kafkaConnect;
MetricsConfigData metrics;
Map<String, Object> properties;
boolean readOnly = false;

List<SerdeConfig> serde;
String defaultKeySerde;
String defaultValueSerde;
List<Masking> masking;

MetricsConfigData metrics;
Map<String, Object> properties;
boolean readOnly = false;

Long pollingThrottleRate;
TruststoreConfig ssl;

List<Masking> masking;

AuditProperties audit;
}

Expand Down Expand Up @@ -99,6 +108,16 @@ public static class SchemaRegistryAuth {
public static class TruststoreConfig {
String truststoreLocation;
String truststorePassword;
boolean verifySsl = true;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"keystorePassword"})
public static class KeystoreConfig {
String keystoreLocation;
String keystorePassword;
}

@Data
Expand All @@ -118,15 +137,6 @@ public static class KsqldbServerAuth {
String password;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"keystorePassword"})
public static class KeystoreConfig {
String keystoreLocation;
String keystorePassword;
}

@Data
public static class Masking {
Type type;
Expand Down Expand Up @@ -182,6 +192,7 @@ private void flattenClusterProperties() {
}
}

@SuppressWarnings("unchecked")
private Map<String, Object> flattenClusterProperties(@Nullable String prefix,
@Nullable Map<String, Object> propertiesMap) {
Map<String, Object> flattened = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.util.SslPropertiesUtil;
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
import java.io.Closeable;
import java.time.Instant;
import java.util.Map;
Expand Down Expand Up @@ -42,7 +42,7 @@ public Mono<ReactiveAdminClient> get(KafkaCluster cluster) {
private Mono<ReactiveAdminClient> createAdminClient(KafkaCluster cluster) {
return Mono.fromSupplier(() -> {
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.kafbat.ui.model.SortOrderDTO;
import io.kafbat.ui.service.rbac.AccessControlService;
import io.kafbat.ui.util.ApplicationMetrics;
import io.kafbat.ui.util.SslPropertiesUtil;
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
Expand Down Expand Up @@ -264,7 +264,7 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster) {
public EnhancedConsumer createConsumer(KafkaCluster cluster,
Map<String, Object> properties) {
Properties props = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props);
props.putAll(cluster.getProperties());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafbat-ui-consumer-" + System.currentTimeMillis());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
Expand Down
4 changes: 2 additions & 2 deletions api/src/main/java/io/kafbat/ui/service/MessagesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.kafbat.ui.model.TopicMessageEventDTO;
import io.kafbat.ui.serdes.ConsumerRecordDeserializer;
import io.kafbat.ui.serdes.ProducerRecordCreator;
import io.kafbat.ui.util.SslPropertiesUtil;
import io.kafbat.ui.util.KafkaClientSslPropertiesUtil;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -199,7 +199,7 @@ private Mono<RecordMetadata> sendMessageImpl(KafkaCluster cluster,
public static KafkaProducer<byte[], byte[]> createProducer(KafkaCluster cluster,
Map<String, Object> additionalProps) {
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties);
properties.putAll(cluster.getProperties());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ private Flux<KsqlResponseTable> executeSelect(String ksql, Map<String, String> s
* Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like <p/>
* <code>[{"header":{"queryId":"...","schema":"..."}}, ]</code>
* which will cause json parsing error and will be propagated to UI.
* This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed.
* To workaround this we need to check DecodingException err msg.
* This is a known issue(<a href="https://github.com/confluentinc/ksql/issues/8746">...</a>), but we don't know when it will be fixed.
* To work around this we need to check DecodingException err msg.
*/
private boolean isUnexpectedJsonArrayEndCharException(Throwable th) {
return th instanceof DecodingException
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.kafbat.ui.util;

import io.kafbat.ui.config.ClustersProperties;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.kafka.common.config.SslConfigs;

public final class KafkaClientSslPropertiesUtil {

private KafkaClientSslPropertiesUtil() {
}

public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
Properties sink) {
if (truststoreConfig == null) {
return;
}

if (!truststoreConfig.isVerifySsl()) {
sink.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
}

if (truststoreConfig.getTruststoreLocation() == null) {
return;
}

sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation());

if (truststoreConfig.getTruststorePassword() != null) {
sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static Mono<ApplicationPropertyValidationDTO> validateClusterConnection(S
@Nullable
TruststoreConfig ssl) {
Properties properties = new Properties();
SslPropertiesUtil.addKafkaSslProperties(ssl, properties);
KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties);
properties.putAll(clusterProps);
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// editing properties to make validation faster
Expand Down
23 changes: 0 additions & 23 deletions api/src/main/java/io/kafbat/ui/util/SslPropertiesUtil.java

This file was deleted.

16 changes: 16 additions & 0 deletions api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kafbat.ui.exception.ValidationException;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.FileInputStream;
import java.security.KeyStore;
import java.util.function.Consumer;
Expand Down Expand Up @@ -45,6 +46,10 @@ private static ObjectMapper defaultOM() {

public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {
return configureNoSsl();
}

return configureSsl(
keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null,
keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null,
Expand Down Expand Up @@ -97,6 +102,17 @@ private WebClientConfigurator configureSsl(
return this;
}

@SneakyThrows
public WebClientConfigurator configureNoSsl() {
var contextBuilder = SslContextBuilder.forClient();
contextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);

SslContext context = contextBuilder.build();

httpClient = httpClient.secure(t -> t.sslContext(context));
return this;
}

public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) {
if (username != null && password != null) {
builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password));
Expand Down
4 changes: 4 additions & 0 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4242,6 +4242,10 @@ components:
type: string
truststorePassword:
type: string
verifySsl:
type: boolean
description: Skip SSL verification for the host.
default: true
schemaRegistry:
type: string
schemaRegistryAuth:
Expand Down

0 comments on commit d093752

Please sign in to comment.