forked from confluentinc/kafka-streams-examples
-
Notifications
You must be signed in to change notification settings - Fork 1
/
MixAndMatchLambdaIntegrationTest.java
178 lines (154 loc) · 7.68 KB
/
MixAndMatchLambdaIntegrationTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/*
* Copyright Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.examples.streams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
import static org.assertj.core.api.Assertions.assertThat;
/**
* End-to-end integration test that demonstrates how to mix and match the DSL and the Processor
* API of Kafka Streams.
*
* More concretely, we show how to use the {@link KStream#transform(TransformerSupplier, String...)}
* method to include a custom {@link org.apache.kafka.streams.kstream.Transformer} (from the
* Processor API) in a topology defined via the DSL. The fictitious use case is to anonymize
* IPv4 addresses contained in the input data.
*
* Tip: Users that want to use {@link KStream#process(ProcessorSupplier, String...)} would need to
* include a custom {@link org.apache.kafka.streams.processor.Processor}. Keep in mind though that
* the return type of {@link KStream#process(ProcessorSupplier, String...)} is `void`, which means
* you cannot add further operators (such as `to()` as we do below) after having called `process()`.
* If you need to add further operators, you'd have to use `transform()` as we do in this example.
*
* Note: This example uses lambda expressions and thus works with Java 8+ only.
*/
public class MixAndMatchLambdaIntegrationTest {
@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
private static final String inputTopic = "inputTopic";
private static final String outputTopic = "outputTopic";
@BeforeClass
public static void startKafkaCluster() {
CLUSTER.createTopic(inputTopic);
CLUSTER.createTopic(outputTopic);
}
/**
* Performs rudimentary anonymization of IPv4 address in the input data.
* You should use this for demonstration purposes only.
*/
private static class AnonymizeIpAddressTransformer implements Transformer<byte[], String, KeyValue<byte[], String>> {
private static final Pattern ipv4AddressPattern =
Pattern.compile("(?<keep>[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.)(?<anonymize>[0-9]{1,3})");
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
// Not needed.
}
@Override
public KeyValue<byte[], String> transform(final byte[] recordKey, final String recordValue) {
// The record value contains the IP address in string representation.
// The original record key is ignored because we don't need it for this logic.
final String anonymizedIpAddress = anonymizeIpAddress(recordValue);
return KeyValue.pair(recordKey, anonymizedIpAddress);
}
/**
* Anonymizes the provided IPv4 address (in string representation) by replacing the fourth byte
* with "XXX". For example, "192.168.1.234" is anonymized to "192.168.1.XXX".
*
* Note: This method is for illustration purposes only. The anonymization is both lackluster
* (in terms of the achieved anonymization) and slow/inefficient (in terms of implementation).
*
* @param ipAddress The IPv4 address
* @return Anonymized IPv4 address.
*/
private String anonymizeIpAddress(final String ipAddress) {
return ipv4AddressPattern.matcher(ipAddress).replaceAll("${keep}XXX");
}
@Override
public void close() {
// Not needed.
}
}
@Test
public void shouldAnonymizeTheInput() throws Exception {
final List<String> inputValues = Arrays.asList("Hello, 1.2.3.4!", "foo 192.168.1.55 bar");
final List<String> expectedValues = Arrays.asList("HELLO, 1.2.3.XXX!", "FOO 192.168.1.XXX BAR");
//
// Step 1: Configure and start the processor topology.
//
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "mix-and-match-lambda-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
final KStream<byte[], String> input = builder.stream(inputTopic);
final KStream<byte[], String> uppercasedAndAnonymized = input
.mapValues(v -> v.toUpperCase())
.transform(AnonymizeIpAddressTransformer::new);
uppercasedAndAnonymized.to(outputTopic);
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
//
// Step 2: Produce some input data to the input topic.
//
final Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(inputTopic, inputValues, producerConfig);
//
// Step 3: Verify the application's output data.
//
final Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "mix-and-match-lambda-integration-test-standard-consumer");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
final List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
consumerConfig,
outputTopic,
expectedValues.size()
);
streams.close();
assertThat(actualValues).isEqualTo(expectedValues);
}
}