Skip to content

Commit

Permalink
BE: Serde: Fix HTTP 500 on protobuf Any type (#696)
Browse files Browse the repository at this point in the history
Co-authored-by: Roman Zabaluev <[email protected]>
  • Loading branch information
DimaVilda and Haarolean authored Dec 20, 2024
1 parent 4bb3632 commit d5c976e
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.protobuf.StructProto;
import com.google.protobuf.TimestampProto;
import com.google.protobuf.TypeProto;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.WrappersProto;
import com.google.protobuf.util.JsonFormat;
import com.google.type.ColorProto;
Expand Down Expand Up @@ -147,12 +148,18 @@ public boolean canSerialize(String topic, Serde.Target type) {
@Override
public Serde.Serializer serializer(String topic, Serde.Target type) {
var descriptor = descriptorFor(topic, type).orElseThrow();
TypeRegistry typeRegistry = TypeRegistry.newBuilder()
.add(descriptorPaths.keySet())
.build();

return new Serde.Serializer() {
@SneakyThrows
@Override
public byte[] serialize(String input) {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
JsonFormat.parser().merge(input, builder);
JsonFormat.parser()
.usingTypeRegistry(typeRegistry)
.merge(input, builder);
return builder.build().toByteArray();
}
};
Expand Down
14 changes: 14 additions & 0 deletions api/src/test/java/io/kafbat/ui/AbstractIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kafbat.ui.container.KafkaConnectContainer;
import io.kafbat.ui.container.KsqlDbContainer;
import io.kafbat.ui.container.SchemaRegistryContainer;
import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.util.List;
import java.util.Properties;
Expand All @@ -22,6 +23,7 @@
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.util.TestSocketUtils;
import org.springframework.util.ResourceUtils;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
Expand Down Expand Up @@ -75,6 +77,18 @@ public static class Initializer
public void initialize(@NotNull ConfigurableApplicationContext context) {
System.setProperty("kafka.clusters.0.name", LOCAL);
System.setProperty("kafka.clusters.0.bootstrapServers", kafka.getBootstrapServers());

// Add ProtobufFileSerde configuration
System.setProperty("kafka.clusters.0.serde.0.name", "ProtobufFile");
System.setProperty("kafka.clusters.0.serde.0.topicValuesPattern", "masking-test-.*");
try {
System.setProperty("kafka.clusters.0.serde.0.properties.protobufFilesDir",
ResourceUtils.getFile("classpath:protobuf-serde").getAbsolutePath());
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
System.setProperty("kafka.clusters.0.serde.0.properties.protobufMessageName", "test.MessageWithAny");

// List unavailable hosts to verify failover
System.setProperty("kafka.clusters.0.schemaRegistry",
String.format("http://localhost:%1$s,http://localhost:%1$s,%2$s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,15 @@ void setUp() throws Exception {
void loadsAllProtoFiledFromTargetDirectory() throws Exception {
var protoDir = ResourceUtils.getFile("classpath:protobuf-serde/").getPath();
List<ProtoFile> files = new ProtobufFileSerde.ProtoSchemaLoader(protoDir).load();
assertThat(files).hasSize(4);
assertThat(files).hasSize(5);
assertThat(files)
.map(f -> f.getLocation().getPath())
.containsExactlyInAnyOrder(
"language/language.proto",
"sensor.proto",
"address-book.proto",
"lang-description.proto"
"lang-description.proto",
"messagewithany.proto"
);
}

Expand Down
30 changes: 30 additions & 0 deletions api/src/test/java/io/kafbat/ui/service/MessagesServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kafbat.ui.model.TopicMessageDTO;
import io.kafbat.ui.model.TopicMessageEventDTO;
import io.kafbat.ui.producer.KafkaTestProducer;
import io.kafbat.ui.serdes.builtin.ProtobufFileSerde;
import io.kafbat.ui.serdes.builtin.StringSerde;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -214,4 +215,33 @@ void execSmartFilterTestReturnsErrorOnFilterCompilationError() {
assertThat(result.getError()).containsIgnoringCase("Compilation error");
}

@Test
void sendMessageWithProtobufAnyType() {
String jsonContent = """
{
"name": "testName",
"payload": {
"@type": "type.googleapis.com/test.PayloadMessage",
"id": "123"
}
}
""";

CreateTopicMessageDTO testMessage = new CreateTopicMessageDTO()
.key(null)
.partition(0)
.keySerde(StringSerde.name())
.content(jsonContent)
.valueSerde(ProtobufFileSerde.name());

String testTopic = MASKED_TOPICS_PREFIX + UUID.randomUUID();
createTopicWithCleanup(new NewTopic(testTopic, 5, (short) 1));

StepVerifier.create(messagesService.sendMessage(cluster, testTopic, testMessage))
.expectNextMatches(metadata -> metadata.topic().equals(testTopic)
&& metadata.partition() == 0
&& metadata.offset() >= 0)
.verifyComplete();
}

}
13 changes: 13 additions & 0 deletions api/src/test/resources/protobuf-serde/messagewithany.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";
package test;

import "google/protobuf/any.proto";

message MessageWithAny {
string name = 1;
google.protobuf.Any payload = 2;
}

message PayloadMessage {
string id = 1;
}

0 comments on commit d5c976e

Please sign in to comment.