diff --git a/README.md b/README.md
index 3fef5a5b6..4a7268159 100644
--- a/README.md
+++ b/README.md
@@ -113,6 +113,7 @@ file example can be found here :[application.example.yml](application.example.yml)
 ### KafkaHQ configuration
 
 #### Topic List
+* `kafkahq.topic.page-size` number of topics per page (default: 25)
 * `kafkahq.topic.default-view` is default list view (ALL, HIDE_INTERNAL, HIDE_INTERNAL_STREAM, HIDE_STREAM)
 * `kafkahq.topic.internal-regexps` is list of regexp to be considered as internal (internal topic can't be deleted or updated)
 * `kafkahq.topic.stream-regexps` is list of regexp to be considered as internal stream topic
@@ -124,6 +125,10 @@ file example can be found here :[application.example.yml](application.example.yml)
 * `kafkahq.topic-data.poll-timeout`: The time, in milliseconds, spent waiting in poll if data is not available in the buffer (default: 1000).
 
+
+#### Schema List
+* `kafkahq.schema.page-size` number of schemas per page (default: 25)
+
 ### Security
 
 * `kafkahq.security.default-roles`: Roles available for all the user even unlogged user, roles available are :
@@ -161,13 +166,17 @@ kafkahq:
           - connect/read
 ```
-
 #### Basic Auth
 * `kafkahq.security.basic-auth`: List user & password with affected roles
   * `actual-username`: login of the current user as a yaml key (may be anything email, login, ...)
     * `password`: Password in sha256, can be converted with command `echo -n "password" | sha256sum`
     * `roles`: Role for current users
 
+> Take care that basic auth uses a session store in server **memory**. If your instance is behind a reverse proxy or a
+> load balancer, you will need to forward the session cookie named `SESSION` and / or use
+> [session stickiness](https://en.wikipedia.org/wiki/Load_balancing_(computing)#Persistence)
+
+
 ### Server
 * `kafkahq.server.base-path`: if behind a reverse proxy, path to kafkahq with trailing slash (optional). Example: kafkahq is behind a reverse proxy with url , set base-path: "/kafkahq/". Not needed if you're
@@ -188,6 +197,17 @@ KafkaHQ docker image supports 3 environment variables to handle configuration :
 * `MICRONAUT_APPLICATION_JSON`: a string that contains the full configuration in JSON format
 * `MICRONAUT_CONFIG_FILES`: a path to a configuration file on container. Default path is `/app/application.yml`
 
+## Monitoring endpoints
+Several monitoring endpoints are enabled by default. You can disable them or restrict access to authenticated users only
+using the Micronaut configuration below.
+
+* `/info` [Info Endpoint](https://docs.micronaut.io/snapshot/guide/index.html#infoEndpoint) with git status
+  information.
+* `/health` [Health Endpoint](https://docs.micronaut.io/snapshot/guide/index.html#healthEndpoint)
+* `/loggers` [Loggers Endpoint](https://docs.micronaut.io/snapshot/guide/index.html#loggersEndpoint)
+* `/metrics` [Metrics Endpoint](https://docs.micronaut.io/snapshot/guide/index.html#metricsEndpoint)
+* `/prometheus` [Prometheus Endpoint](https://micronaut-projects.github.io/micronaut-micrometer/latest/guide/)
+
 ## Development Environment
 
 A docker-compose is provided to start a development environment. Just install docker & docker-compose, clone the repository
 and issue a simple `docker-compose -f docker-compose-dev.yml up` to start a dev server.
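The `page-size` keys documented above are plain Micronaut configuration properties, read through `@Value` with the default applied when the key is absent (the controllers later in this patch do exactly this). A minimal sketch of that binding, with the class name invented for illustration:

```java
import io.micronaut.context.annotation.Value;

import javax.inject.Singleton;

@Singleton
public class PageSizeHolder {
    // Falls back to 25 when kafkahq.topic.page-size is not configured,
    // matching the default documented in the README above.
    @Value("${kafkahq.topic.page-size:25}")
    private Integer pageSize;

    public int getPageSize() {
        return pageSize;
    }
}
```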
diff --git a/application.example.yml b/application.example.yml
index 4b529308f..e115d566c 100644
--- a/application.example.yml
+++ b/application.example.yml
@@ -59,6 +59,7 @@ kafkahq:
 
   # Topic list display options (optional)
   topic:
+    page-size: 25 # number of topics per page (default: 25)
     default-view: HIDE_INTERNAL # default list view (ALL, HIDE_INTERNAL, HIDE_INTERNAL_STREAM, HIDE_STREAM)
     internal-regexps: # list of regexp to be considered as internal (internal topic can't be deleted or updated)
       - "^_.*$"
@@ -77,6 +78,10 @@ kafkahq:
       size: 50 # max record per page (default: 50)
       poll-timeout: 1000 # The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
 
+  # Schema list display options (optional)
+  schema:
+    page-size: 25 # number of schemas per page (default: 25)
+
   # Auth & Roles (optional)
   security:
     default-roles: # Roles available for all the user even unlogged user
diff --git a/assets/modules/datas/filter.scss b/assets/modules/datas/filter.scss
index fd8c4ef06..6d0a6e4f3 100644
--- a/assets/modules/datas/filter.scss
+++ b/assets/modules/datas/filter.scss
@@ -12,7 +12,7 @@
 
     @include media-breakpoint-up(sm) {
         select {
-            max-width: 200px;
+            max-width: 170px;
         }
     }
 
diff --git a/assets/modules/templates/layout.scss b/assets/modules/templates/layout.scss
index 1cc3bd54e..1d68e6833 100644
--- a/assets/modules/templates/layout.scss
+++ b/assets/modules/templates/layout.scss
@@ -81,7 +81,7 @@ h3.logo {
         margin-left: $menu-width;
     }
 
-    @include media-breakpoint-down(md) {
+    @include media-breakpoint-down(sm) {
         width: 100%;
     }
 
diff --git a/build.gradle b/build.gradle
index c8bf45734..d1cfcd438 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,6 +1,6 @@
 buildscript {
     ext {
-        micronautVersion = "1.1.1"
+        micronautVersion = "1.1.3"
         confluentVersion = "5.2.+"
         kafkaVersion = "2.2.+"
     }
@@ -13,7 +13,7 @@ buildscript {
 
     dependencies {
         // kafkahq
-        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.16.0"
+        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.17.0"
     }
 }
 
@@ -25,11 +25,12 @@ plugins {
     id "java"
     id "net.ltgt.apt-eclipse" version "0.21"
     id "net.ltgt.apt-idea" version "0.21"
+    id "com.gorylenko.gradle-git-properties" version "2.0.0"
 
     // kafkahq
     id "com.moowork.node" version "1.3.1"
-    id 'io.franzbecker.gradle-lombok' version '3.0.0'
-    id 'com.adarshr.test-logger' version '1.6.0'
+    id 'io.franzbecker.gradle-lombok' version '3.1.0'
+    id 'com.adarshr.test-logger' version '1.7.0'
     id 'com.github.psxpaul.execfork' version '0.1.10'
     id "com.github.ben-manes.versions" version "0.21.0"
 }
@@ -95,7 +96,7 @@ tasks.withType(JavaCompile){
 }
 
 gradle.taskGraph.whenReady { graph ->
-    if (graph.hasTask(run)) {
+    if (graph.hasTask(run) || graph.hasTask(testClasses)) {
         webpack.enabled = false
         npmInstall.enabled = false
     }
@@ -108,7 +109,7 @@ dependencies {
     // micronaut
     annotationProcessor "io.micronaut:micronaut-inject-java"
     annotationProcessor "io.micronaut:micronaut-validation"
-    annotationProcessor "io.micronaut:micronaut-security:" + micronautVersion
+    annotationProcessor "io.micronaut:micronaut-security"
     compile "io.micronaut:micronaut-inject"
     compile "io.micronaut:micronaut-validation"
     compile 'io.micronaut:micronaut-views'
@@ -117,14 +118,17 @@ dependencies {
     compile "io.micronaut:micronaut-http-server-netty"
     runtime "ch.qos.logback:logback-classic:1.2.3"
     runtime 'org.freemarker:freemarker:2.3.28'
-    compile "io.micronaut:micronaut-security:" + micronautVersion
-    compile "io.micronaut:micronaut-security-session:" + micronautVersion
"io.micronaut:micronaut-security" + compile "io.micronaut:micronaut-security-session" + compile "io.micronaut:micronaut-management" + compile 'io.micronaut.configuration:micronaut-micrometer-registry-prometheus' + compile 'io.github.mweirauch:micrometer-jvm-extras:0.1.3' // kafka compile group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion compile group: "io.confluent", name: "kafka-schema-registry-client", version: confluentVersion compile group: "io.confluent", name: "kafka-avro-serializer", version: confluentVersion - compile group: 'org.apache.avro', name: 'avro', version: '1.8.2' + compile group: 'org.apache.avro', name: 'avro', version: '1.9.0' compile group: 'org.sourcelab', name: 'kafka-connect-client', version: '2.0.+' // log diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index cb63b7bae..2176bf6ea 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -1,8 +1,6 @@ version: '3.6' volumes: - gradle-cache: - driver: local zookeeper-data: driver: local zookeeper-log: @@ -17,7 +15,6 @@ services: working_dir: /app volumes: - ./:/app - - gradle-cache:/home/gradle/.gradle ports: - 127.11.8.17:8080:8080 depends_on: diff --git a/docker/app/kafkahq b/docker/app/kafkahq index 3d0c88600..f6e0e9b1e 100755 --- a/docker/app/kafkahq +++ b/docker/app/kafkahq @@ -7,4 +7,4 @@ do JAVA_OPTS="${JAVA_OPTS} ${JVM_OPT}" done -/usr/bin/java ${JAVA_OPTS} -jar /app/kafkahq.jar \ No newline at end of file +/usr/local/openjdk-11/bin/java ${JAVA_OPTS} -jar /app/kafkahq.jar \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ea13fdfd1..f4d7b2bf6 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.3.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-5.4.1-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/package-lock.json b/package-lock.json index c7d8b908d..e886fc024 100644 --- a/package-lock.json +++ b/package-lock.json @@ -899,7 +899,7 @@ }, "chalk": { "version": "1.1.3", - "resolved": "http://registry.npmjs.org/chalk/-/chalk-1.1.3.tgz", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-1.1.3.tgz", "integrity": "sha1-qBFcVeSnAv5NFQq9OHKCKn4J/Jg=", "dev": true, "requires": { @@ -2708,7 +2708,8 @@ }, "tar": { "version": "4.4.8", - "bundled": true, + "resolved": "https://registry.npmjs.org/tar/-/tar-4.4.8.tgz", + "integrity": "sha512-LzHF64s5chPQQS0IYBn9IN5h3i98c12bo4NCO7e0sGM2llXQ3p2FGC5sdENN4cTW48O915Sh+x+EXx7XW96xYQ==", "dev": true, "optional": true, "requires": { @@ -2751,9 +2752,9 @@ } }, "fstream": { - "version": "1.0.11", - "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.11.tgz", - "integrity": "sha1-XB+x8RdHcRTwYyoOtLcbPLD9MXE=", + "version": "1.0.12", + "resolved": "https://registry.npmjs.org/fstream/-/fstream-1.0.12.tgz", + "integrity": "sha512-WvJ193OHa0GHPEL+AycEJgxvBEwyfRkN1vhjca23OaPVMCaLCXTd5qAu82AjTcgP1UJmytkOKb63Ypde7raDIg==", "dev": true, "requires": { "graceful-fs": "^4.1.2", @@ -3578,7 +3579,7 @@ }, "json5": { "version": "0.5.1", - "resolved": "http://registry.npmjs.org/json5/-/json5-0.5.1.tgz", + "resolved": "https://registry.npmjs.org/json5/-/json5-0.5.1.tgz", "integrity": "sha1-Hq3nrMASA0rYTiOWdn6tn6VJWCE=", "dev": true }, @@ -4357,7 +4358,7 @@ }, "os-locale": { "version": "1.4.0", - "resolved": 
"http://registry.npmjs.org/os-locale/-/os-locale-1.4.0.tgz", + "resolved": "https://registry.npmjs.org/os-locale/-/os-locale-1.4.0.tgz", "integrity": "sha1-IPnxeuKe00XoveWDsT0gCYA8FNk=", "dev": true, "requires": { @@ -4939,7 +4940,7 @@ }, "readable-stream": { "version": "2.3.6", - "resolved": "http://registry.npmjs.org/readable-stream/-/readable-stream-2.3.6.tgz", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.3.6.tgz", "integrity": "sha512-tQtKA9WIAhBF3+VLAseyMqZeBjW0AHJoxOtYqSUZNJxauErmLbVm2FW1y+J/YA9dUrAC39ITejlZWhVIwawkKw==", "dev": true, "requires": { @@ -5831,7 +5832,7 @@ }, "strip-ansi": { "version": "3.0.1", - "resolved": "http://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz", + "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz", "integrity": "sha1-ajhfuIU9lS1f8F0Oiq+UJ43GPc8=", "dev": true, "requires": { @@ -5890,13 +5891,13 @@ "dev": true }, "tar": { - "version": "2.2.1", - "resolved": "https://registry.npmjs.org/tar/-/tar-2.2.1.tgz", - "integrity": "sha1-jk0qJWwOIYXGsYrWlK7JaLg8sdE=", + "version": "2.2.2", + "resolved": "https://registry.npmjs.org/tar/-/tar-2.2.2.tgz", + "integrity": "sha512-FCEhQ/4rE1zYv9rYXJw/msRqsnmlje5jHP6huWeBZ704jUTy02c5AZyWujpMR1ax6mVw9NyJMfuK2CMDWVIfgA==", "dev": true, "requires": { "block-stream": "*", - "fstream": "^1.0.2", + "fstream": "^1.0.12", "inherits": "2" } }, diff --git a/src/main/java/org/kafkahq/controllers/RedirectController.java b/src/main/java/org/kafkahq/controllers/RedirectController.java index e23d74cf8..d176dde38 100644 --- a/src/main/java/org/kafkahq/controllers/RedirectController.java +++ b/src/main/java/org/kafkahq/controllers/RedirectController.java @@ -25,7 +25,7 @@ public HttpResponse slash() throws URISyntaxException { return HttpResponse.redirect(this.uri("/" + kafkaModule.getClustersList().get(0) + "/topic")); } - @Get("${kafkahq.server.base-path:}") + @Get("${kafkahq.server.base-path:}^") public HttpResponse home() throws URISyntaxException { return HttpResponse.redirect(this.uri("/" + kafkaModule.getClustersList().get(0) + "/topic")); } diff --git a/src/main/java/org/kafkahq/controllers/SchemaController.java b/src/main/java/org/kafkahq/controllers/SchemaController.java index 75e8bd862..aad318123 100644 --- a/src/main/java/org/kafkahq/controllers/SchemaController.java +++ b/src/main/java/org/kafkahq/controllers/SchemaController.java @@ -1,26 +1,38 @@ package org.kafkahq.controllers; +import com.google.common.collect.ImmutableMap; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.micronaut.context.annotation.Value; import io.micronaut.http.*; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Get; import io.micronaut.http.annotation.Post; import io.micronaut.security.annotation.Secured; import io.micronaut.views.View; +import org.codehaus.httpcache4j.uri.URIBuilder; import org.kafkahq.configs.Role; import org.kafkahq.models.Schema; import org.kafkahq.modules.RequestHelper; import org.kafkahq.repositories.SchemaRegistryRepository; +import org.kafkahq.utils.CompletablePaged; import javax.inject.Inject; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; @Secured(Role.ROLE_REGISTRY_READ) @Controller("${kafkahq.server.base-path:}/{cluster}/schema") public class SchemaController extends AbstractController { 
     private SchemaRegistryRepository schemaRepository;
 
+    @Value("${kafkahq.schema.page-size:25}")
+    private Integer pageSize;
+
     @Inject
     public SchemaController(SchemaRegistryRepository schemaRepository) {
         this.schemaRepository = schemaRepository;
@@ -28,14 +40,54 @@ public SchemaController(SchemaRegistryRepository schemaRepository) {
 
     @View("schemaList")
     @Get
-    public HttpResponse list(HttpRequest request, String cluster) throws IOException, RestClientException {
+    public HttpResponse list(
+        HttpRequest request,
+        String cluster,
+        Optional<String> search,
+        Optional<Integer> page
+    ) throws IOException, RestClientException, ExecutionException, InterruptedException {
+        List<CompletableFuture<Schema>> list = this.schemaRepository.getAll(cluster, search);
+
+        URIBuilder uri = URIBuilder.fromURI(request.getUri());
+        CompletablePaged<Schema> paged = new CompletablePaged<>(
+            list,
+            this.pageSize,
+            uri,
+            page.orElse(1)
+        );
+
         return this.template(
             request,
             cluster,
-            "schemas", this.schemaRepository.getAll(cluster)
+            "schemas", paged.complete(),
+            "search", search,
+            "pagination", ImmutableMap.builder()
+                .put("size", paged.size())
+                .put("before", paged.before().toNormalizedURI(false).toString())
+                .put("after", paged.after().toNormalizedURI(false).toString())
+                .build()
         );
     }
 
+    @Get("id/{id}")
+    public HttpResponse redirectId(HttpRequest request, String cluster, Integer id) throws IOException, RestClientException, URISyntaxException, ExecutionException, InterruptedException {
+        Schema find = this.schemaRepository.getById(cluster, id);
+
+        if (find != null) {
+            return HttpResponse.redirect(this.uri("/" + cluster + "/schema/" + find.getSubject() + "/version#" + id));
+        } else {
+            MutableHttpResponse response = HttpResponse.redirect(this.uri("/" + cluster + "/schema"));
+
+            this.toast(response, AbstractController.Toast.builder()
+                .message("Unable to find avro schema for id '" + id + "'")
+                .type(AbstractController.Toast.Type.error)
+                .build()
+            );
+
+            return response;
+        }
+    }
+
     @Secured(Role.ROLE_REGISTRY_INSERT)
     @View("schemaCreate")
     @Get("create")
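In the new `list()` signature above, `search` and `page` are bound from query parameters; Micronaut maps a missing parameter to `Optional.empty()` rather than `null`, so `page.orElse(1)` is all that is needed to default to the first page. A standalone sketch of the same pattern, with the controller and route invented for illustration:

```java
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;

import java.util.Optional;

@Controller("/paged")
public class PagedExampleController {
    // GET /paged?page=3 -> "page 3"; GET /paged -> "page 1"
    @Get
    public String index(Optional<Integer> page) {
        return "page " + page.orElse(1);
    }
}
```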
diff --git a/src/main/java/org/kafkahq/controllers/TopicController.java b/src/main/java/org/kafkahq/controllers/TopicController.java
index c939e9214..d4b8010dd 100644
--- a/src/main/java/org/kafkahq/controllers/TopicController.java
+++ b/src/main/java/org/kafkahq/controllers/TopicController.java
@@ -27,6 +27,7 @@
 import org.kafkahq.repositories.ConfigRepository;
 import org.kafkahq.repositories.RecordRepository;
 import org.kafkahq.repositories.TopicRepository;
+import org.kafkahq.utils.CompletablePaged;
 import org.reactivestreams.Publisher;
 
 import javax.inject.Inject;
@@ -34,6 +35,7 @@
 import java.io.StringWriter;
 import java.time.Instant;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
@@ -47,6 +49,8 @@ public class TopicController extends AbstractController {
     private Environment environment;
     @Value("${kafkahq.topic.default-view}")
     private String defaultView;
+    @Value("${kafkahq.topic.page-size:25}")
+    private Integer pageSize;
 
     @Inject
     public TopicController(TopicRepository topicRepository,
@@ -64,15 +68,37 @@ public TopicController(TopicRepository topicRepository,
 
     @View("topicList")
     @Get
-    public HttpResponse list(HttpRequest request, String cluster, Optional<String> search, Optional<TopicRepository.TopicListView> show) throws ExecutionException, InterruptedException {
+    public HttpResponse list(
+        HttpRequest request, String cluster,
+        Optional<String> search,
+        Optional<TopicRepository.TopicListView> show,
+        Optional<Integer> page
+    ) throws ExecutionException, InterruptedException {
         TopicRepository.TopicListView topicListView = show.orElse(TopicRepository.TopicListView.valueOf(defaultView));
+        List<CompletableFuture<Topic>> list = this.topicRepository.list(
+            show.orElse(TopicRepository.TopicListView.valueOf(defaultView)),
+            search
+        );
+
+        URIBuilder uri = URIBuilder.fromURI(request.getUri());
+        CompletablePaged<Topic> paged = new CompletablePaged<>(
+            list,
+            this.pageSize,
+            uri,
+            page.orElse(1)
+        );
 
         return this.template(
             request,
             cluster,
             "search", search,
             "topicListView", topicListView,
-            "topics", this.topicRepository.list(show.orElse(TopicRepository.TopicListView.valueOf(defaultView)), search)
+            "topics", paged.complete(),
+            "pagination", ImmutableMap.builder()
+                .put("size", paged.size())
+                .put("before", paged.before().toNormalizedURI(false).toString())
+                .put("after", paged.after().toNormalizedURI(false).toString())
+                .build()
         );
     }
 
@@ -232,8 +258,8 @@ public HttpResponse home(HttpRequest request,
 
     private ImmutableMap dataPagination(Topic topic, RecordRepository.Options options, List<Record> data, URIBuilder uri) {
         return ImmutableMap.builder()
-            .put("size", options.getPartition() == null ? topic.getSize() : topic.getSize(options.getPartition()))
-            .put("before", options.before(data, uri).toNormalizedURI(false).toString())
+            .put("size", "≈ " + (options.getPartition() == null ? topic.getSize() : topic.getSize(options.getPartition())))
+            // .put("before", options.before(data, uri).toNormalizedURI(false).toString())
             .put("after", options.after(data, uri).toNormalizedURI(false).toString())
             .build();
     }
diff --git a/src/main/java/org/kafkahq/metrics/JvmExtrasMetrics.java b/src/main/java/org/kafkahq/metrics/JvmExtrasMetrics.java
new file mode 100644
index 000000000..40a86f2c9
--- /dev/null
+++ b/src/main/java/org/kafkahq/metrics/JvmExtrasMetrics.java
@@ -0,0 +1,33 @@
+package org.kafkahq.metrics;
+
+import io.github.mweirauch.micrometer.jvm.extras.ProcessMemoryMetrics;
+import io.github.mweirauch.micrometer.jvm.extras.ProcessThreadMetrics;
+import io.micronaut.configuration.metrics.annotation.RequiresMetrics;
+import io.micronaut.context.annotation.Bean;
+import io.micronaut.context.annotation.Factory;
+import io.micronaut.context.annotation.Primary;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.core.util.StringUtils;
+
+import javax.inject.Singleton;
+
+import static io.micronaut.configuration.metrics.micrometer.MeterRegistryFactory.MICRONAUT_METRICS_BINDERS;
+
+@Factory
+@RequiresMetrics
+@Requires(property = MICRONAUT_METRICS_BINDERS + ".jvm-extras.enabled", value = StringUtils.TRUE, defaultValue = StringUtils.TRUE)
+public class JvmExtrasMetrics {
+    @Bean
+    @Primary
+    @Singleton
+    public ProcessMemoryMetrics processMemoryMetrics() {
+        return new ProcessMemoryMetrics();
+    }
+
+    @Bean
+    @Primary
+    @Singleton
+    public ProcessThreadMetrics processThreadMetrics() {
+        return new ProcessThreadMetrics();
+    }
+}
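`JvmExtrasMetrics` above and `KafkaMetrics` below both guard their beans with `@Requires(property = ..., defaultValue = StringUtils.TRUE)`, so each binder is on by default but can be disabled individually. A small sketch of flipping that switch programmatically, assuming `MICRONAUT_METRICS_BINDERS` resolves to the `micronaut.metrics.binders` prefix:

```java
import io.micronaut.context.ApplicationContext;

import java.util.Map;

public class BinderToggleExample {
    public static void main(String[] args) {
        // With the property forced to false the @Requires condition fails
        // and the corresponding binder bean is never created.
        try (ApplicationContext ctx = ApplicationContext.run(
                Map.of("micronaut.metrics.binders.jvm-extras.enabled", "false"))) {
            System.out.println(ctx.isRunning());
        }
    }
}
```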
diff --git a/src/main/java/org/kafkahq/metrics/KafkaMetrics.java b/src/main/java/org/kafkahq/metrics/KafkaMetrics.java
new file mode 100644
index 000000000..81a6e80c0
--- /dev/null
+++ b/src/main/java/org/kafkahq/metrics/KafkaMetrics.java
@@ -0,0 +1,25 @@
+package org.kafkahq.metrics;
+
+import io.micrometer.core.instrument.binder.kafka.KafkaConsumerMetrics;
+import io.micronaut.configuration.metrics.annotation.RequiresMetrics;
+import io.micronaut.context.annotation.Bean;
+import io.micronaut.context.annotation.Factory;
+import io.micronaut.context.annotation.Primary;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.core.util.StringUtils;
+
+import javax.inject.Singleton;
+
+import static io.micronaut.configuration.metrics.micrometer.MeterRegistryFactory.MICRONAUT_METRICS_BINDERS;
+
+@Factory
+@RequiresMetrics
+@Requires(property = MICRONAUT_METRICS_BINDERS + ".kafka.enabled", value = StringUtils.TRUE, defaultValue = StringUtils.TRUE)
+public class KafkaMetrics {
+    @Bean
+    @Primary
+    @Singleton
+    public KafkaConsumerMetrics processKafkaConsumerMetrics() {
+        return new KafkaConsumerMetrics();
+    }
+}
diff --git a/src/main/java/org/kafkahq/models/Config.java b/src/main/java/org/kafkahq/models/Config.java
index 04175047d..4ebd9b857 100644
--- a/src/main/java/org/kafkahq/models/Config.java
+++ b/src/main/java/org/kafkahq/models/Config.java
@@ -12,6 +12,7 @@
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -61,8 +62,8 @@ private String findDescription(String name) {
         for(Class cls : classes) {
             try {
                 Field declaredField = cls.getDeclaredField(docName);
-                return declaredField.get(cls.newInstance()).toString();
-            } catch (NoSuchFieldException | IllegalAccessException | InstantiationException e) { }
+                return declaredField.get(cls.getDeclaredConstructor().newInstance()).toString();
+            } catch (NoSuchFieldException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) { }
         }
 
         return null;
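The `Config.findDescription` hunk above swaps `Class.newInstance()`, deprecated since Java 9, for `getDeclaredConstructor().newInstance()`; the replacement wraps constructor failures in `InvocationTargetException` instead of silently rethrowing checked exceptions, which is why the catch list grows by two types. A minimal standalone sketch, with the `Target` class invented for illustration:

```java
import java.lang.reflect.InvocationTargetException;

public class NewInstanceExample {
    public static class Target { }

    public static void main(String[] args)
            throws NoSuchMethodException, InstantiationException,
                   IllegalAccessException, InvocationTargetException {
        // Deprecated since Java 9, and able to propagate checked exceptions
        // from the constructor without declaring them:
        // Target t = Target.class.newInstance();

        // Preferred form: constructor failures surface as InvocationTargetException.
        Target t = Target.class.getDeclaredConstructor().newInstance();
        System.out.println(t.getClass().getSimpleName());
    }
}
```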
diff --git a/src/main/java/org/kafkahq/models/Record.java b/src/main/java/org/kafkahq/models/Record.java
index a6cfcf4f3..7b1638550 100644
--- a/src/main/java/org/kafkahq/models/Record.java
+++ b/src/main/java/org/kafkahq/models/Record.java
@@ -24,7 +24,9 @@ public class Record {
     private final long timestamp;
     private final TimestampType timestampType;
     private final byte[] key;
+    private final Integer keySchemaId;
     private final byte[] value;
+    private final Integer valueSchemaId;
     private final Map<String, String> headers = new HashMap<>();
     private final KafkaAvroDeserializer kafkaAvroDeserializer;
 
@@ -35,7 +37,9 @@ public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer) {
         this.timestamp = record.timestamp();
         this.timestampType = record.timestampType();
         this.key = record.key();
+        this.keySchemaId = getAvroSchemaId(this.key);
         this.value = record.value();
+        this.valueSchemaId = getAvroSchemaId(this.value);
 
         for (Header header: record.headers()) {
             this.headers.put(header.key(), header.value() != null ? new String(header.value()) : null);
         }
@@ -44,21 +48,25 @@ public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer) {
     }
 
     public String getKeyAsString() {
-        return convertToString(key);
+        return convertToString(key, keySchemaId);
     }
 
     public String getKeyAsBase64() {
-        return new String(Base64.getEncoder().encode(value));
+        if (key == null) {
+            return null;
+        } else {
+            return new String(Base64.getEncoder().encode(key));
+        }
     }
 
     public String getValueAsString() {
-        return convertToString(value);
+        return convertToString(value, valueSchemaId);
     }
 
-    private String convertToString(byte[] payload) {
+    private String convertToString(byte[] payload, Integer schemaId) {
         if (payload == null) {
             return null;
-        } else if (isAvroPayload(payload)) {
+        } else if (schemaId != null) {
             try {
                 GenericRecord deserialize = (GenericRecord) kafkaAvroDeserializer.deserialize(topic, payload);
                 return deserialize.toString();
@@ -70,19 +78,18 @@ private String convertToString(byte[] payload) {
         }
     }
 
-    private static boolean isAvroPayload(byte[] payload) {
-        boolean convert = false;
+    private static Integer getAvroSchemaId(byte[] payload) {
         try {
             ByteBuffer buffer = ByteBuffer.wrap(payload);
             byte magicBytes = buffer.get();
             int schemaId = buffer.getInt();
 
             if (magicBytes == 0 && schemaId >= 0) {
-                convert = true;
+                return schemaId;
             }
         } catch (Exception ignore) { }
 
-        return convert;
+        return null;
     }
 }
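`getAvroSchemaId` above relies on the Confluent wire format: Avro payloads produced through the schema registry serializer start with a magic byte `0` followed by a 4-byte big-endian schema id, then the Avro body. A standalone sketch of the same check, with an assumed sample payload:

```java
import java.nio.ByteBuffer;

public class WireFormatExample {
    // Returns the schema id, or null when the payload is not
    // Confluent-framed Avro (too short, or wrong magic byte).
    static Integer schemaIdOf(byte[] payload) {
        if (payload == null || payload.length < 5) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        byte magic = buffer.get();
        int schemaId = buffer.getInt();
        return magic == 0 && schemaId >= 0 ? schemaId : null;
    }

    public static void main(String[] args) {
        byte[] framed = {0, 0, 0, 0, 42}; // magic 0, schema id 42, empty body
        System.out.println(schemaIdOf(framed));            // 42
        System.out.println(schemaIdOf(new byte[]{1, 2}));  // null
    }
}
```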
diff --git a/src/main/java/org/kafkahq/modules/KafkaWrapper.java b/src/main/java/org/kafkahq/modules/KafkaWrapper.java
index 6bfe8da0a..983b37166 100644
--- a/src/main/java/org/kafkahq/modules/KafkaWrapper.java
+++ b/src/main/java/org/kafkahq/modules/KafkaWrapper.java
@@ -11,6 +11,7 @@
 import org.kafkahq.models.Partition;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
@@ -57,7 +58,7 @@ public Collection<TopicListing> listTopics() throws ExecutionException, InterruptedException {
         return this.listTopics;
     }
 
-    private Map<String, TopicDescription> describeTopics = new HashMap<>();
+    private Map<String, TopicDescription> describeTopics = new ConcurrentHashMap<>();
 
     public Map<String, TopicDescription> describeTopics(List<String> topics) throws ExecutionException, InterruptedException {
         List<String> list = new ArrayList<>(topics);
@@ -83,7 +84,7 @@ public Map<String, TopicDescription> describeTopics(List<String> topics) throws ExecutionException, InterruptedException {
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
 
-    private Map<String, List<Partition.Offsets>> describeTopicsOffsets = new HashMap<>();
+    private Map<String, List<Partition.Offsets>> describeTopicsOffsets = new ConcurrentHashMap<>();
 
     public Map<String, List<Partition.Offsets>> describeTopicsOffsets(List<String> topics) throws ExecutionException, InterruptedException {
         List<String> list = new ArrayList<>(topics);
@@ -150,7 +151,7 @@ public Collection<ConsumerGroupListing> listConsumerGroups() throws ExecutionException, InterruptedException {
         return this.listConsumerGroups;
     }
 
-    private Map<String, ConsumerGroupDescription> describeConsumerGroups = new HashMap<>();
+    private Map<String, ConsumerGroupDescription> describeConsumerGroups = new ConcurrentHashMap<>();
 
     public Map<String, ConsumerGroupDescription> describeConsumerGroups(List<String> topics) throws ExecutionException, InterruptedException {
         List<String> list = new ArrayList<>(topics);
@@ -176,7 +177,7 @@ public Map<String, ConsumerGroupDescription> describeConsumerGroups(List<String> topics) throws ExecutionException, InterruptedException {
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
 
-    private Map<String, Map<TopicPartition, OffsetAndMetadata>> consumerGroupOffset = new HashMap<>();
+    private Map<String, Map<TopicPartition, OffsetAndMetadata>> consumerGroupOffset = new ConcurrentHashMap<>();
 
     public Map<TopicPartition, OffsetAndMetadata> consumerGroupsOffsets(String groupId) throws ExecutionException, InterruptedException {
         if (!this.consumerGroupOffset.containsKey(groupId)) {
@@ -215,7 +216,11 @@ public Map<Integer, Map<String, LogDirInfo>> describeLogDir() throws ExecutionException, InterruptedException {
         return this.logDirs;
     }
 
-    private Map<String, Config> describeConfigs = new HashMap<>();
+    private Map<String, Config> describeConfigs = new ConcurrentHashMap<>();
+
+    public void clearConfigCache() {
+        this.describeConfigs = new ConcurrentHashMap<>();
+    }
 
     public Map<String, Config> describeConfigs(ConfigResource.Type type, List<String> names) throws ExecutionException, InterruptedException {
         List<String> list = new ArrayList<>(names);
diff --git a/src/main/java/org/kafkahq/repositories/ConfigRepository.java b/src/main/java/org/kafkahq/repositories/ConfigRepository.java
index 07016372a..6ca9211b9 100644
--- a/src/main/java/org/kafkahq/repositories/ConfigRepository.java
+++ b/src/main/java/org/kafkahq/repositories/ConfigRepository.java
@@ -63,10 +63,18 @@ public void updateTopic(String clusterId, String name, List<Config> configs) throws ExecutionException, InterruptedException {
     private void update(String clusterId, ConfigResource.Type type, String name, List<Config> configs) throws ExecutionException, InterruptedException {
-        List<ConfigEntry> entries = configs
+        List<ConfigEntry> entries = new ArrayList<>();
+
+        this.find(type, Collections.singletonList(name))
+            .get(name)
+            .stream()
+            .filter(config -> config.getSource().name().startsWith("DYNAMIC_"))
+            .forEach(config -> entries.add(new ConfigEntry(config.getName(), config.getValue())));
+
+        configs
             .stream()
             .map(config -> new ConfigEntry(config.getName(), config.getValue()))
-            .collect(Collectors.toList());
+            .forEach(entries::add);
 
         kafkaModule.getAdminClient(clusterId)
             .alterConfigs(ImmutableMap.of(
@@ -75,6 +83,8 @@ private void update(String clusterId, ConfigResource.Type type, String name, List<Config> configs) throws ExecutionException, InterruptedException {
             ))
             .all()
             .get();
+
+        kafkaWrapper.clearConfigCache();
     }
 
     public static List<Config> updatedConfigs(Map<String, String> request, List<Config> configs) {
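The `update()` change above works around the semantics of `AdminClient.alterConfigs`: the submitted list replaces the resource's whole dynamic config set, so any dynamic entry that is not resent reverts to its default. Re-reading the current configs and carrying over the `DYNAMIC_`-sourced entries preserves them. A reduced sketch of that merge, where the `Entry` type is a simplified stand-in for the repository's own config model:

```java
import java.util.ArrayList;
import java.util.List;

public class ConfigMergeExample {
    static class Entry {
        final String name;
        final String value;
        final boolean dynamic; // true when the source name starts with "DYNAMIC_"

        Entry(String name, String value, boolean dynamic) {
            this.name = name;
            this.value = value;
            this.dynamic = dynamic;
        }
    }

    // alterConfigs-style semantics: the submitted list becomes the complete
    // dynamic config set, so existing dynamic entries are carried over first.
    static List<Entry> merge(List<Entry> current, List<Entry> updates) {
        List<Entry> entries = new ArrayList<>();
        current.stream().filter(e -> e.dynamic).forEach(entries::add);
        entries.addAll(updates);
        return entries;
    }
}
```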
diff --git a/src/main/java/org/kafkahq/repositories/RecordRepository.java b/src/main/java/org/kafkahq/repositories/RecordRepository.java
index 82150e4c8..e51ab2e05 100644
--- a/src/main/java/org/kafkahq/repositories/RecordRepository.java
+++ b/src/main/java/org/kafkahq/repositories/RecordRepository.java
@@ -538,7 +538,7 @@ public void setAfter(String after) {
             Splitter.on('_')
                 .withKeyValueSeparator('-')
                 .split(after)
-                .forEach((key, value) -> this.after.put(new Integer(key), new Long(value)));
+                .forEach((key, value) -> this.after.put(Integer.valueOf(key), Long.valueOf(value)));
         }
 
         public String pagination(Map<Integer, Long> offsets) {
diff --git a/src/main/java/org/kafkahq/repositories/SchemaRegistryRepository.java b/src/main/java/org/kafkahq/repositories/SchemaRegistryRepository.java
index bd78cb1b5..181758a61 100644
--- a/src/main/java/org/kafkahq/repositories/SchemaRegistryRepository.java
+++ b/src/main/java/org/kafkahq/repositories/SchemaRegistryRepository.java
@@ -12,6 +12,9 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 @Singleton
@@ -25,19 +28,19 @@ public SchemaRegistryRepository(KafkaModule kafkaModule) {
         this.kafkaModule = kafkaModule;
     }
 
-    public List<Schema> getAll(String clusterId) throws IOException, RestClientException {
+    public List<CompletableFuture<Schema>> getAll(String clusterId, Optional<String> search) throws IOException, RestClientException {
         return this.kafkaModule
             .getRegistryRestClient(clusterId)
             .getAllSubjects()
             .stream()
-            .map(s -> {
+            .filter(s -> isSearchMatch(search, s))
+            .map(s -> CompletableFuture.supplyAsync(() -> {
                 try {
                     return getLatestVersion(clusterId, s);
                 } catch (RestClientException | IOException e) {
                     throw new RuntimeException(e);
                 }
-
-            })
+            }))
             .collect(Collectors.toList());
     }
 
@@ -56,6 +59,24 @@ public boolean exist(String clusterId, String subject) throws IOException, RestClientException {
         return found;
     }
 
+    public Schema getById(String clusterId, Integer id) throws IOException, RestClientException, ExecutionException, InterruptedException {
+        for (CompletableFuture<Schema> future: this.getAll(clusterId, Optional.empty())) {
+            Schema schema = future.get();
+
+            if (schema.getId().equals(id)) {
+                return schema;
+            }
+
+            for (Schema version: this.getAllVersions(clusterId, schema.getSubject())) {
+                if (version.getId().equals(id)) {
+                    return schema;
+                }
+            }
+        }
+
+        return null;
+    }
+
     public Schema getLatestVersion(String clusterId, String subject) throws IOException, RestClientException {
         io.confluent.kafka.schemaregistry.client.rest.entities.Schema latestVersion = this.kafkaModule
             .getRegistryRestClient(clusterId)
diff --git a/src/main/java/org/kafkahq/repositories/TopicRepository.java b/src/main/java/org/kafkahq/repositories/TopicRepository.java
index 4c005196e..ffd499cdf 100644
--- a/src/main/java/org/kafkahq/repositories/TopicRepository.java
+++ b/src/main/java/org/kafkahq/repositories/TopicRepository.java
@@ -11,7 +11,10 @@
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 @Singleton
 public class TopicRepository extends AbstractRepository {
@@ -41,7 +44,7 @@ public enum TopicListView {
         HIDE_STREAM,
     }
 
-    public List<Topic> list(TopicListView view, Optional<String> search) throws ExecutionException, InterruptedException {
+    public List<CompletableFuture<Topic>> list(TopicListView view, Optional<String> search) throws ExecutionException, InterruptedException {
         ArrayList<String> list = new ArrayList<>();
 
         Collection<TopicListing> listTopics = kafkaWrapper.listTopics();
@@ -52,10 +55,18 @@ public List<Topic> list(TopicListView view, Optional<String> search) throws ExecutionException, InterruptedException {
             }
         }
 
-        List<Topic> topics = this.findByName(list);
-        topics.sort(Comparator.comparing(Topic::getName));
-
-        return topics;
+        list.sort(Comparator.comparing(s -> s));
+
+        return list.stream()
+            .map(s -> CompletableFuture.supplyAsync(() -> {
+                try {
+                    return this.findByName(s);
+                }
+                catch(ExecutionException | InterruptedException ex) {
+                    throw new CompletionException(ex);
+                }
+            }))
+            .collect(Collectors.toList());
     }
 
     public boolean isListViewMatch(TopicListView view, String value) {
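`CompletablePaged` below computes its page count as floor division plus a remainder correction, which is a ceiling division. A quick standalone check of the arithmetic, with illustrative names:

```java
public class PageCountExample {
    // Equivalent to (int) Math.ceil((double) items / pageSize)
    // for the non-negative sizes used here.
    static int pageCount(int items, int pageSize) {
        return (items / pageSize) + (items % pageSize == 0 ? 0 : 1);
    }

    public static void main(String[] args) {
        System.out.println(pageCount(50, 25)); // 2
        System.out.println(pageCount(51, 25)); // 3: the last page holds a single item
        System.out.println(pageCount(25, 25)); // 1
    }
}
```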
diff --git a/src/main/java/org/kafkahq/utils/CompletablePaged.java b/src/main/java/org/kafkahq/utils/CompletablePaged.java
new file mode 100644
index 000000000..34f194552
--- /dev/null
+++ b/src/main/java/org/kafkahq/utils/CompletablePaged.java
@@ -0,0 +1,70 @@
+package org.kafkahq.utils;
+
+import lombok.AllArgsConstructor;
+import org.codehaus.httpcache4j.uri.URIBuilder;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+@AllArgsConstructor
+public class CompletablePaged<T> {
+    private List<CompletableFuture<T>> list;
+    private Integer pageSize;
+    private URIBuilder uri;
+    private Integer currentPage;
+
+    public int size() {
+        return this.list.size();
+    }
+
+    public URIBuilder before() {
+        if (currentPage - 1 > 0) {
+            return uri.addParameter("page", String.valueOf(currentPage - 1));
+        } else {
+            return URIBuilder.empty();
+        }
+    }
+
+    public URIBuilder after() {
+        if (currentPage + 1 <= this.pageCount()) {
+            return uri.addParameter("page", String.valueOf(currentPage + 1));
+        } else {
+            return URIBuilder.empty();
+        }
+    }
+
+    public int pageCount() {
+        return (this.list.size() / this.pageSize) + (this.list.size() % this.pageSize == 0 ? 0 : 1);
+    }
+
+    public List<T> complete() throws ExecutionException, InterruptedException {
+        int start;
+        int end;
+
+        if (this.currentPage == 1) {
+            start = 0;
+            end = Math.min(this.pageSize, list.size());
+        } else if (this.currentPage == this.pageCount()) {
+            start = (this.currentPage - 1) * this.pageSize;
+            end = this.list.size();
+        } else {
+            start = (this.currentPage - 1) * this.pageSize;
+            end = (this.currentPage * this.pageSize);
+        }
+
+        List<CompletableFuture<T>> futuresList = list.subList(start, end);
+
+        CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(
+            futuresList.toArray(new CompletableFuture[0])
+        );
+
+        return allFuturesResult.thenApply(s ->
+            futuresList.stream()
+                .map(CompletableFuture::join)
+                .collect(Collectors.toList())
+        )
+            .get();
+    }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 35aba452c..305682477 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -31,6 +31,28 @@ micronaut:
       - pattern: "${kafkahq.server.base-path:}/static/**"
         access: "isAnonymous()"
 
+endpoints:
+  all:
+    path: "${kafkahq.server.base-path:}"
+  health:
+    enabled: true
+    sensitive: false
+    details-visible: anonymous
+  info:
+    enabled: true
+    sensitive: false
+  metrics:
+    enabled: true
+    sensitive: false
+    export:
+      prometheus:
+        enabled: true
+        step: PT1M
+        descriptions: true
+  prometheus:
+    enabled: true
+    sensitive: false
+
 jackson:
   module-scan: false
 
@@ -52,6 +74,7 @@ kafkahq:
 
   topic:
     default-view: HIDE_INTERNAL
+    page-size: 25
    internal-regexps:
       - "^_.*$"
       - "^.*_schemas$"
@@ -63,6 +86,9 @@ kafkahq:
       - "^.*-repartition$"
       - "^.*-rekey$"
 
+  schema:
+    page-size: 25
+
   topic-data:
     sort: OLDEST
     size: 50
diff --git a/src/main/resources/views/blocks/navbar-search.ftl b/src/main/resources/views/blocks/navbar-search.ftl
index 3d5eb6e47..fa941c096 100644
--- a/src/main/resources/views/blocks/navbar-search.ftl
+++ b/src/main/resources/views/blocks/navbar-search.ftl
@@ -4,26 +4,44 @@
 <#-- @ftlvariable name="topicListView" type="org.kafkahq.repositories.TopicRepository.TopicListView" -->
\ No newline at end of file
diff --git a/src/main/resources/views/blocks/pagination.ftl b/src/main/resources/views/blocks/pagination.ftl
new file mode 100644
index 000000000..63ccdb8d5
--- /dev/null
+++ b/src/main/resources/views/blocks/pagination.ftl
@@ -0,0 +1,27 @@
+<#ftl output_format="HTML">
+
+<#-- @ftlvariable name="pagination" type="java.util.Map" -->
+
+<#assign size = pagination["size"] >
+<#assign after = pagination["after"] >
+
+
diff --git a/src/main/resources/views/blocks/topic/data.ftl b/src/main/resources/views/blocks/topic/data.ftl
index b385a60ee..e8dfe472a 100644
--- a/src/main/resources/views/blocks/topic/data.ftl
+++ b/src/main/resources/views/blocks/topic/data.ftl
@@ -18,7 +18,7 @@