diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java index 51c921603..723474cae 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java @@ -15,6 +15,7 @@ import com.google.protobuf.StructProto; import com.google.protobuf.TimestampProto; import com.google.protobuf.TypeProto; +import com.google.protobuf.TypeRegistry; import com.google.protobuf.WrappersProto; import com.google.protobuf.util.JsonFormat; import com.google.type.ColorProto; @@ -147,12 +148,18 @@ public boolean canSerialize(String topic, Serde.Target type) { @Override public Serde.Serializer serializer(String topic, Serde.Target type) { var descriptor = descriptorFor(topic, type).orElseThrow(); + TypeRegistry typeRegistry = TypeRegistry.newBuilder() + .add(descriptorPaths.keySet()) + .build(); + return new Serde.Serializer() { @SneakyThrows @Override public byte[] serialize(String input) { DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor); - JsonFormat.parser().merge(input, builder); + JsonFormat.parser() + .usingTypeRegistry(typeRegistry) + .merge(input, builder); return builder.build().toByteArray(); } }; diff --git a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java index 554387a1a..9722f2c19 100644 --- a/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java +++ b/api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java @@ -5,6 +5,7 @@ import io.kafbat.ui.container.KafkaConnectContainer; import io.kafbat.ui.container.KsqlDbContainer; import io.kafbat.ui.container.SchemaRegistryContainer; +import java.io.FileNotFoundException; import java.nio.file.Path; import java.util.List; import java.util.Properties; @@ -22,6 +23,7 @@ import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.util.TestSocketUtils; +import org.springframework.util.ResourceUtils; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.Network; import org.testcontainers.utility.DockerImageName; @@ -75,6 +77,18 @@ public static class Initializer public void initialize(@NotNull ConfigurableApplicationContext context) { System.setProperty("kafka.clusters.0.name", LOCAL); System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers()); + + // Add ProtobufFileSerde configuration + System.setProperty("kafka.clusters.0.serde.0.name", "ProtobufFile"); + System.setProperty("kafka.clusters.0.serde.0.topicValuesPattern", "masking-test-.*"); + try { + System.setProperty("kafka.clusters.0.serde.0.properties.protobufFilesDir", + ResourceUtils.getFile("classpath:protobuf-serde").getAbsolutePath()); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } + System.setProperty("kafka.clusters.0.serde.0.properties.protobufMessageName", "test.MessageWithAny"); + // List unavailable hosts to verify failover System.setProperty("kafka.clusters.0.schemaRegistry", String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s", diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerdeTest.java index 069f9ed45..becccafa6 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerdeTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerdeTest.java @@ -80,14 +80,15 @@ void setUp() throws Exception { void loadsAllProtoFiledFromTargetDirectory() throws Exception { var protoDir = ResourceUtils.getFile("classpath:protobuf-serde/").getPath(); List files = new ProtobufFileSerde.ProtoSchemaLoader(protoDir).load(); - assertThat(files).hasSize(4); + assertThat(files).hasSize(5); assertThat(files) .map(f -> f.getLocation().getPath()) .containsExactlyInAnyOrder( "language/language.proto", "sensor.proto", "address-book.proto", - "lang-description.proto" + "lang-description.proto", + "messagewithany.proto" ); } diff --git a/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java b/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java index 849daefd6..1fecae247 100644 --- a/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java @@ -13,6 +13,7 @@ import io.kafbat.ui.model.TopicMessageDTO; import io.kafbat.ui.model.TopicMessageEventDTO; import io.kafbat.ui.producer.KafkaTestProducer; +import io.kafbat.ui.serdes.builtin.ProtobufFileSerde; import io.kafbat.ui.serdes.builtin.StringSerde; import java.util.HashSet; import java.util.List; @@ -214,4 +215,33 @@ void execSmartFilterTestReturnsErrorOnFilterCompilationError() { assertThat(result.getError()).containsIgnoringCase("Compilation error"); } + @Test + void sendMessageWithProtobufAnyType() { + String jsonContent = """ + { + "name": "testName", + "payload": { + "@type": "type.googleapis.com/test.PayloadMessage", + "id": "123" + } + } + """; + + CreateTopicMessageDTO testMessage = new CreateTopicMessageDTO() + .key(null) + .partition(0) + .keySerde(StringSerde.name()) + .content(jsonContent) + .valueSerde(ProtobufFileSerde.name()); + + String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID(); + createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1)); + + StepVerifier.create(messagesService.sendMessage(cluster, testTopic, testMessage)) + .expectNextMatches(metadata -> metadata.topic().equals(testTopic) + && metadata.partition() == 0 + && metadata.offset() >= 0) + .verifyComplete(); + } + } diff --git a/api/src/test/resources/protobuf-serde/messagewithany.proto b/api/src/test/resources/protobuf-serde/messagewithany.proto new file mode 100644 index 000000000..5a4b0dd64 --- /dev/null +++ b/api/src/test/resources/protobuf-serde/messagewithany.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; +package test; + +import "google/protobuf/any.proto"; + +message MessageWithAny { + string name = 1; + google.protobuf.Any payload = 2; +} + +message PayloadMessage { + string id = 1; +}