KAFKA-15045: (KIP-924 pt. 25) Rename old internal StickyTaskAssignor to LegacyStickyTaskAssignor (#16322)

To avoid confusion in 3.8/until we fully remove all the old task assignors and internal config, we should rename the old internal assignor classes like the StickyTaskAssignor so that they won't be mixed up with the new version of the assignor (which is also named StickyTaskAssignor)

Reviewers: Bruno Cadonna <cadonna@apache.org>, Josep Prat <josep.prat@aiven.io>
This commit is contained in:
A. Sophie Blee-Goldman 2024-06-13 11:27:50 -07:00 committed by GitHub
parent f380cd1b64
commit 4333af5c9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 37 additions and 32 deletions

View File

@ -59,10 +59,10 @@ import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskTopicPartition;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackUtils;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.DefaultTaskInfo;
@ -861,8 +861,8 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
private LegacyTaskAssignor createTaskAssignor(final boolean lagComputationSuccessful) {
final LegacyTaskAssignor taskAssignor = legacyTaskAssignorSupplier.get();
if (taskAssignor instanceof StickyTaskAssignor) {
// special case: to preserve pre-existing behavior, we invoke the StickyTaskAssignor
if (taskAssignor instanceof LegacyStickyTaskAssignor) {
// special case: to preserve pre-existing behavior, we invoke the LegacyStickyTaskAssignor
// whether or not lag computation failed.
return taskAssignor;
} else if (lagComputationSuccessful) {

View File

@ -32,10 +32,10 @@ import org.apache.kafka.streams.processor.assignment.ProcessId;
* 2. always return true, indicating that a follow-up rebalance is needed
*/
public class FallbackPriorTaskAssignor implements LegacyTaskAssignor {
private final StickyTaskAssignor delegate;
private final LegacyStickyTaskAssignor delegate;
public FallbackPriorTaskAssignor() {
delegate = new StickyTaskAssignor(true);
delegate = new LegacyStickyTaskAssignor(true);
}
@Override

View File

@ -40,9 +40,14 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
public class StickyTaskAssignor implements LegacyTaskAssignor {
// Note: as of 3.8, this class has been renamed from StickyTaskAssignor to LegacyStickyTaskAssignor,
// and a new StickyTaskAssignor implementation was added that implements the new TaskAssignor interface.
// If you were previously plugging in the old StickyTaskAssignor via the internal.task.assignor.class config,
// you should migrate to the new TaskAssignor interface by removing the internal config and instead
// passing in the new StickyTaskAssignor class to the new public task.assignor.class config
public class LegacyStickyTaskAssignor implements LegacyTaskAssignor {
private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);
private static final Logger log = LoggerFactory.getLogger(LegacyStickyTaskAssignor.class);
// For stateful tasks, by default we want to maintain stickiness. So we have higher non_overlap_cost
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
@ -59,11 +64,11 @@ public class StickyTaskAssignor implements LegacyTaskAssignor {
private final boolean mustPreserveActiveTaskAssignment;
public StickyTaskAssignor() {
public LegacyStickyTaskAssignor() {
this(false);
}
StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
LegacyStickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) {
this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment;
}

View File

@ -1460,8 +1460,8 @@ public class StreamsConfigTest {
@Test
public void shouldReturnTaskAssignorClass() {
props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, "StickyTaskAssignor");
assertEquals("StickyTaskAssignor", new StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG));
props.put(StreamsConfig.TASK_ASSIGNOR_CLASS_CONFIG, "LegacyStickyTaskAssignor");
assertEquals("LegacyStickyTaskAssignor", new StreamsConfig(props).getString(TASK_ASSIGNOR_CLASS_CONFIG));
}
@Test

View File

@ -34,8 +34,8 @@ import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockApiProcessorSupplier;
@ -102,26 +102,26 @@ public class StreamsAssignmentScaleTest {
completeLargeAssignment(1_000, 10, 1000, 1, HighAvailabilityTaskAssignor.class);
}
/* StickyTaskAssignor tests */
/* LegacyStickyTaskAssignor tests */
@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorLargePartitionCount() {
completeLargeAssignment(2_000, 2, 1, 1, StickyTaskAssignor.class);
completeLargeAssignment(2_000, 2, 1, 1, LegacyStickyTaskAssignor.class);
}
@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorLargeNumConsumers() {
completeLargeAssignment(1_000, 1_000, 1, 1, StickyTaskAssignor.class);
completeLargeAssignment(1_000, 1_000, 1, 1, LegacyStickyTaskAssignor.class);
}
@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorManyStandbys() {
completeLargeAssignment(1_000, 100, 1, 20, StickyTaskAssignor.class);
completeLargeAssignment(1_000, 100, 1, 20, LegacyStickyTaskAssignor.class);
}
@Test(timeout = 300 * 1000)
public void testStickyTaskAssignorManyThreadsPerClient() {
completeLargeAssignment(1_000, 10, 1000, 1, StickyTaskAssignor.class);
completeLargeAssignment(1_000, 10, 1000, 1, LegacyStickyTaskAssignor.class);
}
/* FallbackPriorTaskAssignor tests */

View File

@ -67,8 +67,8 @@ import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
@ -344,8 +344,8 @@ public class StreamsPartitionAssignorTest {
return asList(
new Object[]{HighAvailabilityTaskAssignor.class, true, null},
new Object[]{HighAvailabilityTaskAssignor.class, false, null},
new Object[]{StickyTaskAssignor.class, true, null},
new Object[]{StickyTaskAssignor.class, false, null},
new Object[]{LegacyStickyTaskAssignor.class, true, null},
new Object[]{LegacyStickyTaskAssignor.class, false, null},
new Object[]{FallbackPriorTaskAssignor.class, true, null},
new Object[]{FallbackPriorTaskAssignor.class, false, null},
new Object[]{null, false, org.apache.kafka.streams.processor.assignment.assignors.StickyTaskAssignor.class},

View File

@ -17,8 +17,8 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest;
import org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignorTest;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetricsTest;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@ -35,7 +35,7 @@ import org.junit.runners.Suite;
GlobalStateTaskTest.class,
TaskManagerTest.class,
TaskMetricsTest.class,
StickyTaskAssignorTest.class,
LegacyStickyTaskAssignorTest.class,
StreamsPartitionAssignorTest.class,
StandbyTaskCreationIntegrationTest.class,
})

View File

@ -104,7 +104,7 @@ import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.spy;
@RunWith(Parameterized.class)
public class StickyTaskAssignorTest {
public class LegacyStickyTaskAssignorTest {
private final List<Integer> expectedTopicGroupIds = asList(1, 2);
private final Time time = new MockTime();
@ -807,7 +807,7 @@ public class StickyTaskAssignorTest {
time
);
final boolean probingRebalanceNeeded = new StickyTaskAssignor(true).assign(
final boolean probingRebalanceNeeded = new LegacyStickyTaskAssignor(true).assign(
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),
@ -857,7 +857,7 @@ public class StickyTaskAssignorTest {
time
);
final boolean probingRebalanceNeeded = new StickyTaskAssignor().assign(
final boolean probingRebalanceNeeded = new LegacyStickyTaskAssignor().assign(
clients,
new HashSet<>(taskIds),
new HashSet<>(statefulTaskIds),
@ -937,7 +937,7 @@ public class StickyTaskAssignorTest {
tpSize, partitionSize, maxCapacity, false, statefulTasks);
final boolean probing = new StickyTaskAssignor().assign(
final boolean probing = new LegacyStickyTaskAssignor().assign(
clientStateMap,
taskIds,
statefulTasks,
@ -1005,7 +1005,7 @@ public class StickyTaskAssignorTest {
final SortedMap<ProcessId, ClientState> clientStateMap = getRandomClientState(clientSize,
tpSize, partitionSize, maxCapacity, false, statefulTasks);
new StickyTaskAssignor().assign(
new LegacyStickyTaskAssignor().assign(
clientStateMap,
taskIds,
statefulTasks,
@ -1047,7 +1047,7 @@ public class StickyTaskAssignorTest {
time
));
new StickyTaskAssignor().assign(
new LegacyStickyTaskAssignor().assign(
clientStateMapCopy,
taskIds,
statefulTasks,
@ -1085,7 +1085,7 @@ public class StickyTaskAssignorTest {
private boolean assign(final AssignmentConfigs configs, final RackAwareTaskAssignor rackAwareTaskAssignor, final TaskId... tasks) {
final List<TaskId> taskIds = asList(tasks);
Collections.shuffle(taskIds);
return new StickyTaskAssignor().assign(
return new LegacyStickyTaskAssignor().assign(
clients,
new HashSet<>(taskIds),
new HashSet<>(taskIds),

View File

@ -157,7 +157,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
configs = self.get_configs(
extra_configs=",application.id=shutdown_with_broker_down" +
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
)
processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)
@ -236,7 +236,7 @@ class StreamsBrokerDownResilience(BaseStreamsTest):
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
configs = self.get_configs(
extra_configs=",application.id=failover_with_broker_down" +
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor"
)
processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs)

View File

@ -51,7 +51,7 @@ class StreamsStandbyTask(BaseStreamsTest):
def test_standby_tasks_rebalance(self, metadata_quorum, use_new_coordinator=False):
# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor
configs = self.get_configs(
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor" % (
",sourceTopic=%s,sinkTopic1=%s,sinkTopic2=%s,internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.LegacyStickyTaskAssignor" % (
self.streams_source_topic,
self.streams_sink_topic_1,
self.streams_sink_topic_2