mirror of https://github.com/apache/kafka.git
KAFKA-17003: Implemented SharePartitionManager close() functionality (#16431)
About Implemented close() functionality of SharePartitionManager to avoid any chances of memory leak. The functionality will be utilized when the sharePartitionObject is closed, at the time when BrokerServer is killed. Testing Added unit tests to cover the new functionality added. Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>
This commit is contained in:
parent
adee6f0cc1
commit
93dd9acad0
|
|
@ -155,7 +155,7 @@ public class SharePartitionManager implements AutoCloseable {
|
|||
);
|
||||
}
|
||||
|
||||
SharePartitionManager(
|
||||
private SharePartitionManager(
|
||||
ReplicaManager replicaManager,
|
||||
Time time,
|
||||
ShareSessionCache cache,
|
||||
|
|
@ -179,6 +179,31 @@ public class SharePartitionManager implements AutoCloseable {
|
|||
this.persister = persister;
|
||||
}
|
||||
|
||||
// Visible for testing.
|
||||
SharePartitionManager(
|
||||
ReplicaManager replicaManager,
|
||||
Time time,
|
||||
ShareSessionCache cache,
|
||||
Map<SharePartitionKey, SharePartition> partitionCacheMap,
|
||||
int recordLockDurationMs,
|
||||
Timer timer,
|
||||
int maxDeliveryCount,
|
||||
int maxInFlightMessages,
|
||||
Persister persister
|
||||
) {
|
||||
this.replicaManager = replicaManager;
|
||||
this.time = time;
|
||||
this.cache = cache;
|
||||
this.partitionCacheMap = partitionCacheMap;
|
||||
this.fetchQueue = new ConcurrentLinkedQueue<>();
|
||||
this.processFetchQueueLock = new AtomicBoolean(false);
|
||||
this.recordLockDurationMs = recordLockDurationMs;
|
||||
this.timer = timer;
|
||||
this.maxDeliveryCount = maxDeliveryCount;
|
||||
this.maxInFlightMessages = maxInFlightMessages;
|
||||
this.persister = persister;
|
||||
}
|
||||
|
||||
/**
|
||||
* The fetch messages method is used to fetch messages from the log for the specified topic-partitions.
|
||||
* The method returns a future that will be completed with the fetched messages.
|
||||
|
|
@ -361,7 +386,8 @@ public class SharePartitionManager implements AutoCloseable {
|
|||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
// TODO: Provide Implementation
|
||||
this.timer.close();
|
||||
this.persister.stop();
|
||||
}
|
||||
|
||||
private ShareSessionKey shareSessionKey(String groupId, Uuid memberId) {
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ import org.apache.kafka.server.group.share.NoOpShareStatePersister;
|
|||
import org.apache.kafka.server.group.share.Persister;
|
||||
import org.apache.kafka.server.share.ShareSessionCache;
|
||||
import org.apache.kafka.server.share.ShareSessionKey;
|
||||
import org.apache.kafka.server.util.timer.MockTimer;
|
||||
import org.apache.kafka.server.util.timer.SystemTimer;
|
||||
import org.apache.kafka.server.util.timer.SystemTimerReaper;
|
||||
import org.apache.kafka.server.util.timer.Timer;
|
||||
|
|
@ -1221,6 +1222,23 @@ public class SharePartitionManagerTest {
|
|||
any(), any(), any(ReplicaQuota.class), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseSharePartitionManager() throws Exception {
|
||||
Timer timer = Mockito.mock(SystemTimerReaper.class);
|
||||
Persister persister = Mockito.mock(Persister.class);
|
||||
SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder()
|
||||
.withTimer(timer).withShareGroupPersister(persister).build();
|
||||
|
||||
// Verify that 0 calls are made to timer.close() and persister.stop().
|
||||
Mockito.verify(timer, times(0)).close();
|
||||
Mockito.verify(persister, times(0)).stop();
|
||||
// Closing the sharePartitionManager closes timer object in sharePartitionManager.
|
||||
sharePartitionManager.close();
|
||||
// Verify that the timer object in sharePartitionManager is closed by checking the calls to timer.close() and persister.stop().
|
||||
Mockito.verify(timer, times(1)).close();
|
||||
Mockito.verify(persister, times(1)).stop();
|
||||
}
|
||||
|
||||
private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() {
|
||||
return new ShareFetchResponseData.PartitionData().setPartitionIndex(0);
|
||||
}
|
||||
|
|
@ -1287,6 +1305,7 @@ public class SharePartitionManagerTest {
|
|||
private ShareSessionCache cache = new ShareSessionCache(10, 1000);
|
||||
private Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
|
||||
private Persister persister = NoOpShareStatePersister.getInstance();
|
||||
private Timer timer = new MockTimer();
|
||||
|
||||
private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
|
||||
this.replicaManager = replicaManager;
|
||||
|
|
@ -1313,12 +1332,17 @@ public class SharePartitionManagerTest {
|
|||
return this;
|
||||
}
|
||||
|
||||
private SharePartitionManagerBuilder withTimer(Timer timer) {
|
||||
this.timer = timer;
|
||||
return this;
|
||||
}
|
||||
|
||||
public static SharePartitionManagerBuilder builder() {
|
||||
return new SharePartitionManagerBuilder();
|
||||
}
|
||||
|
||||
public SharePartitionManager build() {
|
||||
return new SharePartitionManager(replicaManager, time, cache, partitionCacheMap, RECORD_LOCK_DURATION_MS, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, persister);
|
||||
return new SharePartitionManager(replicaManager, time, cache, partitionCacheMap, RECORD_LOCK_DURATION_MS, timer, MAX_DELIVERY_COUNT, MAX_IN_FLIGHT_MESSAGES, persister);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue