KAFKA-14828: Remove R/W locks using persistent data structures (#13437)

Currently, StandardAuthorizer uses a R/W lock for maintaining the consistency of data. For the clusters with very high traffic, we will typically see an increase in latencies whenever a write operation comes. The intent of this PR is to get rid of the R/W lock with the help of immutable or persistent collections. Basically, new object references are used to hold the intermediate state of the write operation. After the completion of the operation, the main reference to the cache is changed to point to the new object. Also, for the read operation, the code is changed such that all accesses to the cache for a single read operation are done to a particular cache object only.

In the PR description, you can find the performance of various libraries at the time of both read and write. Read performance is checked with the existing AuthorizerBenchmark. For write performance, a new AuthorizerUpdateBenchmark has been added which evaluates the performance of the addAcl operation.


Reviewers:  Ron Dagostino <rndgstn@gmail.com>, Manikumar Reddy <manikumar.reddy@gmail.com>,  Divij Vaidya <diviv@amazon.com>
This commit is contained in:
Purshotam Chauhan 2023-04-21 14:08:23 +05:30 committed by GitHub
parent 2ee770ac7e
commit df13775254
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1164 additions and 244 deletions

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.acl;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.authorizer.StandardAclWithId;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.kafka.common.acl.AclOperation.READ;
import static org.apache.kafka.common.acl.AclPermissionType.ALLOW;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 0)
@Measurement(iterations = 4)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class StandardAuthorizerUpdateBenchmark {
@Param({"25000", "50000", "75000", "100000"})
private int aclCount;
private final String resourceNamePrefix = "foo-bar35_resource-";
private static final KafkaPrincipal PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "test-user");
private StandardAuthorizer authorizer;
private final Set<Uuid> ids = new HashSet<>();
private List<StandardAclWithId> aclsToAdd = prepareAcls();
int index = 0;
private static final Random RANDOM = new Random(System.currentTimeMillis());
@Setup(Level.Trial)
public void setup() throws Exception {
authorizer = new StandardAuthorizer();
addAcls(aclCount);
}
@TearDown(Level.Trial)
public void tearDown() throws IOException {
authorizer.close();
}
@Benchmark
public void testAddAcl() {
StandardAclWithId aclWithId = aclsToAdd.get(index++);
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
}
private List<StandardAclWithId> prepareAcls() {
return IntStream.range(0, 10000)
.mapToObj(i -> {
ResourceType resourceType = RANDOM.nextInt(10) > 7 ? ResourceType.GROUP : ResourceType.TOPIC;
String resourceName = resourceNamePrefix + i;
ResourcePattern resourcePattern = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
return aclsForResource(resourcePattern);
})
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
private List<StandardAclWithId> aclsForResource(ResourcePattern pattern) {
return IntStream.range(1, 256)
.mapToObj(i -> {
String p = PRINCIPAL.toString() + RANDOM.nextInt(100);
String h = "127.0.0." + i;
return new StandardAcl(pattern.resourceType(), pattern.name(), pattern.patternType(), p, h, READ, ALLOW);
})
.map(this::withId)
.collect(Collectors.toList());
}
private StandardAclWithId withId(StandardAcl acl) {
Uuid id = new Uuid(acl.hashCode(), acl.hashCode());
while (ids.contains(id)) {
id = Uuid.randomUuid();
}
ids.add(id);
return new StandardAclWithId(id, acl);
}
private void addAcls(int num) {
IntStream.range(0, num)
.mapToObj(aclsToAdd::get)
.forEach(aclWithId -> {
authorizer.addAcl(aclWithId.id(), aclWithId.acl());
index++;
});
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.metadata.authorizer;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.server.immutable.ImmutableMap;
import org.apache.kafka.server.immutable.ImmutableNavigableSet;
import java.util.ArrayList;
import java.util.List;
/**
* An immutable class that stores the ACLs in KRaft-based clusters.
*/
public class AclCache {
/**
* Contains all of the current ACLs sorted by (resource type, resource name).
*/
private final ImmutableNavigableSet<StandardAcl> aclsByResource;
/**
* Contains all of the current ACLs indexed by UUID.
*/
private final ImmutableMap<Uuid, StandardAcl> aclsById;
AclCache() {
this(ImmutableNavigableSet.empty(), ImmutableMap.empty());
}
private AclCache(final ImmutableNavigableSet<StandardAcl> aclsByResource, final ImmutableMap<Uuid, StandardAcl> aclsById) {
this.aclsByResource = aclsByResource;
this.aclsById = aclsById;
}
public ImmutableNavigableSet<StandardAcl> aclsByResource() {
return aclsByResource;
}
Iterable<AclBinding> acls(AclBindingFilter filter) {
List<AclBinding> aclBindingList = new ArrayList<>();
aclsByResource.forEach(acl -> {
AclBinding aclBinding = acl.toBinding();
if (filter.matches(aclBinding)) {
aclBindingList.add(aclBinding);
}
});
return aclBindingList;
}
int count() {
return aclsById.size();
}
StandardAcl getAcl(Uuid id) {
return aclsById.get(id);
}
AclCache addAcl(Uuid id, StandardAcl acl) {
StandardAcl prevAcl = this.aclsById.get(id);
if (prevAcl != null) {
throw new RuntimeException("An ACL with ID " + id + " already exists.");
}
ImmutableMap<Uuid, StandardAcl> aclsById = this.aclsById.updated(id, acl);
if (this.aclsByResource.contains(acl)) {
throw new RuntimeException("Unable to add the ACL with ID " + id +
" to aclsByResource");
}
ImmutableNavigableSet<StandardAcl> aclsByResource = this.aclsByResource.added(acl);
return new AclCache(aclsByResource, aclsById);
}
AclCache removeAcl(Uuid id) {
StandardAcl acl = this.aclsById.get(id);
if (acl == null) {
throw new RuntimeException("ID " + id + " not found in aclsById.");
}
ImmutableMap<Uuid, StandardAcl> aclsById = this.aclsById.removed(id);
if (!this.aclsByResource.contains(acl)) {
throw new RuntimeException("Unable to remove the ACL with ID " + id +
" from aclsByResource");
}
ImmutableNavigableSet<StandardAcl> aclsByResource = this.aclsByResource.removed(acl);
return new AclCache(aclsByResource, aclsById);
}
}

View File

@ -39,7 +39,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
@ -59,8 +58,6 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
*/
private final CompletableFuture<Void> initialLoadFuture = new CompletableFuture<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/**
* The current data. We use a read-write lock to synchronize reads and writes to the data. We
* expect one writer and multiple readers accessing the ACL data, and we use the lock to make
@ -70,23 +67,12 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
@Override
public void setAclMutator(AclMutator aclMutator) {
lock.writeLock().lock();
try {
this.data = data.copyWithNewAclMutator(aclMutator);
} finally {
lock.writeLock().unlock();
}
}
@Override
public AclMutator aclMutatorOrException() {
AclMutator aclMutator;
lock.readLock().lock();
try {
aclMutator = data.aclMutator;
} finally {
lock.readLock().unlock();
}
AclMutator aclMutator = data.aclMutator;
if (aclMutator == null) {
throw new NotControllerException("The current node is not the active controller.");
}
@ -95,12 +81,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
@Override
public void completeInitialLoad() {
lock.writeLock().lock();
try {
data = data.copyWithNewLoadingComplete(true);
} finally {
lock.writeLock().unlock();
}
data.log.info("Completed initial ACL load process.");
initialLoadFuture.complete(null);
}
@ -118,22 +99,12 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
@Override
public void addAcl(Uuid id, StandardAcl acl) {
lock.writeLock().lock();
try {
data.addAcl(id, acl);
} finally {
lock.writeLock().unlock();
}
}
@Override
public void removeAcl(Uuid id) {
lock.writeLock().lock();
try {
data.removeAcl(id);
} finally {
lock.writeLock().unlock();
}
}
@Override
@ -142,12 +113,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
for (Map.Entry<Uuid, StandardAcl> entry : acls.entrySet()) {
newData.addAcl(entry.getKey(), entry.getValue());
}
lock.writeLock().lock();
try {
data = data.copyWithNewAcls(newData.getAclsByResource(), newData.getAclsById());
} finally {
lock.writeLock().unlock();
}
data = data.copyWithNewAcls(newData.getAclCache());
}
@Override
@ -170,39 +136,24 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
AuthorizableRequestContext requestContext,
List<Action> actions) {
List<AuthorizationResult> results = new ArrayList<>(actions.size());
lock.readLock().lock();
try {
StandardAuthorizerData curData = data;
for (Action action : actions) {
AuthorizationResult result = curData.authorize(requestContext, action);
results.add(result);
}
} finally {
lock.readLock().unlock();
}
return results;
}
@Override
public Iterable<AclBinding> acls(AclBindingFilter filter) {
lock.readLock().lock();
try {
// The Iterable returned here is consistent because it is created over a read-only
// copy of ACLs data.
return data.acls(filter);
} finally {
lock.readLock().unlock();
}
}
@Override
public int aclCount() {
lock.readLock().lock();
try {
return data.aclCount();
} finally {
lock.readLock().unlock();
}
}
@Override
@ -222,32 +173,17 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer {
} catch (Exception e) {
nodeId = -1;
}
lock.writeLock().lock();
try {
data = data.copyWithNewConfig(nodeId, superUsers, defaultResult);
} finally {
lock.writeLock().unlock();
}
this.data.log.info("set super.users={}, default result={}", String.join(",", superUsers), defaultResult);
}
// VisibleForTesting
Set<String> superUsers() {
lock.readLock().lock();
try {
return new HashSet<>(data.superUsers());
} finally {
lock.readLock().unlock();
}
}
AuthorizationResult defaultResult() {
lock.readLock().lock();
try {
return data.defaultResult();
} finally {
lock.readLock().unlock();
}
}
static Set<String> getConfiguredSuperUsers(Map<String, ?> configs) {

View File

@ -36,15 +36,11 @@ import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.ALTER;
@ -108,14 +104,11 @@ public class StandardAuthorizerData {
private final DefaultRule noAclRule;
/**
* Contains all of the current ACLs sorted by (resource type, resource name).
* Contains all of the current ACLs
*/
private final TreeSet<StandardAcl> aclsByResource;
private AclCache aclCache;
/**
* Contains all of the current ACLs indexed by UUID.
*/
private final HashMap<Uuid, StandardAcl> aclsById;
private static Logger createLogger(int nodeId) {
return new LogContext("[StandardAuthorizer " + nodeId + "] ").logger(StandardAuthorizerData.class);
@ -131,7 +124,7 @@ public class StandardAuthorizerData {
false,
Collections.emptySet(),
DENIED,
new TreeSet<>(), new HashMap<>());
new AclCache());
}
private StandardAuthorizerData(Logger log,
@ -139,16 +132,14 @@ public class StandardAuthorizerData {
boolean loadingComplete,
Set<String> superUsers,
AuthorizationResult defaultResult,
TreeSet<StandardAcl> aclsByResource,
HashMap<Uuid, StandardAcl> aclsById) {
AclCache aclCache) {
this.log = log;
this.auditLog = auditLogger();
this.aclMutator = aclMutator;
this.loadingComplete = loadingComplete;
this.superUsers = superUsers;
this.noAclRule = new DefaultRule(defaultResult);
this.aclsByResource = aclsByResource;
this.aclsById = aclsById;
this.aclCache = aclCache;
}
StandardAuthorizerData copyWithNewAclMutator(AclMutator newAclMutator) {
@ -158,8 +149,7 @@ public class StandardAuthorizerData {
loadingComplete,
superUsers,
noAclRule.result,
aclsByResource,
aclsById);
aclCache);
}
StandardAuthorizerData copyWithNewLoadingComplete(boolean newLoadingComplete) {
@ -168,8 +158,7 @@ public class StandardAuthorizerData {
newLoadingComplete,
superUsers,
noAclRule.result,
aclsByResource,
aclsById);
aclCache);
}
StandardAuthorizerData copyWithNewConfig(int nodeId,
@ -181,35 +170,24 @@ public class StandardAuthorizerData {
loadingComplete,
newSuperUsers,
newDefaultResult,
aclsByResource,
aclsById);
aclCache);
}
StandardAuthorizerData copyWithNewAcls(TreeSet<StandardAcl> aclsByResource, HashMap<Uuid,
StandardAcl> aclsById) {
StandardAuthorizerData copyWithNewAcls(AclCache aclCache) {
StandardAuthorizerData newData = new StandardAuthorizerData(
log,
aclMutator,
loadingComplete,
superUsers,
noAclRule.result,
aclsByResource,
aclsById);
log.info("Initialized with {} acl(s).", aclsById.size());
aclCache);
log.info("Initialized with {} acl(s).", aclCache.count());
return newData;
}
void addAcl(Uuid id, StandardAcl acl) {
try {
StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
if (prevAcl != null) {
throw new RuntimeException("An ACL with ID " + id + " already exists.");
}
if (!aclsByResource.add(acl)) {
aclsById.remove(id);
throw new RuntimeException("Unable to add the ACL with ID " + id +
" to aclsByResource");
}
aclCache = aclCache.addAcl(id, acl);
log.trace("Added ACL {}: {}", id, acl);
} catch (Throwable e) {
log.error("addAcl error", e);
@ -219,15 +197,9 @@ public class StandardAuthorizerData {
void removeAcl(Uuid id) {
try {
StandardAcl acl = aclsById.remove(id);
if (acl == null) {
throw new RuntimeException("ID " + id + " not found in aclsById.");
}
if (!aclsByResource.remove(acl)) {
throw new RuntimeException("Unable to remove the ACL with ID " + id +
" from aclsByResource");
}
log.trace("Removed ACL {}: {}", id, acl);
AclCache aclCacheSnapshot = aclCache.removeAcl(id);
log.trace("Removed ACL {}: {}", id, aclCacheSnapshot.getAcl(id));
aclCache = aclCacheSnapshot;
} catch (Throwable e) {
log.error("removeAcl error", e);
throw e;
@ -243,7 +215,7 @@ public class StandardAuthorizerData {
}
int aclCount() {
return aclsById.size();
return aclCache.count();
}
/**
@ -374,7 +346,8 @@ public class StandardAuthorizerData {
"",
AclOperation.UNKNOWN,
AclPermissionType.UNKNOWN);
checkSection(action, exemplar, matchingPrincipals, host, matchingRuleBuilder);
AclCache aclCacheSnapshot = aclCache;
checkSection(aclCacheSnapshot, action, exemplar, matchingPrincipals, host, matchingRuleBuilder);
if (matchingRuleBuilder.foundDeny()) {
return matchingRuleBuilder.build();
}
@ -390,7 +363,7 @@ public class StandardAuthorizerData {
"",
AclOperation.UNKNOWN,
AclPermissionType.UNKNOWN);
checkSection(action, exemplar, matchingPrincipals, host, matchingRuleBuilder);
checkSection(aclCacheSnapshot, action, exemplar, matchingPrincipals, host, matchingRuleBuilder);
return matchingRuleBuilder.build();
}
@ -409,14 +382,14 @@ public class StandardAuthorizerData {
}
private void checkSection(
Action action,
AclCache aclCacheSnapshot, Action action,
StandardAcl exemplar,
Set<KafkaPrincipal> matchingPrincipals,
String host,
MatchingRuleBuilder matchingRuleBuilder
) {
String resourceName = action.resourcePattern().name();
NavigableSet<StandardAcl> tailSet = aclsByResource.tailSet(exemplar, true);
NavigableSet<StandardAcl> tailSet = aclCacheSnapshot.aclsByResource().tailSet(exemplar, true);
Iterator<StandardAcl> iterator = tailSet.iterator();
while (iterator.hasNext()) {
StandardAcl acl = iterator.next();
@ -446,7 +419,7 @@ public class StandardAuthorizerData {
exemplar.host(),
exemplar.operation(),
exemplar.permissionType());
tailSet = aclsByResource.tailSet(exemplar, true);
tailSet = aclCacheSnapshot.aclsByResource().tailSet(exemplar, true);
iterator = tailSet.iterator();
continue;
}
@ -560,14 +533,7 @@ public class StandardAuthorizerData {
* @return Iterable over AclBindings matching the filter.
*/
Iterable<AclBinding> acls(AclBindingFilter filter) {
List<AclBinding> aclBindingList = new ArrayList<>();
aclsByResource.forEach(acl -> {
AclBinding aclBinding = acl.toBinding();
if (filter.matches(aclBinding)) {
aclBindingList.add(aclBinding);
}
});
return aclBindingList;
return aclCache.acls(filter);
}
private interface MatchingRule {
@ -654,11 +620,7 @@ public class StandardAuthorizerData {
}
}
TreeSet<StandardAcl> getAclsByResource() {
return aclsByResource;
}
HashMap<Uuid, StandardAcl> getAclsById() {
return aclsById;
AclCache getAclCache() {
return aclCache;
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.immutable;
import org.apache.kafka.server.immutable.pcollections.PCollectionsImmutableNavigableSet;
import java.util.NavigableSet;
/**
* A persistent Tree-based NavigableSet wrapper
* java.util.Set methods that mutate in-place will throw UnsupportedOperationException
*
* @param <E> the element type
*/
public interface ImmutableNavigableSet<E> extends ImmutableSet<E>, NavigableSet<E> {
/**
* @return a wrapped tree-based persistent navigable set that is empty
* @param <E> the element type
*/
static <E extends Comparable<? super E>> ImmutableNavigableSet<E> empty() {
return PCollectionsImmutableNavigableSet.empty();
}
/**
* @param e the element
* @return a wrapped tree-based persistent set that is empty
* @param <E> the element type
*/
static <E extends Comparable<? super E>> ImmutableNavigableSet<E> singleton(E e) {
return PCollectionsImmutableNavigableSet.singleton(e);
}
/**
* @param e the element
* @return a wrapped persistent navigable set that differs from this one in that the given element is added (if necessary)
*/
ImmutableNavigableSet<E> added(E e);
/**
* @param e the element
* @return a wrapped persistent navigable set that differs from this one in that the given element is added (if necessary)
*/
ImmutableNavigableSet<E> removed(E e);
}

View File

@ -0,0 +1,275 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.immutable.pcollections;
import org.apache.kafka.server.immutable.ImmutableNavigableSet;
import org.pcollections.TreePSet;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
@SuppressWarnings("deprecation")
public class PCollectionsImmutableNavigableSet<E> implements ImmutableNavigableSet<E> {
private final TreePSet<E> underlying;
/**
* @return a wrapped tree-based persistent navigable set that is empty
* @param <E> the element type
*/
public static <E extends Comparable<? super E>> PCollectionsImmutableNavigableSet<E> empty() {
return new PCollectionsImmutableNavigableSet<>(TreePSet.<E>empty());
}
/**
* @param e the element
* @return a wrapped tree-based persistent set that is empty
* @param <E> the element type
*/
public static <E extends Comparable<? super E>> PCollectionsImmutableNavigableSet<E> singleton(E e) {
return new PCollectionsImmutableNavigableSet<>(TreePSet.singleton(e));
}
public PCollectionsImmutableNavigableSet(TreePSet<E> underlying) {
this.underlying = underlying;
}
@Override
public PCollectionsImmutableNavigableSet<E> added(E e) {
return new PCollectionsImmutableNavigableSet<>(underlying().plus(e));
}
@Override
public PCollectionsImmutableNavigableSet<E> removed(E e) {
return new PCollectionsImmutableNavigableSet<>(underlying().minus(e));
}
@Override
public E lower(E e) {
return underlying().lower(e);
}
@Override
public E floor(E e) {
return underlying().floor(e);
}
@Override
public E ceiling(E e) {
return underlying().ceiling(e);
}
@Override
public E higher(E e) {
return underlying().higher(e);
}
@Override
public E pollFirst() {
// will throw UnsupportedOperationException
return underlying().pollFirst();
}
@Override
public E pollLast() {
// will throw UnsupportedOperationException
return underlying().pollLast();
}
@Override
public PCollectionsImmutableNavigableSet<E> descendingSet() {
return new PCollectionsImmutableNavigableSet<>(underlying().descendingSet());
}
@Override
public Iterator<E> descendingIterator() {
return underlying().descendingIterator();
}
@Override
public PCollectionsImmutableNavigableSet<E> subSet(E fromElement, boolean fromInclusive, E toElement, boolean toInclusive) {
return new PCollectionsImmutableNavigableSet<>(underlying().subSet(fromElement, fromInclusive, toElement, toInclusive));
}
@Override
public PCollectionsImmutableNavigableSet<E> headSet(E toElement, boolean inclusive) {
return new PCollectionsImmutableNavigableSet<>(underlying().headSet(toElement, inclusive));
}
@Override
public PCollectionsImmutableNavigableSet<E> tailSet(E fromElement, boolean inclusive) {
return new PCollectionsImmutableNavigableSet<>(underlying().tailSet(fromElement, inclusive));
}
@Override
public Comparator<? super E> comparator() {
return underlying().comparator();
}
@Override
public PCollectionsImmutableNavigableSet<E> subSet(E fromElement, E toElement) {
return new PCollectionsImmutableNavigableSet<>(underlying().subSet(fromElement, toElement));
}
@Override
public PCollectionsImmutableNavigableSet<E> headSet(E toElement) {
return new PCollectionsImmutableNavigableSet<>(underlying().headSet(toElement));
}
@Override
public PCollectionsImmutableNavigableSet<E> tailSet(E fromElement) {
return new PCollectionsImmutableNavigableSet<>(underlying().tailSet(fromElement));
}
@Override
public E first() {
return underlying().first();
}
@Override
public E last() {
return underlying().last();
}
@Override
public int size() {
return underlying().size();
}
@Override
public boolean isEmpty() {
return underlying().isEmpty();
}
@Override
public boolean contains(Object o) {
return underlying().contains(o);
}
@Override
public Iterator<E> iterator() {
return underlying().iterator();
}
@Override
public void forEach(Consumer<? super E> action) {
underlying().forEach(action);
}
@Override
public Object[] toArray() {
return underlying().toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return underlying().toArray(a);
}
@Override
public boolean add(E e) {
// will throw UnsupportedOperationException
return underlying().add(e);
}
@Override
public boolean remove(Object o) {
// will throw UnsupportedOperationException
return underlying().remove(o);
}
@Override
public boolean containsAll(Collection<?> c) {
return underlying().containsAll(c);
}
@Override
public boolean addAll(Collection<? extends E> c) {
// will throw UnsupportedOperationException
return underlying().addAll(c);
}
@Override
public boolean retainAll(Collection<?> c) {
// will throw UnsupportedOperationException
return underlying().retainAll(c);
}
@Override
public boolean removeAll(Collection<?> c) {
// will throw UnsupportedOperationException
return underlying().removeAll(c);
}
@Override
public boolean removeIf(Predicate<? super E> filter) {
// will throw UnsupportedOperationException
return underlying().removeIf(filter);
}
@Override
public void clear() {
// will throw UnsupportedOperationException
underlying().clear();
}
@Override
public Spliterator<E> spliterator() {
return underlying().spliterator();
}
@Override
public Stream<E> stream() {
return underlying().stream();
}
@Override
public Stream<E> parallelStream() {
return underlying().parallelStream();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PCollectionsImmutableNavigableSet<?> that = (PCollectionsImmutableNavigableSet<?>) o;
return Objects.equals(underlying(), that.underlying());
}
@Override
public int hashCode() {
return underlying().hashCode();
}
@Override
public String toString() {
return "PCollectionsImmutableNavigableSet{" +
"underlying=" + underlying() +
'}';
}
// package-private for testing
TreePSet<E> underlying() {
return underlying;
}
}

View File

@ -81,7 +81,7 @@ public class PCollectionsImmutableSet<E> implements ImmutableSet<E> {
@Override
public Iterator<E> iterator() {
return underlying.iterator();
return underlying().iterator();
}
@Override
@ -113,7 +113,7 @@ public class PCollectionsImmutableSet<E> implements ImmutableSet<E> {
@Override
public boolean containsAll(Collection<?> c) {
return underlying.containsAll(c);
return underlying().containsAll(c);
}
@Override

View File

@ -24,6 +24,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
@ -80,6 +81,11 @@ public abstract class DelegationChecker<D, W, T> {
return this;
}
public DelegationChecker<D, W, T> defineMockConfigurationForUnsupportedFunction(Function<D, T> mockConfigurationFunction) {
this.mockConfigurationFunction = Objects.requireNonNull(mockConfigurationFunction);
return this;
}
public DelegationChecker<D, W, T> defineWrapperVoidMethodInvocation(Consumer<W> wrapperConsumer) {
this.wrapperConsumer = Objects.requireNonNull(wrapperConsumer);
return this;
@ -93,6 +99,12 @@ public abstract class DelegationChecker<D, W, T> {
return this;
}
public DelegationChecker<D, W, T> defineWrapperUnsupportedFunctionInvocation(
Function<W, T> wrapperFunctionApplier) {
this.wrapperFunctionApplier = Objects.requireNonNull(wrapperFunctionApplier);
return this;
}
public DelegationChecker<D, W, T> expectWrapperToWrapMockFunctionReturnValue() {
this.expectWrapperToWrapMockFunctionReturnValue = true;
return this;
@ -115,6 +127,18 @@ public abstract class DelegationChecker<D, W, T> {
assertTrue(persistentCollectionMethodInvokedCorrectly);
}
public void doUnsupportedVoidFunctionDelegationCheck() {
if (mockConsumer == null || wrapperConsumer == null) {
throwExceptionForIllegalTestSetup();
}
// configure the mock to behave as desired
mockConsumer.accept(Mockito.doCallRealMethod().when(mock));
assertThrows(UnsupportedOperationException.class, () -> wrapperConsumer.accept(wrapper),
"Expected to Throw UnsupportedOperationException");
}
@SuppressWarnings("unchecked")
public void doFunctionDelegationCheck() {
if (mockConfigurationFunction == null || wrapperFunctionApplier == null ||
@ -139,6 +163,16 @@ public abstract class DelegationChecker<D, W, T> {
}
}
public void doUnsupportedFunctionDelegationCheck() {
if (mockConfigurationFunction == null || wrapperFunctionApplier == null) {
throwExceptionForIllegalTestSetup();
}
when(mockConfigurationFunction.apply(mock)).thenCallRealMethod();
assertThrows(UnsupportedOperationException.class, () -> wrapperFunctionApplier.apply(wrapper),
"Expected to Throw UnsupportedOperationException");
}
private static void throwExceptionForIllegalTestSetup() {
throw new IllegalStateException(
"test setup error: must define both mock and wrapper consumers or both mock and wrapper functions");

View File

@ -68,7 +68,7 @@ public class PCollectionsImmutableMapTest {
}
@Test
public void testDelegationOfAfterAdding() {
public void testDelegationOfUpdated() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.plus(eq(this), eq(this)), SINGLETON_MAP)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.updated(this, this), identity())
@ -77,7 +77,7 @@ public class PCollectionsImmutableMapTest {
}
@Test
public void testDelegationOfAfterRemoving() {
public void testDelegationOfRemoved() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.minus(eq(this)), SINGLETON_MAP)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removed(this), identity())
@ -130,35 +130,35 @@ public class PCollectionsImmutableMapTest {
}
@Test
public void testDelegationOfPut() {
public void testDelegationOfUnsupportedFunctionPut() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.put(eq(this), eq(this)), this)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.put(this, this), identity())
.doFunctionDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.put(eq(this), eq(this)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.put(this, this))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfRemoveByKey() {
public void testDelegationOfUnsupportedFunctionRemoveByKey() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this)), this)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this), identity())
.doFunctionDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.remove(eq(this)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.remove(this))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfPutAll() {
public void testDelegationOfUnsupportedFunctionPutAll() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.putAll(eq(Collections.emptyMap())))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.putAll(Collections.emptyMap()))
.doVoidMethodDelegationCheck();
.doUnsupportedVoidFunctionDelegationCheck();
}
@Test
public void testDelegationOfClear() {
public void testDelegationOfUnsupportedFunctionClear() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(HashPMap::clear)
.defineWrapperVoidMethodInvocation(PCollectionsImmutableMap::clear)
.doVoidMethodDelegationCheck();
.doUnsupportedVoidFunctionDelegationCheck();
}
@ -220,25 +220,25 @@ public class PCollectionsImmutableMapTest {
}
@Test
public void testDelegationOfReplaceAll() {
public void testDelegationOfUnsupportedFunctionReplaceAll() {
final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.replaceAll(eq(mockBiFunction)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.replaceAll(mockBiFunction))
.doVoidMethodDelegationCheck();
.doUnsupportedVoidFunctionDelegationCheck();
}
@Test
public void testDelegationOfPutIfAbsent() {
public void testDelegationOfUnsupportedFunctionPutIfAbsent() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.putIfAbsent(eq(this), eq(this)), this)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.putIfAbsent(this, this), identity())
.doFunctionDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.putIfAbsent(eq(this), eq(this)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.putIfAbsent(this, this))
.doUnsupportedFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfRemoveByKeyAndValue(boolean mockFunctionReturnValue) {
public void testDelegationOfUnsupportedFunctionRemoveByKeyAndValue(boolean mockFunctionReturnValue) {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this), eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this, this), identity())
@ -247,7 +247,7 @@ public class PCollectionsImmutableMapTest {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfReplaceWhenMappedToSpecificValue(boolean mockFunctionReturnValue) {
public void testDelegationOfUnsupportedFunctionReplaceWhenMappedToSpecificValue(boolean mockFunctionReturnValue) {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this), eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this, this), identity())
@ -255,47 +255,47 @@ public class PCollectionsImmutableMapTest {
}
@Test
public void testDelegationOfReplaceWhenMappedToAnyValue() {
public void testDelegationOfUnsupportedFunctionReplaceWhenMappedToAnyValue() {
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.replace(eq(this), eq(this)), this)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.replace(this, this), identity())
.doFunctionDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.replace(eq(this), eq(this)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.replace(this, this))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfComputeIfAbsent() {
public void testDelegationOfUnsupportedFunctionComputeIfAbsent() {
final Function<Object, Object> mockFunction = mock(Function.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.computeIfAbsent(eq(this), eq(mockFunction)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.computeIfAbsent(this, mockFunction))
.doVoidMethodDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.computeIfAbsent(eq(this), eq(mockFunction)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.computeIfAbsent(this, mockFunction))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfComputeIfPresent() {
public void testDelegationOfUnsupportedFunctionComputeIfPresent() {
final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.computeIfPresent(eq(this), eq(mockBiFunction)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.computeIfPresent(this, mockBiFunction))
.doVoidMethodDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.computeIfPresent(eq(this), eq(mockBiFunction)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.computeIfPresent(this, mockBiFunction))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfCompute() {
public void testDelegationOfUnsupportedFunctionCompute() {
final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.compute(eq(this), eq(mockBiFunction)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.compute(this, mockBiFunction))
.doVoidMethodDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.compute(eq(this), eq(mockBiFunction)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.compute(this, mockBiFunction))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfMerge() {
public void testDelegationOfUnsupportedFunctionMerge() {
final BiFunction<Object, Object, Object> mockBiFunction = mock(BiFunction.class);
new PCollectionsHashMapWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.merge(eq(this), eq(this), eq(mockBiFunction)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.merge(this, this, mockBiFunction))
.doVoidMethodDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.merge(eq(this), eq(this), eq(mockBiFunction)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.merge(this, this, mockBiFunction))
.doUnsupportedFunctionDelegationCheck();
}
@ParameterizedTest

View File

@ -0,0 +1,422 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.immutable.pcollections;
import org.apache.kafka.server.immutable.DelegationChecker;
import org.apache.kafka.server.immutable.ImmutableNavigableSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.pcollections.HashTreePSet;
import org.pcollections.TreePSet;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import static java.util.function.Function.identity;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@SuppressWarnings({"unchecked", "deprecation"})
public class PCollectionsImmutableNavigableSetTest {
private static final TreePSet<Integer> SINGLETON_SET = TreePSet.singleton(new Random().nextInt());
private static final class PCollectionsTreeSetWrapperDelegationChecker<R> extends DelegationChecker<TreePSet<Object>, PCollectionsImmutableNavigableSet<Object>, R> {
public PCollectionsTreeSetWrapperDelegationChecker() {
super(mock(TreePSet.class), PCollectionsImmutableNavigableSet::new);
}
public TreePSet<Object> unwrap(PCollectionsImmutableNavigableSet<Object> wrapper) {
return wrapper.underlying();
}
}
@Test
public void testEmptySet() {
Assertions.assertEquals(HashTreePSet.empty(), ((PCollectionsImmutableNavigableSet<?>) ImmutableNavigableSet.empty()).underlying());
}
@Test
public void testSingletonSet() {
Assertions.assertEquals(HashTreePSet.singleton(1), ((PCollectionsImmutableNavigableSet<?>) ImmutableNavigableSet.singleton(1)).underlying());
}
@Test
public void testUnderlying() {
assertSame(SINGLETON_SET, new PCollectionsImmutableNavigableSet<>(SINGLETON_SET).underlying());
}
@Test
public void testDelegationOfAdded() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.plus(eq(SINGLETON_SET.first())), SINGLETON_SET)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.added(SINGLETON_SET.first()), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfRemoved() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.minus(eq(10)), SINGLETON_SET)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removed(10), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(ints = {9, 4})
public void testDelegationOfLower(int mockFunctionReturnValue) {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.lower(eq(10)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.lower(10), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(ints = {9, 10})
public void testDelegationOfFloor(int mockFunctionReturnValue) {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.floor(eq(10)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.floor(10), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(ints = {11, 10})
public void testDelegationOfCeiling(int mockFunctionReturnValue) {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.ceiling(eq(10)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.ceiling(10), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(ints = {12, 13})
public void testDelegationOfHigher(int mockFunctionReturnValue) {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.higher(eq(10)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.higher(10), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionPollFirst() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(TreePSet::pollFirst)
.defineWrapperUnsupportedFunctionInvocation(PCollectionsImmutableNavigableSet::pollFirst)
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionPollLast() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(TreePSet::pollLast)
.defineWrapperUnsupportedFunctionInvocation(PCollectionsImmutableNavigableSet::pollLast)
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfDescendingSet() {
TreePSet<Integer> testSet = TreePSet.from(Arrays.asList(2, 3, 4));
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::descendingSet, testSet.descendingSet())
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::descendingSet, identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfDescendingIterator() {
TreePSet<Integer> testSet = TreePSet.from(Arrays.asList(2, 3, 4));
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::descendingIterator, testSet.descendingIterator())
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::descendingIterator, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfSubSetWithFromAndToElements() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.subSet(eq(10), eq(true), eq(30), eq(false)), TreePSet.singleton(15))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.subSet(10, true, 30, false), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfHeadSetInclusive() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.headSet(eq(15), eq(true)), TreePSet.singleton(13))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.headSet(15, true), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfTailSetInclusive() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.tailSet(eq(15), eq(true)), TreePSet.singleton(15))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.tailSet(15, true), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfComparator() {
TreePSet<Integer> testSet = TreePSet.from(Arrays.asList(3, 4, 5));
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::comparator, testSet.comparator())
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::comparator, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfSubSetWithFromElement() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.subSet(eq(15), eq(true)), TreePSet.singleton(13))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.subSet(15, true), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfHeadSet() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.headSet(eq(15)), TreePSet.singleton(13))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.headSet(15), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfTailSet() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.tailSet(eq(15)), TreePSet.singleton(13))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.tailSet(15), identity())
.expectWrapperToWrapMockFunctionReturnValue()
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(ints = {1, 2})
public void testDelegationOfFirst() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::first, 13)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::first, identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(ints = {1, 2})
public void testDelegationOfLast() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::last, 13)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::last, identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(ints = {1, 2})
public void testDelegationOfSize(int mockFunctionReturnValue) {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::size, mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::size, identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfIsEmpty(boolean mockFunctionReturnValue) {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::isEmpty, mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::isEmpty, identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfContains(boolean mockFunctionReturnValue) {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.contains(eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.contains(this), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfIterator() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::iterator, mock(Iterator.class))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::iterator, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfForEach() {
final Consumer<Object> mockConsumer = mock(Consumer.class);
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(mock -> mock.forEach(eq(mockConsumer)))
.defineWrapperVoidMethodInvocation(wrapper -> wrapper.forEach(mockConsumer))
.doVoidMethodDelegationCheck();
}
@Test
public void testDelegationOfToArray() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::toArray, new Object[0])
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::toArray, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfToArrayIntoGivenDestination() {
Object[] destinationArray = new Object[0];
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.toArray(eq(destinationArray)), new Object[0])
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.toArray(destinationArray), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionAdd() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(mock -> mock.add(eq(this)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.add(this))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionRemove() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(mock -> mock.remove(eq(this)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.remove(this))
.doUnsupportedFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfContainsAll(boolean mockFunctionReturnValue) {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.containsAll(eq(Collections.emptyList())), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.containsAll(Collections.emptyList()), identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionAddAll() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(mock -> mock.addAll(eq(Collections.emptyList())))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.addAll(Collections.emptyList()))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionRetainAll() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(mock -> mock.retainAll(eq(Collections.emptyList())))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.retainAll(Collections.emptyList()))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionRemoveAll() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(mock -> mock.removeAll(eq(Collections.emptyList())))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.removeAll(Collections.emptyList()))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionRemoveIf() {
final Predicate<Object> mockPredicate = mock(Predicate.class);
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(mock -> mock.removeIf(eq(mockPredicate)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.removeIf(mockPredicate))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionClear() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(TreePSet::clear)
.defineWrapperVoidMethodInvocation(PCollectionsImmutableNavigableSet::clear)
.doUnsupportedVoidFunctionDelegationCheck();
}
@Test
public void testDelegationOfSpliterator() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::spliterator, mock(Spliterator.class))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::spliterator, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfStream() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::stream, mock(Stream.class))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::stream, identity())
.doFunctionDelegationCheck();
}
@Test
public void testDelegationOfParallelStream() {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::parallelStream, mock(Stream.class))
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::parallelStream, identity())
.doFunctionDelegationCheck();
}
@Test
public void testEquals() {
final TreePSet<Object> mock = mock(TreePSet.class);
assertEquals(new PCollectionsImmutableNavigableSet<>(mock), new PCollectionsImmutableNavigableSet<>(mock));
final TreePSet<Object> someOtherMock = mock(TreePSet.class);
assertNotEquals(new PCollectionsImmutableNavigableSet<>(mock), new PCollectionsImmutableNavigableSet<>(someOtherMock));
}
@Test
public void testHashCode() {
final TreePSet<Object> mock = mock(TreePSet.class);
assertEquals(mock.hashCode(), new PCollectionsImmutableNavigableSet<>(mock).hashCode());
final TreePSet<Object> someOtherMock = mock(TreePSet.class);
assertNotEquals(mock.hashCode(), new PCollectionsImmutableNavigableSet<>(someOtherMock).hashCode());
}
@ParameterizedTest
@ValueSource(strings = {"a", "b"})
public void testDelegationOfToString(String mockFunctionReturnValue) {
new PCollectionsTreeSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(TreePSet::toString, mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(PCollectionsImmutableNavigableSet::toString,
text -> "PCollectionsImmutableNavigableSet{underlying=" + text + "}")
.doFunctionDelegationCheck();
}
}

View File

@ -71,7 +71,7 @@ public class PCollectionsImmutableSetTest {
}
@Test
public void testDelegationOfAfterAdding() {
public void testDelegationOfAdded() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.plus(eq(this)), SINGLETON_SET)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.added(this), identity())
@ -80,7 +80,7 @@ public class PCollectionsImmutableSetTest {
}
@Test
public void testDelegationOfAfterRemoving() {
public void testDelegationOfRemoved() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.minus(eq(this)), SINGLETON_SET)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removed(this), identity())
@ -149,22 +149,20 @@ public class PCollectionsImmutableSetTest {
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfAdd(boolean mockFunctionReturnValue) {
@Test
public void testDelegationOfUnsupportedFunctionAdd() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.add(eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.add(this), identity())
.doFunctionDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.add(eq(this)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.add(this))
.doUnsupportedFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfRemove(boolean mockFunctionReturnValue) {
@Test
public void testDelegationOfUnsupportedFunctionRemove() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.remove(eq(this)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.remove(this), identity())
.doFunctionDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.remove(eq(this)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.remove(this))
.doUnsupportedFunctionDelegationCheck();
}
@ParameterizedTest
@ -176,49 +174,45 @@ public class PCollectionsImmutableSetTest {
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfAddAll(boolean mockFunctionReturnValue) {
@Test
public void testDelegationOfUnsupportedFunctionAddAll() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.addAll(eq(Collections.emptyList())), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.addAll(Collections.emptyList()), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfRetainAll(boolean mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.retainAll(eq(Collections.emptyList())), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.retainAll(Collections.emptyList()), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfRemoveAll(boolean mockFunctionReturnValue) {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.removeAll(eq(Collections.emptyList())), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removeAll(Collections.emptyList()), identity())
.doFunctionDelegationCheck();
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDelegationOfRemoveIf(boolean mockFunctionReturnValue) {
final Predicate<Object> mockPredicate = mock(Predicate.class);
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForFunctionInvocation(mock -> mock.removeIf(eq(mockPredicate)), mockFunctionReturnValue)
.defineWrapperFunctionInvocationAndMockReturnValueTransformation(wrapper -> wrapper.removeIf(mockPredicate), identity())
.doFunctionDelegationCheck();
.defineMockConfigurationForUnsupportedFunction(mock -> mock.addAll(eq(Collections.emptyList())))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.addAll(Collections.emptyList()))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfClear() {
public void testDelegationOfUnsupportedFunctionRetainAll() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(mock -> mock.retainAll(eq(Collections.emptyList())))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.retainAll(Collections.emptyList()))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionRemoveAll() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(mock -> mock.removeAll(eq(Collections.emptyList())))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.removeAll(Collections.emptyList()))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionRemoveIf() {
final Predicate<Object> mockPredicate = mock(Predicate.class);
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForUnsupportedFunction(mock -> mock.removeIf(eq(mockPredicate)))
.defineWrapperUnsupportedFunctionInvocation(wrapper -> wrapper.removeIf(mockPredicate))
.doUnsupportedFunctionDelegationCheck();
}
@Test
public void testDelegationOfUnsupportedFunctionClear() {
new PCollectionsHashSetWrapperDelegationChecker<>()
.defineMockConfigurationForVoidMethodInvocation(MapPSet::clear)
.defineWrapperVoidMethodInvocation(PCollectionsImmutableSet::clear)
.doVoidMethodDelegationCheck();
.doUnsupportedVoidFunctionDelegationCheck();
}
@Test