Skip to content

Commit

Permalink
Merge pull request #31 from cloudcreate-dk/release_0_40_19
Browse files Browse the repository at this point in the history
Release 0.40.19
  • Loading branch information
cloudcreate-dk authored Dec 18, 2024
2 parents 4d095a4 + 40cbe9c commit 449b99a
Show file tree
Hide file tree
Showing 36 changed files with 1,107 additions and 113 deletions.
22 changes: 1 addition & 21 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,11 @@ services:
- "27017:27017"
command: [--replSet, "rs0"]

# From https://developer.confluent.io/quickstart/kafka-docker/
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

broker:
image: confluentinc/cp-kafka:latest
image: apache/kafka-native:latest
container_name: kafka_broker
ports:
# To learn about configuring Kafka for access across networks see
# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

# Tracing, Latency and Metric setup is inspired by https://spring.io/blog/2022/10/12/observability-with-spring-boot-3 / https://github.com/marcingrzejszczak/observability-boot-blog-post
tempo:
Expand Down
10 changes: 5 additions & 5 deletions mongodb-inbox-outbox/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
<artifactId>spring-boot-starter-mongodb</artifactId>
<version>${essentials.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.objenesis</groupId>-->
<!-- <artifactId>objenesis</artifactId>-->
<!-- <version>${objenesis.version}</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>${objenesis.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ essentials.immutable-jackson-module-enabled=true
essentials.reactive.event-bus-backpressure-buffer-size=1024
essentials.reactive.overflow-max-retries=20
essentials.reactive.queued-task-cap-factor=1.5
#essentials.reactive.parallel-threads=4
#essentials.reactive.event-bus-parallel-threads=4
#essentials.reactive.command-bus-parallel-send-and-dont-wait-consumers=4

essentials.durable-queues.shared-queue-collection-name=durable_queues
essentials.durable-queues.message-handling-timeout=5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public class OrderShippingProcessorIT {
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:latest");

@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
static org.testcontainers.kafka.KafkaContainer kafkaContainer = new org.testcontainers.kafka.KafkaContainer("apache/kafka-native:latest")
.withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094");
private KafkaMessageListenerContainer<String, Object> kafkaListenerContainer;

@DynamicPropertySource
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<skipDependencyCheck>false</skipDependencyCheck>

<!--Essentials versions-->
<essentials.version>0.40.18</essentials.version>
<essentials.version>0.40.19</essentials.version>

<spring-boot.version>3.2.12</spring-boot.version>
<spring-framework-bom.version>6.1.15</spring-framework-bom.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task;

import dk.cloudcreate.essentials.components.eventsourced.aggregates.stateful.StatefulAggregateRepository;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.ConfigurableEventStore;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.persistence.table_per_aggregate_type.SeparateTablePerAggregateEventStreamConfiguration;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands.CreateTask;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.Task;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.TaskId;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.TaskEvent;
import lombok.NonNull;
import org.springframework.stereotype.Component;

import java.util.Optional;

import static dk.cloudcreate.essentials.components.eventsourced.aggregates.stateful.StatefulAggregateInstanceFactory.reflectionBasedAggregateRootFactory;

@Component
public class TaskEventStoreRepository {

public static final AggregateType AGGREGATE_TYPE = AggregateType.of("Task");
private final ConfigurableEventStore<SeparateTablePerAggregateEventStreamConfiguration> eventStore;
private final StatefulAggregateRepository<TaskId, TaskEvent, Task> repository;

public TaskEventStoreRepository(@NonNull ConfigurableEventStore<SeparateTablePerAggregateEventStreamConfiguration> eventStore) {
this.eventStore = eventStore;
repository = StatefulAggregateRepository.from(eventStore,
AGGREGATE_TYPE,
reflectionBasedAggregateRootFactory(),
Task.class);
}

public Optional<Task> findTask(@NonNull TaskId taskId) {
return repository.tryLoad(taskId);
}

public Task getTask(@NonNull TaskId taskId) {
return repository.load(taskId);
}

public Task createTask(@NonNull TaskId taskId, CreateTask cmd) {
var task = new Task(taskId, cmd);
return repository.save(task);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task;

import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.eventstream.AggregateType;
import dk.cloudcreate.essentials.components.eventsourced.eventstore.postgresql.processor.*;
import dk.cloudcreate.essentials.components.foundation.messaging.MessageHandler;
import dk.cloudcreate.essentials.reactive.command.CmdHandler;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands.*;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.Task;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.TaskCreated;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;

import static java.util.Objects.nonNull;

@Slf4j
@Service
public class TaskProcessor extends InTransactionEventProcessor {

private final TaskEventStoreRepository taskEventStoreRepository;

protected TaskProcessor(TaskEventStoreRepository taskEventStoreRepository,
EventProcessorDependencies eventProcessorDependencies) {
super(eventProcessorDependencies, true);
this.taskEventStoreRepository = taskEventStoreRepository;
}

public TaskEventStoreRepository getTaskEventStoreRepository() {
return taskEventStoreRepository;
}

@Override
public String getProcessorName() {
return "TaskProcessor";
}

@Override
protected List<AggregateType> reactsToEventsRelatedToAggregateTypes() {
return List.of(TaskEventStoreRepository.AGGREGATE_TYPE);
}

@CmdHandler
public void handle(CreateTask cmd) {
log.info("Creating task with command '{}'", cmd);
taskEventStoreRepository.createTask(cmd.taskId(), cmd);

}

@CmdHandler
public void handle(AddComment cmd) {
log.info("Adding comment '{}'", cmd);
Task task = taskEventStoreRepository.findTask(cmd.taskId()).orElseThrow();
task.addComment(cmd);
}

@MessageHandler
void handle(TaskCreated event) {
if (nonNull(event.getComment())) {
log.info("Task '{}' contains comment adding comment command", event);
commandBus.send(new AddComment(event.getTaskId(), event.getComment(), event.getCreatedAt()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands;

import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.TaskId;

import java.time.LocalDateTime;

public record AddComment(TaskId taskId, String content, LocalDateTime createdAt) implements TaskCommand {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands;

import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.TaskId;

public record CreateTask(TaskId taskId, String comment) implements TaskCommand {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands;

public interface TaskCommand {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain;

import java.time.LocalDateTime;

public record Comment(TaskId taskId, String content, LocalDateTime createdAt) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain;

import dk.cloudcreate.essentials.components.eventsourced.aggregates.EventHandler;
import dk.cloudcreate.essentials.components.eventsourced.aggregates.stateful.modern.AggregateRoot;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands.AddComment;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.commands.CreateTask;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.CommentAdded;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.TaskCreated;
import dk.cloudcreate.essentials.spring.examples.postgresql.cqrs.task.domain.events.TaskEvent;
import lombok.ToString;

import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.Set;

@ToString
public class Task extends AggregateRoot<TaskId, TaskEvent, Task> {

private Set<Comment> comments;

public Task(TaskId aggregateId) {
super(aggregateId);
}

public Task(TaskId aggregateId, CreateTask cmd) {
super(aggregateId);
apply(new TaskCreated(aggregateId,
cmd.comment(),
LocalDateTime.now()
));
}

public void addComment(AddComment cmd) {
Comment comment = new Comment(cmd.taskId(), cmd.content(), cmd.createdAt());
if (!comments.contains(comment)) {
apply(new CommentAdded(cmd.taskId(), cmd.content(), cmd.createdAt()));
}
}

public Set<Comment> getComments() {
return comments;
}

@EventHandler
private void on(TaskCreated event) {
comments = new HashSet<>();
}

@EventHandler
private void on(CommentAdded event) {
comments.add(new Comment(event.getTaskId(), event.getContent(), event.getCreatedAt()));
}
}
Loading

0 comments on commit 449b99a

Please sign in to comment.