Skip to content

Commit

Permalink
Add a pagination thread configuration to allow parallel resolution of…
Browse files Browse the repository at this point in the history
… list page
  • Loading branch information
tchiotludo committed Nov 11, 2019
1 parent dad1fa2 commit 8fd76c1
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 70 deletions.
12 changes: 4 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,11 @@ kafkahq:
### KafkaHQ configuration
#### Pagination
* `kafkahq.pagination.page-size` number of topics per page (default : 25)
* `kafkahq.pagination.threads` number of parallel threads to resolve current page (default : 16). This setting can have a signifiant impact on performance on list page since it will fetch in parallel the Kafka API.

#### Topic List
* `kafkahq.topic.page-size` number of topics per page (default : 25)
* `kafkahq.topic.default-view` is default list view (ALL, HIDE_INTERNAL, HIDE_INTERNAL_STREAM, HIDE_STREAM)
* `kafkahq.topic.internal-regexps` is list of regexp to be considered as internal (internal topic can't be deleted or updated)
* `kafkahq.topic.stream-regexps` is list of regexp to be considered as internal stream topic
Expand All @@ -173,13 +176,6 @@ These parameters are the default values used in the topic creation page.
* `kafkahq.topic-data.poll-timeout`: The time, in milliseconds, spent waiting in poll if data is not available in the
buffer (default: 1000).

#### Consumer Groups List
* `kafkahq.consumer-groups.page-size` number of consumer groups per page (default : 25)


#### Schema List
* `kafkahq.schema.page-size` number of schemas per page (default : 25)


### Security
* `kafkahq.security.default-roles`: Roles available for all the user even unlogged user, roles available are :
Expand Down
13 changes: 4 additions & 9 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,15 @@ kafkahq:
ssl.keystore.password: password
ssl.key.password: password

pagination:
page-size: 25 # number of elements per page (default : 25)
threads: 16 # Number of parallel threads to resolve page

# Topic list display options (optional)
topic:
retention: 172800000 # default retention period when creating topic
partition: 3 # default number of partition when creating topic
replication: 3 # default number of replicas when creating topic
page-size: 25 # number of topics per page (default : 25)
default-view: HIDE_INTERNAL # default list view (ALL, HIDE_INTERNAL, HIDE_INTERNAL_STREAM, HIDE_STREAM)
internal-regexps: # list of regexp to be considered as internal (internal topic can't be deleted or updated)
- "^_.*$"
Expand All @@ -81,14 +84,6 @@ kafkahq:
size: 50 # max record per page (default: 50)
poll-timeout: 1000 # The time, in milliseconds, spent waiting in poll if data is not available in the buffer.

# Schema list display options (optional)
schema:
page-size: 25 # number of schemas per page (default : 25)

# Consumer groups list display options (optional)
consumer-groups:
page-size: 25 # number of consumer groups per page (default : 25)

# Auth & Roles (optional)
security:
default-roles: # Roles available for all the user even unlogged user
Expand Down
2 changes: 2 additions & 0 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ services:
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
KAFKA_JMX_PORT: 9091
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
links:
- zookeeper
ports:
Expand Down
10 changes: 5 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 9 additions & 11 deletions src/main/java/org/kafkahq/controllers/GroupController.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.kafkahq.repositories.ConsumerGroupRepository;
import org.kafkahq.repositories.RecordRepository;
import org.kafkahq.utils.CompletablePaged;
import org.kafkahq.utils.CompletablePagedService;

import javax.inject.Inject;
import java.time.Instant;
Expand All @@ -35,14 +36,17 @@
public class GroupController extends AbstractController {
private ConsumerGroupRepository consumerGroupRepository;
private RecordRepository recordRepository;

@Value("${kafkahq.consumer-groups.page-size:25}")
private Integer pageSize;
private CompletablePagedService completablePagedService;

@Inject
public GroupController(ConsumerGroupRepository consumerGroupRepository, RecordRepository recordRepository) {
public GroupController(
ConsumerGroupRepository consumerGroupRepository,
RecordRepository recordRepository,
CompletablePagedService completablePagedService
) {
this.consumerGroupRepository = consumerGroupRepository;
this.recordRepository = recordRepository;
this.completablePagedService = completablePagedService;
}

@View("groupList")
Expand All @@ -51,13 +55,7 @@ public HttpResponse list(HttpRequest request, String cluster, Optional<String> s

List<CompletableFuture<ConsumerGroup>> list = this.consumerGroupRepository.list(cluster, search);
URIBuilder uri = URIBuilder.fromURI(request.getUri());

CompletablePaged<ConsumerGroup> paged = new CompletablePaged<>(
list,
this.pageSize,
uri,
page.orElse(1)
);
CompletablePaged<ConsumerGroup> paged = completablePagedService.of(list, uri, page.orElse(1));

return this.template(
request,
Expand Down
16 changes: 5 additions & 11 deletions src/main/java/org/kafkahq/controllers/SchemaController.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.micronaut.context.annotation.Value;
import io.micronaut.http.*;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
Expand All @@ -15,6 +14,7 @@
import org.kafkahq.modules.RequestHelper;
import org.kafkahq.repositories.SchemaRegistryRepository;
import org.kafkahq.utils.CompletablePaged;
import org.kafkahq.utils.CompletablePagedService;

import javax.inject.Inject;
import java.io.IOException;
Expand All @@ -29,13 +29,12 @@
@Controller("${kafkahq.server.base-path:}/{cluster}/schema")
public class SchemaController extends AbstractController {
private SchemaRegistryRepository schemaRepository;

@Value("${kafkahq.schema.page-size:25}")
private Integer pageSize;
private CompletablePagedService completablePagedService;

@Inject
public SchemaController(SchemaRegistryRepository schemaRepository) {
public SchemaController(SchemaRegistryRepository schemaRepository, CompletablePagedService completablePagedService) {
this.schemaRepository = schemaRepository;
this.completablePagedService = completablePagedService;
}

@View("schemaList")
Expand All @@ -49,12 +48,7 @@ public HttpResponse list(
List<CompletableFuture<Schema>> list = this.schemaRepository.getAll(cluster, search);

URIBuilder uri = URIBuilder.fromURI(request.getUri());
CompletablePaged<Schema> paged = new CompletablePaged<>(
list,
this.pageSize,
uri,
page.orElse(1)
);
CompletablePaged<Schema> paged = completablePagedService.of(list, uri, page.orElse(1));

return this.template(
request,
Expand Down
27 changes: 13 additions & 14 deletions src/main/java/org/kafkahq/controllers/TopicController.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.kafkahq.repositories.RecordRepository;
import org.kafkahq.repositories.TopicRepository;
import org.kafkahq.utils.CompletablePaged;
import org.kafkahq.utils.CompletablePagedService;
import org.reactivestreams.Publisher;

import javax.inject.Inject;
Expand All @@ -47,10 +48,10 @@ public class TopicController extends AbstractController {
private RecordRepository recordRepository;
private FreemarkerViewsRenderer freemarkerViewsRenderer;
private Environment environment;
private CompletablePagedService completablePagedService;

@Value("${kafkahq.topic.default-view}")
private String defaultView;
@Value("${kafkahq.topic.page-size:25}")
private Integer pageSize;
@Value("${kafkahq.topic.replication}")
private Integer replicationFactor;
@Value("${kafkahq.topic.retention}")
Expand All @@ -59,17 +60,20 @@ public class TopicController extends AbstractController {
private Integer partitionCount;

@Inject
public TopicController(TopicRepository topicRepository,
ConfigRepository configRepository,
RecordRepository recordRepository,
FreemarkerViewsRenderer freemarkerViewsRenderer,
Environment environment)
{
public TopicController(
TopicRepository topicRepository,
ConfigRepository configRepository,
RecordRepository recordRepository,
FreemarkerViewsRenderer freemarkerViewsRenderer,
Environment environment,
CompletablePagedService completablePagedService
) {
this.topicRepository = topicRepository;
this.configRepository = configRepository;
this.recordRepository = recordRepository;
this.freemarkerViewsRenderer = freemarkerViewsRenderer;
this.environment = environment;
this.completablePagedService = completablePagedService;
}

@View("topicList")
Expand All @@ -88,12 +92,7 @@ public HttpResponse list(
);

URIBuilder uri = URIBuilder.fromURI(request.getUri());
CompletablePaged<Topic> paged = new CompletablePaged<>(
list,
this.pageSize,
uri,
page.orElse(1)
);
CompletablePaged<Topic> paged = completablePagedService.of(list, uri, page.orElse(1));

return this.template(
request,
Expand Down
11 changes: 7 additions & 4 deletions src/main/java/org/kafkahq/utils/CompletablePaged.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
import org.codehaus.httpcache4j.uri.URIBuilder;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.*;
import java.util.stream.Collectors;

@AllArgsConstructor
public class CompletablePaged <T> {
private List<CompletableFuture<T>> list;
private Integer pageSize;
private ExecutorService executorService;
private URIBuilder uri;
private Integer currentPage;

Expand All @@ -20,6 +20,7 @@ public int size() {
}

public URIBuilder before() {

if (currentPage - 1 > 0) {
return uri.addParameter("page", String.valueOf(currentPage - 1));
} else {
Expand Down Expand Up @@ -67,11 +68,13 @@ public List<T> complete() throws ExecutionException, InterruptedException {
futuresList.toArray(new CompletableFuture[0])
);

return allFuturesResult.thenApply(s ->
return allFuturesResult.thenApplyAsync(s ->
futuresList.stream().
map(CompletableFuture::join).
collect(Collectors.toList())
collect(Collectors.toList()),
this.executorService
)
.get();

}
}
30 changes: 30 additions & 0 deletions src/main/java/org/kafkahq/utils/CompletablePagedService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.kafkahq.utils;

import io.micronaut.context.annotation.Value;
import org.codehaus.httpcache4j.uri.URIBuilder;

import javax.inject.Singleton;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Singleton
public class CompletablePagedService {
private Integer pageSize;
private Integer threads;
private ExecutorService executorService;

public CompletablePagedService(
@Value("${kafkahq.pagination.page-size}") Integer pageSize,
@Value("${kafkahq.pagination.threads}") Integer threads
) {
this.pageSize = pageSize;
this.threads = threads;
executorService = Executors.newFixedThreadPool(this.threads);
}

public <T> CompletablePaged<T> of(List<CompletableFuture<T>> list, URIBuilder uri, Integer currentPage) {
return new CompletablePaged<>(list, this.pageSize, executorService, uri, currentPage);
}
}
11 changes: 4 additions & 7 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,12 @@ kafkahq:
enable.auto.commit: "false"
default.api.timeout.ms: 15000

pagination:
page-size: 25
threads: 16

topic:
default-view: HIDE_INTERNAL
page-size: 25
replication: 1
retention: 86400000
partition: 1
Expand All @@ -97,12 +100,6 @@ kafkahq:
- "^.*-repartition$"
- "^.*-rekey$"

schema:
page-size: 25

consumer-groups:
page-size: 25

topic-data:
sort: OLDEST
size: 50
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/views/topicList.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
</span>
</td>
<td>
<#if topic.getLogDirSize()?? || topic.getLogDirSize().isEmpty() >
<#if topic.getLogDirSize().isEmpty() >
n/a
<#else>
${functions.filesize(topic.getLogDirSize().get())}
Expand Down

0 comments on commit 8fd76c1

Please sign in to comment.