mirror of https://github.com/apache/kafka.git
MINOR: updated configs to use one try/catch for serdes
removed `try/catch` from `keySerde` and `valueSerde` methods so only the `try\catch` blocks in `defaultKeySerde` and `defaultValueSerde` perform error handling resulting in correct error message. Author: Bill Bejeck <bill@confluent.io> Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com> Closes #3568 from bbejeck/MINOR_ensure_correct_error_messages_for_configs
This commit is contained in:
parent
f4d1305bfe
commit
f8498ec9e2
|
|
@ -746,11 +746,7 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public Serde keySerde() {
|
public Serde keySerde() {
|
||||||
try {
|
return defaultKeySerde();
|
||||||
return defaultKeySerde();
|
|
||||||
} catch (final Exception e) {
|
|
||||||
throw new StreamsException(String.format("Failed to configure key serde %s", get(KEY_SERDE_CLASS_CONFIG)), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -760,16 +756,18 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
* @return an configured instance of key Serde class
|
* @return an configured instance of key Serde class
|
||||||
*/
|
*/
|
||||||
public Serde defaultKeySerde() {
|
public Serde defaultKeySerde() {
|
||||||
|
Object keySerdeConfigSetting = get(KEY_SERDE_CLASS_CONFIG);
|
||||||
try {
|
try {
|
||||||
Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
|
Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
|
||||||
if (serde == null) {
|
if (serde == null) {
|
||||||
|
keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
|
||||||
serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
|
serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
|
||||||
}
|
}
|
||||||
serde.configure(originals(), true);
|
serde.configure(originals(), true);
|
||||||
return serde;
|
return serde;
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
throw new StreamsException(
|
throw new StreamsException(
|
||||||
String.format("Failed to configure key serde %s", get(DEFAULT_KEY_SERDE_CLASS_CONFIG)), e);
|
String.format("Failed to configure key serde %s", keySerdeConfigSetting), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -781,11 +779,7 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public Serde valueSerde() {
|
public Serde valueSerde() {
|
||||||
try {
|
return defaultValueSerde();
|
||||||
return defaultValueSerde();
|
|
||||||
} catch (final Exception e) {
|
|
||||||
throw new StreamsException(String.format("Failed to configure value serde %s", get(VALUE_SERDE_CLASS_CONFIG)), e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -795,9 +789,11 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
* @return an configured instance of value Serde class
|
* @return an configured instance of value Serde class
|
||||||
*/
|
*/
|
||||||
public Serde defaultValueSerde() {
|
public Serde defaultValueSerde() {
|
||||||
|
Object valueSerdeConfigSetting = get(VALUE_SERDE_CLASS_CONFIG);
|
||||||
try {
|
try {
|
||||||
Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
|
Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
|
||||||
if (serde == null) {
|
if (serde == null) {
|
||||||
|
valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
|
||||||
serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
|
serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
|
||||||
}
|
}
|
||||||
serde.configure(originals(), false);
|
serde.configure(originals(), false);
|
||||||
|
|
@ -805,7 +801,7 @@ public class StreamsConfig extends AbstractConfig {
|
||||||
return serde;
|
return serde;
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
throw new StreamsException(
|
throw new StreamsException(
|
||||||
String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)), e);
|
String.format("Failed to configure value serde %s", valueSerdeConfigSetting), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -50,6 +50,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
public class StreamsConfigTest {
|
public class StreamsConfigTest {
|
||||||
|
|
||||||
|
|
@ -428,6 +429,60 @@ public class StreamsConfigTest {
|
||||||
assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
|
assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
|
||||||
|
final Properties props = minimalStreamsConfig();
|
||||||
|
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
|
||||||
|
final StreamsConfig config = new StreamsConfig(props);
|
||||||
|
try {
|
||||||
|
config.keySerde();
|
||||||
|
fail("Test should throw a StreamsException");
|
||||||
|
} catch (StreamsException e) {
|
||||||
|
assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldSpecifyCorrectKeySerdeClassOnError() {
|
||||||
|
final Properties props = minimalStreamsConfig();
|
||||||
|
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
|
||||||
|
final StreamsConfig config = new StreamsConfig(props);
|
||||||
|
try {
|
||||||
|
config.keySerde();
|
||||||
|
fail("Test should throw a StreamsException");
|
||||||
|
} catch (StreamsException e) {
|
||||||
|
assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
|
||||||
|
final Properties props = minimalStreamsConfig();
|
||||||
|
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
|
||||||
|
final StreamsConfig config = new StreamsConfig(props);
|
||||||
|
try {
|
||||||
|
config.valueSerde();
|
||||||
|
fail("Test should throw a StreamsException");
|
||||||
|
} catch (StreamsException e) {
|
||||||
|
assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void shouldSpecifyCorrectValueSerdeClassOnError() {
|
||||||
|
final Properties props = minimalStreamsConfig();
|
||||||
|
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
|
||||||
|
final StreamsConfig config = new StreamsConfig(props);
|
||||||
|
try {
|
||||||
|
config.valueSerde();
|
||||||
|
fail("Test should throw a StreamsException");
|
||||||
|
} catch (StreamsException e) {
|
||||||
|
assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static class MisconfiguredSerde implements Serde {
|
static class MisconfiguredSerde implements Serde {
|
||||||
@Override
|
@Override
|
||||||
public void configure(final Map configs, final boolean isKey) {
|
public void configure(final Map configs, final boolean isKey) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue