KAFKA-14586: Moving StreamResetter to tools (#13127)

Moves StreamResetter to tools project.

Reviewers: Federico Valeri <fedevaleri@gmail.com>, Christo Lolov <lolovc@amazon.com>, Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
vamossagar12 2023-03-28 18:13:22 +05:30 committed by GitHub
parent f3e4dd9229
commit c14f56b484
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 13 additions and 9 deletions

View File

@ -18,4 +18,4 @@ if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M" export KAFKA_HEAP_OPTS="-Xmx512M"
fi fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.StreamsResetter "$@" exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.StreamsResetter "$@"

View File

@ -19,5 +19,5 @@ IF ["%KAFKA_HEAP_OPTS%"] EQU [""] (
set KAFKA_HEAP_OPTS=-Xmx512M set KAFKA_HEAP_OPTS=-Xmx512M
) )
"%~dp0kafka-run-class.bat" kafka.tools.StreamsResetter %* "%~dp0kafka-run-class.bat" org.apache.kafka.tools.StreamsResetter %*
EndLocal EndLocal

View File

@ -1934,6 +1934,7 @@ project(':streams') {
testImplementation project(':clients').sourceSets.test.output testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core') testImplementation project(':core')
testImplementation project(':tools')
testImplementation project(':core').sourceSets.test.output testImplementation project(':core').sourceSets.test.output
testImplementation project(':server-common').sourceSets.test.output testImplementation project(':server-common').sourceSets.test.output
testImplementation libs.log4j testImplementation libs.log4j

View File

@ -425,6 +425,8 @@
<allow pkg="kafka.test" /> <allow pkg="kafka.test" />
<allow pkg="joptsimple" /> <allow pkg="joptsimple" />
<allow pkg="javax.rmi.ssl"/> <allow pkg="javax.rmi.ssl"/>
<allow pkg="kafka.utils" />
<allow pkg="scala.collection" />
</subpackage> </subpackage>
<subpackage name="trogdor"> <subpackage name="trogdor">
@ -483,6 +485,7 @@
<allow pkg="scala" /> <allow pkg="scala" />
<allow class="kafka.zk.EmbeddedZookeeper"/> <allow class="kafka.zk.EmbeddedZookeeper"/>
<allow pkg="com.fasterxml.jackson" /> <allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.tools" />
</subpackage> </subpackage>
<subpackage name="test"> <subpackage name="test">
@ -490,7 +493,7 @@
</subpackage> </subpackage>
<subpackage name="tools"> <subpackage name="tools">
<allow pkg="kafka.tools" /> <allow pkg="org.apache.kafka.tools" />
</subpackage> </subpackage>
<subpackage name="state"> <subpackage name="state">

View File

@ -611,7 +611,7 @@ public class InternalTopicManager {
if (!existedTopicPartition.get(topicName).equals(numberOfPartitions.get())) { if (!existedTopicPartition.get(topicName).equals(numberOfPartitions.get())) {
final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " + final String errorMsg = String.format("Existing internal topic %s has invalid partitions: " +
"expected: %d; actual: %d. " + "expected: %d; actual: %d. " +
"Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.", "Use 'org.apache.kafka.tools.StreamsResetter' tool to clean up invalid topics before processing.",
topicName, numberOfPartitions.get(), existedTopicPartition.get(topicName)); topicName, numberOfPartitions.get(), existedTopicPartition.get(topicName));
log.error(errorMsg); log.error(errorMsg);
throw new StreamsException(errorMsg); throw new StreamsException(errorMsg);

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.kafka.streams.integration; package org.apache.kafka.streams.integration;
import kafka.tools.StreamsResetter; import org.apache.kafka.tools.StreamsResetter;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;

View File

@ -17,7 +17,7 @@
package org.apache.kafka.streams.integration; package org.apache.kafka.streams.integration;
import kafka.server.KafkaConfig$; import kafka.server.KafkaConfig$;
import kafka.tools.StreamsResetter; import org.apache.kafka.tools.StreamsResetter;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams;

View File

@ -16,7 +16,7 @@
*/ */
package org.apache.kafka.streams.tools; package org.apache.kafka.streams.tools;
import kafka.tools.StreamsResetter; import org.apache.kafka.tools.StreamsResetter;
import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecords;

View File

@ -522,7 +522,7 @@ class StreamsResetter(StreamsTestBaseService):
def __init__(self, test_context, kafka, topic, applicationId): def __init__(self, test_context, kafka, topic, applicationId):
super(StreamsResetter, self).__init__(test_context, super(StreamsResetter, self).__init__(test_context,
kafka, kafka,
"kafka.tools.StreamsResetter", "org.apache.kafka.tools.StreamsResetter",
"") "")
self.topic = topic self.topic = topic
self.applicationId = applicationId self.applicationId = applicationId

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package kafka.tools; package org.apache.kafka.tools;
import joptsimple.OptionException; import joptsimple.OptionException;
import joptsimple.OptionParser; import joptsimple.OptionParser;