mirror of https://github.com/apache/kafka.git
KAFKA-14974: Restore backward compatibility in KafkaBasedLog (#13688)
`KafkaBasedLog` is a widely used utility class that provides a generic implementation of a shared, compacted log of records in a Kafka topic. It isn't in Connect's public API, but has been used outside of Connect and we try to preserve backward compatibility whenever possible. KAFKA-14455 modified the two overloaded void `KafkaBasedLog::send` methods to return a `Future`. While this change is source compatible, it isn't binary compatible. We can restore backward compatibility simply by renaming the new Future returning send methods, and reinstating the older send methods to delegate to the newer methods. This refactoring changes no functionality other than restoring the older methods. Reviewers: Randall Hauch <rhauch@gmail.com>
This commit is contained in:
parent
b40a7fc037
commit
59ba9dbbc9
|
@ -646,7 +646,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
byte[] serializedTargetState = converter.fromConnectData(topic, TARGET_STATE_V1, connectTargetState);
|
||||
log.debug("Writing target state {} for connector {}", state, connector);
|
||||
try {
|
||||
configLog.send(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||
configLog.sendWithReceipt(TARGET_STATE_KEY(connector), serializedTargetState).get(READ_WRITE_TOTAL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
log.error("Failed to write target state to Kafka", e);
|
||||
throw new ConnectException("Error writing target state to Kafka", e);
|
||||
|
@ -798,7 +798,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
|
|||
if (!usesFencableWriter) {
|
||||
List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
|
||||
keyValues.forEach(
|
||||
keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
|
||||
keyValue -> producerFutures.add(configLog.sendWithReceipt(keyValue.key, keyValue.value))
|
||||
);
|
||||
|
||||
timer.update();
|
||||
|
|
|
@ -77,6 +77,10 @@ import java.util.function.Supplier;
|
|||
* calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked
|
||||
* and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required.
|
||||
* </p>
|
||||
* <p>
|
||||
* This is a useful utility that has been used outside of Connect. This isn't in Connect's public API,
|
||||
* but we've tried to maintain the method signatures and backward compatibility since early Kafka versions.
|
||||
* </p>
|
||||
*/
|
||||
public class KafkaBasedLog<K, V> {
|
||||
private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
|
||||
|
@ -351,6 +355,31 @@ public class KafkaBasedLog<K, V> {
|
|||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a record asynchronously to the configured {@link #topic} without using a producer callback.
|
||||
* <p>
|
||||
* This method exists for backward compatibility reasons and delegates to the newer
|
||||
* {@link #sendWithReceipt(Object, Object)} method that returns a future.
|
||||
* @param key the key for the {@link ProducerRecord}
|
||||
* @param value the value for the {@link ProducerRecord}
|
||||
*/
|
||||
public void send(K key, V value) {
|
||||
sendWithReceipt(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a record asynchronously to the configured {@link #topic}.
|
||||
* <p>
|
||||
* This method exists for backward compatibility reasons and delegates to the newer
|
||||
* {@link #sendWithReceipt(Object, Object, org.apache.kafka.clients.producer.Callback)} method that returns a future.
|
||||
* @param key the key for the {@link ProducerRecord}
|
||||
* @param value the value for the {@link ProducerRecord}
|
||||
* @param callback the callback to invoke after completion; can be null if no callback is desired
|
||||
*/
|
||||
public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
|
||||
sendWithReceipt(key, value, callback);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a record asynchronously to the configured {@link #topic} without using a producer callback.
|
||||
* @param key the key for the {@link ProducerRecord}
|
||||
|
@ -359,12 +388,12 @@ public class KafkaBasedLog<K, V> {
|
|||
* @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
|
||||
* future if synchronous behavior is desired.
|
||||
*/
|
||||
public Future<RecordMetadata> send(K key, V value) {
|
||||
return send(key, value, null);
|
||||
public Future<RecordMetadata> sendWithReceipt(K key, V value) {
|
||||
return sendWithReceipt(key, value, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a record asynchronously to the configured {@link #topic}
|
||||
* Send a record asynchronously to the configured {@link #topic}.
|
||||
* @param key the key for the {@link ProducerRecord}
|
||||
* @param value the value for the {@link ProducerRecord}
|
||||
* @param callback the callback to invoke after completion; can be null if no callback is desired
|
||||
|
@ -372,7 +401,7 @@ public class KafkaBasedLog<K, V> {
|
|||
* @return the future from the call to {@link Producer#send}. {@link Future#get} can be called on this returned
|
||||
* future if synchronous behavior is desired.
|
||||
*/
|
||||
public Future<RecordMetadata> send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
|
||||
public Future<RecordMetadata> sendWithReceipt(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
|
||||
return producer.orElseThrow(() ->
|
||||
new IllegalStateException("This KafkaBasedLog was created in read-only mode and does not support write operations")
|
||||
).send(new ProducerRecord<>(topic, key, value), callback);
|
||||
|
|
|
@ -353,7 +353,7 @@ public class KafkaConfigBackingStoreTest {
|
|||
|
||||
expectConvert(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONNECTOR_CONFIG_STRUCTS.get(0), CONFIGS_SERIALIZED.get(0));
|
||||
|
||||
storeLog.send(EasyMock.anyObject(), EasyMock.anyObject());
|
||||
storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.anyObject());
|
||||
EasyMock.expectLastCall().andReturn(producerFuture);
|
||||
|
||||
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
|
||||
|
@ -388,13 +388,13 @@ public class KafkaConfigBackingStoreTest {
|
|||
@SuppressWarnings("unchecked")
|
||||
Future<RecordMetadata> connectorConfigProducerFuture = PowerMock.createMock(Future.class);
|
||||
// tombstone for the connector config
|
||||
storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
|
||||
storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
|
||||
EasyMock.expectLastCall().andReturn(connectorConfigProducerFuture);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Future<RecordMetadata> targetStateProducerFuture = PowerMock.createMock(Future.class);
|
||||
// tombstone for the connector target state
|
||||
storeLog.send(EasyMock.anyObject(), EasyMock.isNull());
|
||||
storeLog.sendWithReceipt(EasyMock.anyObject(), EasyMock.isNull());
|
||||
EasyMock.expectLastCall().andReturn(targetStateProducerFuture);
|
||||
|
||||
connectorConfigProducerFuture.get(EasyMock.eq(READ_WRITE_TOTAL_TIMEOUT_MS), EasyMock.anyObject());
|
||||
|
@ -469,7 +469,7 @@ public class KafkaConfigBackingStoreTest {
|
|||
|
||||
// In the meantime, write a target state (which doesn't require write privileges)
|
||||
expectConvert(KafkaConfigBackingStore.TARGET_STATE_V1, TARGET_STATE_PAUSED, CONFIGS_SERIALIZED.get(1));
|
||||
storeLog.send("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
|
||||
storeLog.sendWithReceipt("target-state-" + CONNECTOR_IDS.get(1), CONFIGS_SERIALIZED.get(1));
|
||||
EasyMock.expectLastCall().andReturn(producerFuture);
|
||||
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
|
||||
EasyMock.expectLastCall().andReturn(null);
|
||||
|
@ -1677,7 +1677,7 @@ public class KafkaConfigBackingStoreTest {
|
|||
EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
|
||||
.andReturn(serialized);
|
||||
|
||||
storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
|
||||
storeLog.sendWithReceipt(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
|
||||
EasyMock.expectLastCall().andReturn(producerFuture);
|
||||
|
||||
producerFuture.get(EasyMock.anyLong(), EasyMock.anyObject());
|
||||
|
|
Loading…
Reference in New Issue