Skip to content

Commit

Permalink
- Updated from the deprecated `org.testcontainers.containers.KafkaCon…
Browse files Browse the repository at this point in the history
…tainer` to `org.testcontainers.kafka.KafkaContainer`

- Switched to use the `apache/kafka-native` docker image as required by `org.testcontainers.kafka.KafkaContainer`
   - Added custom env as recommended in testcontainers/testcontainers-java#9506
- Added `objenesis` dependency where needed to deserialize outbox messages
- Adjusted test timeout parameters
- Adjusted logging to be less verbose when running the application
  • Loading branch information
cloudcreate-dk committed Dec 18, 2024
1 parent 7c77c00 commit 40cbe9c
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 50 deletions.
22 changes: 1 addition & 21 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,11 @@ services:
- "27017:27017"
command: [--replSet, "rs0"]

# From https://developer.confluent.io/quickstart/kafka-docker/
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-kafka:latest
image: apache/kafka-native:latest
container_name: kafka_broker
ports:
# To learn about configuring Kafka for access across networks see
# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

# Tracing, Latency and Metric setup is inspired by https://spring.io/blog/2022/10/12/observability-with-spring-boot-3 / https://github.com/marcingrzejszczak/observability-boot-blog-post
tempo:
Expand Down
10 changes: 5 additions & 5 deletions mongodb-inbox-outbox/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
<artifactId>spring-boot-starter-mongodb</artifactId>
<version>${essentials.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.objenesis</groupId>-->
<!-- <artifactId>objenesis</artifactId>-->
<!-- <version>${objenesis.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>${objenesis.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class OrderShippingProcessorIT {
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:latest");

@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest")
.withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094");
private KafkaMessageListenerContainer<String, Object> kafkaListenerContainer;

@DynamicPropertySource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public class AccountsIT {
.withUsername("test");

@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest")
.withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094");

@DynamicPropertySource
static void setProperties(DynamicPropertyRegistry registry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class TransferMoneyProcessorIT {
.withUsername("test");

@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest")
.withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094");

@DynamicPropertySource
static void setProperties(DynamicPropertyRegistry registry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@
import org.springframework.kafka.listener.*;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.*;
import org.testcontainers.containers.*;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.*;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.*;
Expand All @@ -57,7 +55,8 @@ public class OrderShippingProcessorIT {
.withUsername("test");

@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest")
.withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094");
private KafkaMessageListenerContainer<String, Object> kafkaListenerContainer;

@DynamicPropertySource
Expand Down
10 changes: 5 additions & 5 deletions postgresql-inbox-outbox/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
<artifactId>types-springdata-jpa</artifactId>
<version>${essentials.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.objenesis</groupId>-->
<!-- <artifactId>objenesis</artifactId>-->
<!-- <version>${objenesis.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>${objenesis.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
Expand Down
16 changes: 8 additions & 8 deletions postgresql-inbox-outbox/src/main/resources/logback-spring.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@

<logger name="dk.cloudcreate.essentials" level="DEBUG"/>
<logger name="dk.cloudcreate.essentials.components.foundation.transaction" level="INFO"/>
<logger name="dk.cloudcreate.essentials.components.foundation.reactive.command.DurableLocalCommandBus" level="DEBUG"/>
<logger name="dk.cloudcreate.essentials.components.foundation.reactive.command.DurableLocalCommandBus" level="INFO"/>
<logger name="dk.cloudcreate.essentials.components.distributed.fencedlock.postgresql.PostgresqlFencedLockManager" level="INFO"/>
<logger name="dk.cloudcreate.essentials.shared.interceptor.DefaultInterceptorChain" level="DEBUG"/>
<logger name="dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.Inboxes.DurableQueueBasedInboxes" level="DEBUG"/>
<logger name="dk.cloudcreate.essentials.components.queue.postgresql.PostgresqlDurableQueues" level="DEBUG"/>
<logger name="dk.cloudcreate.essentials.components.queue.postgresql.PostgresqlDurableQueues$1" level="DEBUG"/>
<logger name="dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer" level="DEBUG"/>
<logger name="dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer.MessageHandlingFailures" level="DEBUG"/>
<logger name="dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuePollingOptimizer.SimpleQueuePollingOptimizer" level="DEBUG"/>
<logger name="dk.cloudcreate.essentials.shared.interceptor.DefaultInterceptorChain" level="INFO"/>
<logger name="dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward.Inboxes.DurableQueueBasedInboxes" level="INFO"/>
<logger name="dk.cloudcreate.essentials.components.queue.postgresql.PostgresqlDurableQueues" level="INFO"/>
<logger name="dk.cloudcreate.essentials.components.queue.postgresql.PostgresqlDurableQueues$1" level="INFO"/>
<logger name="dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer" level="INFO"/>
<logger name="dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer.MessageHandlingFailures" level="INFO"/>
<logger name="dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuePollingOptimizer.SimpleQueuePollingOptimizer" level="INFO"/>
<root level="INFO">
<appender-ref ref="LOKI"/>
</root>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public class AbstractIntegrationTest {
.withUsername("test");

@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest")
.withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094");
protected KafkaMessageListenerContainer<String, Object> kafkaListenerContainer;

@DynamicPropertySource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class DurableQueuesLoadIT {
.withPassword("test")
.withUsername("test");
@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest")
.withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094");


@DynamicPropertySource
Expand Down Expand Up @@ -116,7 +117,7 @@ void queue_a_large_number_of_messages() {
assertThat(nextMessages).hasSize(10);


Awaitility.waitAtMost(Duration.ofSeconds(10))
Awaitility.waitAtMost(Duration.ofSeconds(20))
.untilAsserted(() -> {
System.out.println("-----> " + Instant.now() + " messages received: " + msgHandler.messagesReceived.get());
assertThat(msgHandler.messagesReceived.get()).isGreaterThan(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void stress_test_durable_queues_and_local_eventbus() {
log.debug("########## TotalMessagesQueued for '{}': '{}'", queueName, totalMessagesQueuedFor);

var stopWatch = StopWatch.start();
Awaitility.waitAtMost(Duration.ofMinutes(60))
Awaitility.waitAtMost(Duration.ofMinutes(3))
.pollDelay(Duration.ofSeconds(5))
.pollInterval(Duration.ofSeconds(10))
.until(() -> {
Expand Down

0 comments on commit 40cbe9c

Please sign in to comment.