KAFKA-6585: Consolidate duplicated logic on reset tools (#9255)

Reviewers: Navinder Pal Singh Brar <navinder_brar@yahoo.com>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
manijndl7 2020-09-30 22:43:01 +05:30 committed by GitHub
parent fda6701837
commit 2fda4458b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 70 additions and 114 deletions

View File

@ -72,6 +72,9 @@ import java.util.regex.Pattern;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public final class Utils {
@ -1275,4 +1278,34 @@ public final class Utils {
}
return map;
}
/**
* Convert timestamp to an epoch value
* @param timestamp to be converted
* @return epoch value of a given timestamp
* @throws ParseException for timestamp that doesn't follow ISO8601 format
*/
public static long getDateTime(String timestamp) throws ParseException {
final String[] timestampParts = timestamp.split("T");
if (timestampParts.length < 2) {
throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
}
final String secondPart = timestampParts[1];
if (secondPart == null || secondPart.isEmpty()) {
throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length());
}
if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) {
timestamp = timestamp + "Z";
}
try {
final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(timestamp);
return date.getTime();
} catch (final ParseException e) {
final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(timestamp);
return date.getTime();
}
}
}

View File

@ -58,6 +58,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.common.utils.Utils.murmur2;
import static org.apache.kafka.common.utils.Utils.union;
import static org.apache.kafka.common.utils.Utils.validHostPattern;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@ -74,6 +75,9 @@ import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class UtilsTest {
@ -784,4 +788,34 @@ public class UtilsTest {
assertEquals(msg, exception.get().getMessage());
assertEquals(1, count.get());
}
@Test
public void shouldAcceptValidDateFormats() throws ParseException {
//check valid formats
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
}
@Test
public void shouldThrowOnInvalidDateFormat() {
//check some invalid formats
assertThat(assertThrows(ParseException.class, () -> {
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
}).getMessage(), containsString("Unparseable date"));
assertThat(assertThrows(ParseException.class, () -> {
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X"));
}).getMessage(), containsString("Unparseable date"));
}
private void invokeGetDateTimeMethod(final SimpleDateFormat format) throws ParseException {
final Date checkpoint = new Date();
final String formattedCheckpoint = format.format(checkpoint);
Utils.getDateTime(formattedCheckpoint);
}
}

View File

@ -17,7 +17,6 @@
package kafka.admin
import java.text.{ParseException, SimpleDateFormat}
import java.time.{Duration, Instant}
import java.util.Properties
@ -114,19 +113,6 @@ object ConsumerGroupCommand extends Logging {
e.foreach(_.printStackTrace())
}
def convertTimestamp(timeString: String): java.lang.Long = {
val datetime: String = timeString match {
case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString
case ts => s"${ts}Z"
}
val date = try {
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime)
} catch {
case _: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime)
}
date.getTime
}
def printOffsetsToReset(groupAssignmentsToReset: Map[String, Map[TopicPartition, OffsetAndMetadata]]): Unit = {
if (groupAssignmentsToReset.nonEmpty)
println("\n%-30s %-30s %-10s %-15s".format("GROUP", "TOPIC", "PARTITION", "NEW-OFFSET"))
@ -824,7 +810,7 @@ object ConsumerGroupCommand extends Logging {
case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset))
}
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt))
val timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt))
val logTimestampOffsets = getLogTimestampOffsets(groupId, partitionsToReset, timestamp)
partitionsToReset.map { topicPartition =>
val logTimestampOffset = logTimestampOffsets.get(topicPartition)

View File

@ -42,13 +42,11 @@ import scala.collection.JavaConverters;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@ -446,7 +444,7 @@ public class StreamsResetter {
shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption));
} else if (options.has(toDatetimeOption)) {
final String ts = options.valueOf(toDatetimeOption);
final long timestamp = getDateTime(ts);
final long timestamp = Utils.getDateTime(ts);
resetToDatetime(client, inputTopicPartitions, timestamp);
} else if (options.has(byDurationOption)) {
final String duration = options.valueOf(byDurationOption);
@ -563,30 +561,6 @@ public class StreamsResetter {
}
}
// visible for testing
public long getDateTime(String timestamp) throws ParseException {
final String[] timestampParts = timestamp.split("T");
if (timestampParts.length < 2) {
throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
}
final String secondPart = timestampParts[1];
if (secondPart == null || secondPart.isEmpty()) {
throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length());
}
if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) {
timestamp = timestamp + "Z";
}
try {
final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(timestamp);
return date.getTime();
} catch (final ParseException e) {
final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(timestamp);
return date.getTime();
}
}
private Map<TopicPartition, Long> parseResetPlan(final String resetPlanCsv) throws ParseException {
final Map<TopicPartition, Long> topicPartitionAndOffset = new HashMap<>();

View File

@ -13,7 +13,7 @@
package kafka.admin
import java.io.{BufferedWriter, File, FileWriter}
import java.text.{ParseException, SimpleDateFormat}
import java.text.{SimpleDateFormat}
import java.util.{Calendar, Date, Properties}
import joptsimple.OptionException
@ -29,40 +29,7 @@ import org.junit.Test
import scala.jdk.CollectionConverters._
import scala.collection.Seq
class TimeConversionTests {
@Test
def testDateTimeFormats(): Unit = {
//check valid formats
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"))
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"))
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"))
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"))
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
//check some invalid formats
try {
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"))
fail("Call to getDateTime should fail")
} catch {
case _: ParseException =>
}
try {
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X"))
fail("Call to getDateTime should fail")
} catch {
case _: ParseException =>
}
}
private def invokeGetDateTimeMethod(format: SimpleDateFormat): Unit = {
val checkpoint = new Date()
val timestampString = format.format(checkpoint)
ConsumerGroupCommand.convertTimestamp(timestampString)
}
}
/**
* Test cases by:

View File

@ -30,11 +30,8 @@ import org.apache.kafka.common.TopicPartitionInfo;
import org.junit.Before;
import org.junit.Test;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -43,7 +40,6 @@ import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class StreamsResetterTest {
@ -261,38 +257,4 @@ public class StreamsResetterTest {
Collections.<String>emptySet(), nodes.get(0));
}
@Test
public void shouldAcceptValidDateFormats() throws ParseException {
//check valid formats
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX"));
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"));
}
@Test
public void shouldThrowOnInvalidDateFormat() throws ParseException {
//check some invalid formats
try {
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"));
fail("Call to getDateTime should fail");
} catch (final Exception e) {
e.printStackTrace();
}
try {
invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X"));
fail("Call to getDateTime should fail");
} catch (final Exception e) {
e.printStackTrace();
}
}
private void invokeGetDateTimeMethod(final SimpleDateFormat format) throws ParseException {
final Date checkpoint = new Date();
final StreamsResetter streamsResetter = new StreamsResetter();
final String formattedCheckpoint = format.format(checkpoint);
streamsResetter.getDateTime(formattedCheckpoint);
}
}