mirror of https://github.com/apache/kafka.git
MINOR: cleanup ProcessorContextImplTest (#18682)
Reviewers: Bill Bejeck <bill@confluent.io>
This commit is contained in:
parent
40890faa1b
commit
ad79b4afa7
|
@ -141,8 +141,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void globalKeyValueStoreShouldBeReadOnly() {
|
public void globalKeyValueStoreShouldBeReadOnly() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
when(stateManager.globalStore(anyString())).thenReturn(null);
|
when(stateManager.globalStore(anyString())).thenReturn(null);
|
||||||
|
|
||||||
|
@ -173,8 +171,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void globalTimestampedKeyValueStoreShouldBeReadOnly() {
|
public void globalTimestampedKeyValueStoreShouldBeReadOnly() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
when(stateManager.globalStore(anyString())).thenReturn(null);
|
when(stateManager.globalStore(anyString())).thenReturn(null);
|
||||||
|
|
||||||
|
@ -299,8 +295,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void localKeyValueStoreShouldNotAllowInitOrClose() {
|
public void localKeyValueStoreShouldNotAllowInitOrClose() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
when(stateManager.globalStore(anyString())).thenReturn(null);
|
when(stateManager.globalStore(anyString())).thenReturn(null);
|
||||||
|
|
||||||
|
@ -343,8 +337,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() {
|
public void localTimestampedKeyValueStoreShouldNotAllowInitOrClose() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
when(stateManager.globalStore(anyString())).thenReturn(null);
|
when(stateManager.globalStore(anyString())).thenReturn(null);
|
||||||
|
|
||||||
|
@ -521,8 +513,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldNotSendRecordHeadersToChangelogTopic() {
|
public void shouldNotSendRecordHeadersToChangelogTopic() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
when(stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn(CHANGELOG_PARTITION);
|
when(stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn(CHANGELOG_PARTITION);
|
||||||
|
|
||||||
|
@ -553,18 +543,9 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldSendRecordHeadersToChangelogTopicWhenConsistencyEnabled() {
|
public void shouldSendRecordHeadersToChangelogTopicWhenConsistencyEnabled() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
when(stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn(CHANGELOG_PARTITION);
|
when(stateManager.registeredChangelogPartitionFor(REGISTERED_STORE_NAME)).thenReturn(CHANGELOG_PARTITION);
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
final Position position = Position.emptyPosition();
|
final Position position = Position.emptyPosition();
|
||||||
final Headers headers = new RecordHeaders();
|
final Headers headers = new RecordHeaders();
|
||||||
headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
|
headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
|
||||||
|
@ -593,17 +574,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnLogChange() {
|
public void shouldThrowUnsupportedOperationExceptionOnLogChange() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -613,17 +583,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() {
|
public void shouldThrowUnsupportedOperationExceptionOnGetStateStore() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -633,17 +592,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnForward() {
|
public void shouldThrowUnsupportedOperationExceptionOnForward() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -653,17 +601,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
|
public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -673,17 +610,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnCommit() {
|
public void shouldThrowUnsupportedOperationExceptionOnCommit() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -693,17 +619,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnSchedule() {
|
public void shouldThrowUnsupportedOperationExceptionOnSchedule() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -713,17 +628,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnTopic() {
|
public void shouldThrowUnsupportedOperationExceptionOnTopic() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -733,17 +637,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnPartition() {
|
public void shouldThrowUnsupportedOperationExceptionOnPartition() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -753,17 +646,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnOffset() {
|
public void shouldThrowUnsupportedOperationExceptionOnOffset() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -773,17 +655,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnTimestamp() {
|
public void shouldThrowUnsupportedOperationExceptionOnTimestamp() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -793,17 +664,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() {
|
public void shouldThrowUnsupportedOperationExceptionOnCurrentNode() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -813,17 +673,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() {
|
public void shouldThrowUnsupportedOperationExceptionOnSetRecordContext() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -833,17 +682,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldThrowUnsupportedOperationExceptionOnRecordContext() {
|
public void shouldThrowUnsupportedOperationExceptionOnRecordContext() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
|
||||||
|
|
||||||
final StreamTask task = mock(StreamTask.class);
|
|
||||||
context.transitionToActive(task, null, null);
|
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
|
||||||
|
|
||||||
context = getStandbyContext();
|
context = getStandbyContext();
|
||||||
assertThrows(
|
assertThrows(
|
||||||
UnsupportedOperationException.class,
|
UnsupportedOperationException.class,
|
||||||
|
@ -853,8 +691,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldMatchStreamTime() {
|
public void shouldMatchStreamTime() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
||||||
|
@ -870,8 +706,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldAddAndGetProcessorKeyValue() {
|
public void shouldAddAndGetProcessorKeyValue() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
||||||
|
@ -891,8 +725,6 @@ public class ProcessorContextImplTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void shouldSetAndGetProcessorMetaData() {
|
public void shouldSetAndGetProcessorMetaData() {
|
||||||
foreachSetUp();
|
|
||||||
|
|
||||||
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
context = buildProcessorContextImpl(streamsConfig, stateManager);
|
||||||
|
|
||||||
mockProcessorNodeWithLocalKeyValueStore();
|
mockProcessorNodeWithLocalKeyValueStore();
|
||||||
|
|
Loading…
Reference in New Issue