mirror of https://github.com/apache/kafka.git
MINOR: Rename NoOpShareStatePersister for consistency (#18933)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
63229a768c
commit
6c14f64245
|
@ -49,7 +49,7 @@ import org.apache.kafka.server.config.ConfigType
|
||||||
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
|
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
|
||||||
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
|
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
|
||||||
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
|
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
|
||||||
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpShareStatePersister, Persister, PersisterStateManager}
|
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpStatePersister, Persister, PersisterStateManager}
|
||||||
import org.apache.kafka.server.share.session.ShareSessionCache
|
import org.apache.kafka.server.share.session.ShareSessionCache
|
||||||
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
|
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
|
||||||
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
|
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
|
||||||
|
@ -707,18 +707,17 @@ class BrokerServer(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
} else if (klass.getName.equals(classOf[NoOpShareStatePersister].getName)) {
|
} else if (klass.getName.equals(classOf[NoOpStatePersister].getName)) {
|
||||||
info("Using no op persister")
|
info("Using no-op persister")
|
||||||
new NoOpShareStatePersister()
|
new NoOpStatePersister()
|
||||||
} else {
|
} else {
|
||||||
error("Unknown persister specified. Persister is only factory pluggable!")
|
error("Unknown persister specified. Persister is only factory-pluggable!")
|
||||||
throw new IllegalArgumentException("Unknown persiser specified " + config.shareGroupConfig.shareGroupPersisterClassName)
|
throw new IllegalArgumentException("Unknown persister specified " + config.shareGroupConfig.shareGroupPersisterClassName)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// in case share coordinator not enabled or
|
// in case share coordinator not enabled or persister class name deliberately empty (key=)
|
||||||
// persister class name deliberately empty (key=)
|
info("Using no-op persister")
|
||||||
info("Using no op persister")
|
new NoOpStatePersister()
|
||||||
new NoOpShareStatePersister()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -67,7 +67,7 @@ import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
|
||||||
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
|
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
|
||||||
import org.apache.kafka.server.share.fetch.ShareFetch;
|
import org.apache.kafka.server.share.fetch.ShareFetch;
|
||||||
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
|
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
|
||||||
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
|
import org.apache.kafka.server.share.persister.NoOpStatePersister;
|
||||||
import org.apache.kafka.server.share.persister.Persister;
|
import org.apache.kafka.server.share.persister.Persister;
|
||||||
import org.apache.kafka.server.share.session.ShareSession;
|
import org.apache.kafka.server.share.session.ShareSession;
|
||||||
import org.apache.kafka.server.share.session.ShareSessionCache;
|
import org.apache.kafka.server.share.session.ShareSessionCache;
|
||||||
|
@ -3126,7 +3126,7 @@ public class SharePartitionManagerTest {
|
||||||
private Time time = new MockTime();
|
private Time time = new MockTime();
|
||||||
private ShareSessionCache cache = new ShareSessionCache(10, 1000);
|
private ShareSessionCache cache = new ShareSessionCache(10, 1000);
|
||||||
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
|
private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
|
||||||
private Persister persister = new NoOpShareStatePersister();
|
private Persister persister = new NoOpStatePersister();
|
||||||
private Timer timer = new MockTimer();
|
private Timer timer = new MockTimer();
|
||||||
private ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
|
private ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
|
||||||
private BrokerTopicStats brokerTopicStats;
|
private BrokerTopicStats brokerTopicStats;
|
||||||
|
|
|
@ -48,7 +48,7 @@ import org.apache.kafka.coordinator.group.GroupConfigManager;
|
||||||
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
|
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
|
||||||
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
|
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
|
||||||
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
|
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
|
||||||
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
|
import org.apache.kafka.server.share.persister.NoOpStatePersister;
|
||||||
import org.apache.kafka.server.share.persister.PartitionFactory;
|
import org.apache.kafka.server.share.persister.PartitionFactory;
|
||||||
import org.apache.kafka.server.share.persister.Persister;
|
import org.apache.kafka.server.share.persister.Persister;
|
||||||
import org.apache.kafka.server.share.persister.PersisterStateBatch;
|
import org.apache.kafka.server.share.persister.PersisterStateBatch;
|
||||||
|
@ -812,7 +812,7 @@ public class SharePartitionTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMaybeInitializeWithNoOpShareStatePersister() {
|
public void testMaybeInitializeWithNoOpStatePersister() {
|
||||||
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||||
|
|
||||||
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
|
FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
|
||||||
|
@ -5887,7 +5887,7 @@ public class SharePartitionTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteShareGroupStateWithNoOpShareStatePersister() {
|
public void testWriteShareGroupStateWithNoOpStatePersister() {
|
||||||
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
|
||||||
List<PersisterStateBatch> stateBatches = Arrays.asList(
|
List<PersisterStateBatch> stateBatches = Arrays.asList(
|
||||||
new PersisterStateBatch(5L, 10L, RecordState.AVAILABLE.id, (short) 2),
|
new PersisterStateBatch(5L, 10L, RecordState.AVAILABLE.id, (short) 2),
|
||||||
|
@ -6958,7 +6958,7 @@ public class SharePartitionTest {
|
||||||
private int maxDeliveryCount = MAX_DELIVERY_COUNT;
|
private int maxDeliveryCount = MAX_DELIVERY_COUNT;
|
||||||
private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
|
private int maxInflightMessages = MAX_IN_FLIGHT_MESSAGES;
|
||||||
|
|
||||||
private Persister persister = new NoOpShareStatePersister();
|
private Persister persister = new NoOpStatePersister();
|
||||||
private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
private ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
|
||||||
private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
private GroupConfigManager groupConfigManager = Mockito.mock(GroupConfigManager.class);
|
||||||
private SharePartitionState state = SharePartitionState.EMPTY;
|
private SharePartitionState state = SharePartitionState.EMPTY;
|
||||||
|
|
|
@ -81,7 +81,7 @@ import org.apache.kafka.image.MetadataImage;
|
||||||
import org.apache.kafka.image.TopicsImage;
|
import org.apache.kafka.image.TopicsImage;
|
||||||
import org.apache.kafka.server.record.BrokerCompressionType;
|
import org.apache.kafka.server.record.BrokerCompressionType;
|
||||||
import org.apache.kafka.server.share.persister.DefaultStatePersister;
|
import org.apache.kafka.server.share.persister.DefaultStatePersister;
|
||||||
import org.apache.kafka.server.share.persister.NoOpShareStatePersister;
|
import org.apache.kafka.server.share.persister.NoOpStatePersister;
|
||||||
import org.apache.kafka.server.share.persister.PartitionFactory;
|
import org.apache.kafka.server.share.persister.PartitionFactory;
|
||||||
import org.apache.kafka.server.share.persister.Persister;
|
import org.apache.kafka.server.share.persister.Persister;
|
||||||
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
|
import org.apache.kafka.server.share.persister.ReadShareGroupStateSummaryParameters;
|
||||||
|
@ -2480,7 +2480,7 @@ public class GroupCoordinatorServiceTest {
|
||||||
private GroupCoordinatorConfig config;
|
private GroupCoordinatorConfig config;
|
||||||
private CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime;
|
private CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime;
|
||||||
private GroupCoordinatorMetrics metrics = new GroupCoordinatorMetrics();
|
private GroupCoordinatorMetrics metrics = new GroupCoordinatorMetrics();
|
||||||
private Persister persister = new NoOpShareStatePersister();
|
private Persister persister = new NoOpStatePersister();
|
||||||
private MetadataImage metadataImage = null;
|
private MetadataImage metadataImage = null;
|
||||||
|
|
||||||
GroupCoordinatorService build() {
|
GroupCoordinatorService build() {
|
||||||
|
|
|
@ -26,9 +26,9 @@ import java.util.stream.Collectors;
|
||||||
/**
|
/**
|
||||||
* A no-op singleton implementation of {@link Persister} interface.
|
* A no-op singleton implementation of {@link Persister} interface.
|
||||||
*/
|
*/
|
||||||
public class NoOpShareStatePersister implements Persister {
|
public class NoOpStatePersister implements Persister {
|
||||||
|
|
||||||
public NoOpShareStatePersister() {
|
public NoOpStatePersister() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
Loading…
Reference in New Issue