diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 527db529f05..df223b69fb1 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -72,6 +72,9 @@ import java.util.regex.Pattern; import java.util.stream.Collector; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; public final class Utils { @@ -1275,4 +1278,34 @@ public final class Utils { } return map; } + + /** + * Convert timestamp to an epoch value + * @param timestamp to be converted + * @return epoch value of a given timestamp + * @throws ParseException for timestamp that doesn't follow ISO8601 format + */ + public static long getDateTime(String timestamp) throws ParseException { + final String[] timestampParts = timestamp.split("T"); + if (timestampParts.length < 2) { + throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length()); + } + + final String secondPart = timestampParts[1]; + if (secondPart == null || secondPart.isEmpty()) { + throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length()); + } + + if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) { + timestamp = timestamp + "Z"; + } + + try { + final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(timestamp); + return date.getTime(); + } catch (final ParseException e) { + final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(timestamp); + return date.getTime(); + } + } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 3cc0043b962..82c24f0a51f 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -58,6 +58,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.murmur2; import static org.apache.kafka.common.utils.Utils.union; import static org.apache.kafka.common.utils.Utils.validHostPattern; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -74,6 +75,9 @@ import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; public class UtilsTest { @@ -784,4 +788,34 @@ public class UtilsTest { assertEquals(msg, exception.get().getMessage()); assertEquals(1, count.get()); } + + @Test + public void shouldAcceptValidDateFormats() throws ParseException { + //check valid formats + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")); + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")); + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")); + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX")); + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")); + } + + @Test + public void shouldThrowOnInvalidDateFormat() { + //check some invalid formats + assertThat(assertThrows(ParseException.class, () -> { + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")); + }).getMessage(), containsString("Unparseable date")); + + assertThat(assertThrows(ParseException.class, () -> { + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X")); + }).getMessage(), containsString("Unparseable date")); + + } + + private void invokeGetDateTimeMethod(final SimpleDateFormat format) throws ParseException { + final Date checkpoint = new Date(); + final String formattedCheckpoint = format.format(checkpoint); + Utils.getDateTime(formattedCheckpoint); + } + } diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 9f33b4e2ecd..36a5010cd35 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -17,7 +17,6 @@ package kafka.admin -import java.text.{ParseException, SimpleDateFormat} import java.time.{Duration, Instant} import java.util.Properties @@ -114,19 +113,6 @@ object ConsumerGroupCommand extends Logging { e.foreach(_.printStackTrace()) } - def convertTimestamp(timeString: String): java.lang.Long = { - val datetime: String = timeString match { - case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString - case ts => s"${ts}Z" - } - val date = try { - new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime) - } catch { - case _: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime) - } - date.getTime - } - def printOffsetsToReset(groupAssignmentsToReset: Map[String, Map[TopicPartition, OffsetAndMetadata]]): Unit = { if (groupAssignmentsToReset.nonEmpty) println("\n%-30s %-30s %-10s %-15s".format("GROUP", "TOPIC", "PARTITION", "NEW-OFFSET")) @@ -824,7 +810,7 @@ object ConsumerGroupCommand extends Logging { case (topicPartition, newOffset) => (topicPartition, new OffsetAndMetadata(newOffset)) } } else if (opts.options.has(opts.resetToDatetimeOpt)) { - val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt)) + val timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt)) val logTimestampOffsets = getLogTimestampOffsets(groupId, partitionsToReset, timestamp) partitionsToReset.map { topicPartition => val logTimestampOffset = logTimestampOffsets.get(topicPartition) diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index eee52faf002..d356681e112 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -42,13 +42,11 @@ import scala.collection.JavaConverters; import java.io.IOException; import java.text.ParseException; -import java.text.SimpleDateFormat; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -446,7 +444,7 @@ public class StreamsResetter { shiftOffsetsBy(client, inputTopicPartitions, options.valueOf(shiftByOption)); } else if (options.has(toDatetimeOption)) { final String ts = options.valueOf(toDatetimeOption); - final long timestamp = getDateTime(ts); + final long timestamp = Utils.getDateTime(ts); resetToDatetime(client, inputTopicPartitions, timestamp); } else if (options.has(byDurationOption)) { final String duration = options.valueOf(byDurationOption); @@ -563,30 +561,6 @@ public class StreamsResetter { } } - // visible for testing - public long getDateTime(String timestamp) throws ParseException { - final String[] timestampParts = timestamp.split("T"); - if (timestampParts.length < 2) { - throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length()); - } - - final String secondPart = timestampParts[1]; - if (secondPart == null || secondPart.isEmpty()) { - throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length()); - } - - if (!(secondPart.contains("+") || secondPart.contains("-") || secondPart.contains("Z"))) { - timestamp = timestamp + "Z"; - } - - try { - final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(timestamp); - return date.getTime(); - } catch (final ParseException e) { - final Date date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(timestamp); - return date.getTime(); - } - } private Map parseResetPlan(final String resetPlanCsv) throws ParseException { final Map topicPartitionAndOffset = new HashMap<>(); diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala index 781586f3112..e6dd9407ef0 100644 --- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala @@ -13,7 +13,7 @@ package kafka.admin import java.io.{BufferedWriter, File, FileWriter} -import java.text.{ParseException, SimpleDateFormat} +import java.text.{SimpleDateFormat} import java.util.{Calendar, Date, Properties} import joptsimple.OptionException @@ -29,40 +29,7 @@ import org.junit.Test import scala.jdk.CollectionConverters._ import scala.collection.Seq -class TimeConversionTests { - @Test - def testDateTimeFormats(): Unit = { - //check valid formats - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")) - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")) - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")) - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX")) - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")) - - //check some invalid formats - try { - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")) - fail("Call to getDateTime should fail") - } catch { - case _: ParseException => - } - - try { - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X")) - fail("Call to getDateTime should fail") - } catch { - case _: ParseException => - } - } - - private def invokeGetDateTimeMethod(format: SimpleDateFormat): Unit = { - val checkpoint = new Date() - val timestampString = format.format(checkpoint) - ConsumerGroupCommand.convertTimestamp(timestampString) - } - -} /** * Test cases by: diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java index aa2469ac14b..1e2215dafb1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java @@ -30,11 +30,8 @@ import org.apache.kafka.common.TopicPartitionInfo; import org.junit.Before; import org.junit.Test; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.time.Duration; import java.util.Collections; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -43,7 +40,6 @@ import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; public class StreamsResetterTest { @@ -261,38 +257,4 @@ public class StreamsResetterTest { Collections.emptySet(), nodes.get(0)); } - @Test - public void shouldAcceptValidDateFormats() throws ParseException { - //check valid formats - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")); - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")); - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")); - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX")); - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")); - } - - @Test - public void shouldThrowOnInvalidDateFormat() throws ParseException { - //check some invalid formats - try { - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")); - fail("Call to getDateTime should fail"); - } catch (final Exception e) { - e.printStackTrace(); - } - - try { - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X")); - fail("Call to getDateTime should fail"); - } catch (final Exception e) { - e.printStackTrace(); - } - } - - private void invokeGetDateTimeMethod(final SimpleDateFormat format) throws ParseException { - final Date checkpoint = new Date(); - final StreamsResetter streamsResetter = new StreamsResetter(); - final String formattedCheckpoint = format.format(checkpoint); - streamsResetter.getDateTime(formattedCheckpoint); - } } \ No newline at end of file