
Commit 3c378da

KAFKA-19647: Implement integration test for offline migration (#20412)
In KAFKA-19570 we implemented offline migration between groups, so the following integration or system tests should be possible:

Test A:
- Start a streams application with the classic protocol, process up to a certain offset, commit the offset, and shut down.
- Start the same streams application with the streams protocol (same application ID!).
- Make sure that records before the offset committed in the first run are not reprocessed in the second run.

Test B:
- Start a streams application with the streams protocol, process up to a certain offset, commit the offset, and shut down.
- Start the same streams application with the classic protocol (same application ID!).
- Make sure that records before the offset committed in the first run are not reprocessed in the second run.

We have unit tests that make sure that non-empty groups will not be converted; this should be enough.

Reviewers: Bill Bejeck <[email protected]>
1 parent 6956417 commit 3c378da
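
For context, which rebalance protocol a Kafka Streams application uses is controlled by a single config, StreamsConfig.GROUP_PROTOCOL_CONFIG. A minimal sketch of the migration flow the commit message describes (the application ID and bootstrap servers below are illustrative placeholders, not values from this commit):

    import java.util.Properties;

    import org.apache.kafka.streams.GroupProtocol;
    import org.apache.kafka.streams.StreamsConfig;

    public class MigrationSketch {
        // Build the config for one run; call with GroupProtocol.CLASSIC for the
        // first run, then GroupProtocol.STREAMS for the second run (or vice versa).
        // The application ID must stay the same so both runs use the same group.
        public static Properties configFor(final GroupProtocol protocol) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "migration-app");     // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, protocol.name());
            return props;
        }
    }

The group is only converted while it is empty, which is why the test below waits for the consumer group session to time out before restarting with the other protocol.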

File tree

2 files changed, +222 -0 lines changed
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RebalanceProtocolMigrationIntegrationTest.java

Lines changed: 219 additions & 0 deletions
@@ -0,0 +1,219 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.streams.integration;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

@Tag("integration")
public class RebalanceProtocolMigrationIntegrationTest {

    public static final String INPUT_TOPIC = "migration-input";
    public static final String OUTPUT_TOPIC = "migration-output";
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    private String inputTopic;
    private String outputTopic;
    private KafkaStreams kafkaStreams;
    private String safeTestName;

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void createTopics(final TestInfo testInfo) throws Exception {
        safeTestName = safeUniqueTestName(testInfo);
        inputTopic = INPUT_TOPIC + safeTestName;
        outputTopic = OUTPUT_TOPIC + safeTestName;
        CLUSTER.createTopic(inputTopic);
        CLUSTER.createTopic(outputTopic);
    }

    private Properties props() {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 500);
        streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
        return streamsConfiguration;
    }

    @AfterEach
    public void shutdown() {
        if (kafkaStreams != null) {
            kafkaStreams.close(Duration.ofSeconds(30L));
            kafkaStreams.cleanUp();
        }
    }

    @Test
    public void shouldMigrateToAndFromStreamsRebalanceProtocol() throws Exception {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, String> input = streamsBuilder.stream(
            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        input.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        final Properties props = props();
        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
        processExactlyOneRecord(streamsBuilder, props, "1", "A");

        // Wait for session to time out, so the group becomes empty and can be converted
        try (final Admin adminClient = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()))) {
            waitForEmptyConsumerGroup(adminClient, props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 1000);
        }

        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
        processExactlyOneRecord(streamsBuilder, props, "2", "B");

        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
        processExactlyOneRecord(streamsBuilder, props, "3", "C");
    }

    @Test
    public void shouldMigrateFromAndToStreamsRebalanceProtocol() throws Exception {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, String> input = streamsBuilder.stream(
            inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        input.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        final Properties props = props();
        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
        processExactlyOneRecord(streamsBuilder, props, "1", "A");

        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
        processExactlyOneRecord(streamsBuilder, props, "2", "B");

        // Wait for session to time out, so the group becomes empty and can be converted
        try (final Admin adminClient = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()))) {
            waitForEmptyConsumerGroup(adminClient, props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 1000);
        }

        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name());
        processExactlyOneRecord(streamsBuilder, props, "3", "C");
    }

    private void processExactlyOneRecord(
        final StreamsBuilder streamsBuilder,
        final Properties props,
        final String key,
        final String value) throws Exception {
        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        kafkaStreams.start();

        final long currentTimeNew = CLUSTER.time.milliseconds();

        processKeyValueAndVerify(
            key,
            value,
            currentTimeNew,
            List.of(new KeyValueTimestamp<>(key, value, currentTimeNew))
        );

        kafkaStreams.close();
        kafkaStreams = null;
    }

    private <K, V> void processKeyValueAndVerify(
        final K key,
        final V value,
        final long timestamp,
        final List<KeyValueTimestamp<K, V>> expected) throws Exception {

        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            inputTopic,
            singletonList(KeyValue.pair(key, value)),
            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
                StringSerializer.class,
                StringSerializer.class),
            timestamp);

        final Properties consumerProperties = new Properties();
        consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        final List<KeyValueTimestamp<K, V>> actual =
            IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
                consumerProperties,
                outputTopic,
                expected.size(),
                60 * 1000);

        assertThat(actual, is(expected));
    }
}
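
The assertions above verify migration end to end through the output topic. If one wanted to inspect the committed offsets directly across a protocol switch, the Admin client's listConsumerGroupOffsets call can read them, since a Streams group ID equals the application ID. A hedged sketch under that assumption, not part of this commit:

    import java.util.Map;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class OffsetInspectionSketch {
        // Fetch the offsets the group has committed; run this before shutdown and
        // again after the restart with the other protocol, then compare the maps.
        public static Map<TopicPartition, OffsetAndMetadata> committedOffsets(
            final String bootstrapServers,
            final String applicationId) throws Exception {
            try (Admin admin = Admin.create(
                    Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers))) {
                return admin.listConsumerGroupOffsets(applicationId)
                    .partitionsToOffsetAndMetadata()
                    .get();
            }
        }
    }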

streams/integration-tests/src/test/resources/log4j2.yaml

Lines changed: 3 additions & 0 deletions
@@ -52,6 +52,9 @@ Configuration:
     - name: org.apache.kafka.streams
       level: INFO

+    - name: org.apache.kafka.coordinator.group
+      level: INFO
+
     - name: org.apache.kafka.clients.producer.ProducerConfig
       level: ERROR
