KAFKA-15445: Add JVM Docker image (#14552)

This PR aims to add an Apache Kafka JVM Docker image, as per the following KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ashwin Pankaj <apankaj@confluent.io>, Manikumar Reddy <manikumar.reddy@gmail.com>, Sanjay Awatramani <sawatramani@confluent.io>, Nikita Konev
Vedarth Sharma 2023-12-06 15:59:13 +05:30 committed by GitHub
parent 83110e2d42
commit eec1530da0
41 changed files with 2376 additions and 1 deletion


@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: Docker Build Test

on:
  workflow_dispatch:
    inputs:
      image_type:
        type: choice
        description: Docker image type to build and test
        options:
          - "jvm"
      kafka_url:
        description: Kafka url to be used to build the docker image
        required: true

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python 3.10
        uses: actions/setup-python@v3
        with:
          python-version: "3.10"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r docker/requirements.txt
      - name: Build image and run tests
        working-directory: ./docker
        run: |
          python docker_build_test.py kafka/test -tag=test -type=${{ github.event.inputs.image_type }} -u=${{ github.event.inputs.kafka_url }}
      - name: Run CVE scan
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: 'kafka/test:test'
          format: 'table'
          severity: 'CRITICAL,HIGH'
          output: scan_report_${{ github.event.inputs.image_type }}.txt
          exit-code: '1'
      - name: Upload test report
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: report_${{ github.event.inputs.image_type }}.html
          path: docker/test/report_${{ github.event.inputs.image_type }}.html
      - name: Upload CVE scan report
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: scan_report_${{ github.event.inputs.image_type }}.txt
          path: scan_report_${{ github.event.inputs.image_type }}.txt

.gitignore vendored

@@ -59,3 +59,6 @@ jmh-benchmarks/src/main/generated
**/src/generated-test
storage/kafka-tiered-storage/
docker/test/report_*.html
__pycache__


@@ -207,7 +207,9 @@ if (repo != null) {
'streams/streams-scala/logs/*',
'licenses/*',
'**/generated/**',
'clients/src/test/resources/serializedData/*'
'clients/src/test/resources/serializedData/*',
'docker/resources/utility/go.sum',
'docker/test/fixtures/secrets/*'
])
}
} else {

docker/README.md Normal file

@@ -0,0 +1,102 @@
Docker Images
=============
This directory contains scripts to build, test, push and promote the Docker image for Apache Kafka.
Local Setup
-----------
Make sure you have Python (>= 3.7.x) and Java (>= 17; Java is needed only for running the tests) installed before running the tests and scripts.
Run `pip install -r requirements.txt` to get all the requirements for running the scripts.
Make sure you have Docker installed with buildx support enabled (needed for pushing multi-architecture images to a Docker registry).
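For example, you can sanity-check the local toolchain before running the scripts (the exact commands below are illustrative):
```
python3 --version        # expect Python >= 3.7
java -version            # expect Java >= 17 (needed only for running the tests)
docker buildx version    # buildx must be available for multi-architecture builds
```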
Building the image and running tests locally
---------------------------------------
- The `docker_build_test.py` script builds and tests the Docker image.
- The Kafka binary tarball URL, along with the image name, tag and type, is needed to build the image. For a detailed usage description check `python docker_build_test.py --help`.
- Sanity tests for the Docker image are present in `test/docker_sanity_test.py`.
- By default the image will be built and tested, but if you only want to build the image, pass the `--build` (or `-b`) flag, and if you only want to test a given image, pass the `--test` (or `-t`) flag.
- An HTML test report containing the results will be generated after the tests are executed.

Example command:
To build and test an image named `test` under the `kafka` namespace, with the `3.6.0` tag and the `jvm` image type, where the Kafka release to be containerised is https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz (it is recommended to use the Scala 2.13 binary tarball), the following command can be used:
```
python docker_build_test.py kafka/test --image-tag=3.6.0 --image-type=jvm --kafka-url=https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
```
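If you only need one of the two phases, the same command accepts the flags described above; a sketch using the same example tarball:
```
# Build the image only, skipping the sanity tests
python docker_build_test.py kafka/test --image-tag=3.6.0 --image-type=jvm --kafka-url=https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz -b

# Run the sanity tests only, against an already built kafka/test:3.6.0 image
# (the kafka url is still used to download test fixtures)
python docker_build_test.py kafka/test --image-tag=3.6.0 --image-type=jvm --kafka-url=https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz -t
```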
Building the image and running tests using GitHub Actions
----------------------------------------------------
This is the recommended way to build, test and get a CVE report for the Docker image.
Just choose the image type and provide the Kafka URL to the `Docker Build Test` workflow. It will generate a test report and a CVE report that can be shared with the community.

kafka-url - The URL to download the Kafka tarball from, for example a tarball URL from https://archive.apache.org/dist/kafka. For building an RC image this will be an RC tarball URL.

image-type - The type of image that we intend to build, chosen from a dropdown menu in the workflow. The `jvm` image type is for the official Docker image (to be hosted on apache/kafka) as described in [KIP-975](https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka).

Example command:
To build and test a `jvm` image where the Kafka release to be containerised is https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz (it is recommended to use the Scala 2.13 binary tarball), the following inputs to the GitHub Actions workflow are recommended:
```
image_type: jvm
kafka_url: https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
```
Creating a release
------------------
- The `docker_release.py` script builds a multi-architecture image and pushes it to the provided Docker registry.
- Ensure you are logged in to the Docker registry before triggering the script.
- The Kafka binary tarball URL, along with the image name (in the format `<registry>/<namespace>/<image_name>:<image_tag>`) and type, is needed to build the image. For a detailed usage description check `python docker_release.py --help`.

Example command:
To push an image named `test` under the `kafka` Docker Hub namespace, with the `3.6.0` tag and the `jvm` image type, where the Kafka release to be containerised is https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz (it is recommended to use the Scala 2.13 binary tarball), the following command can be used (make sure you have push access to the Docker repo):
```
# kafka/test is an example repo. Please replace with the docker hub repo you have push access to.
python docker_release.py kafka/test:3.6.0 --kafka-url https://archive.apache.org/dist/kafka/3.6.0/kafka_2.13-3.6.0.tgz
```
Please note that we use docker buildx for preparing the multi-architecture image and pushing it to the Docker registry. It is possible to encounter build failures because of buildx, so please retry the command if a buildx-related error occurs.
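If a failed run leaves a stale builder behind, it can be removed manually before retrying; the builder name below is the one created by `docker_release.py`:
```
docker buildx rm kafka-builder
```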
Promoting a release
-------------------
`docker_promote.py` provides an interactive way to pull an RC Docker image and promote it to the required Docker Hub repo.
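For illustration, a promotion session might look like the following (all image names below are placeholders):
```
python docker_promote.py
Enter dockerhub username: <username>
Enter dockerhub password: <password>
Enter the RC docker image that you want to pull (in the format <registry>/<namespace>/<image_name>:<image_tag>): <namespace>/kafka:<rc-tag>
Enter the dockerhub namespace that the rc image needs to be promoted to [example: apache]: apache
Enter the dockerhub image name that the rc image needs to be promoted to [example: kafka]: kafka
Enter the dockerhub image tag that the rc image needs to be promoted to [example: latest]: latest
Should we proceed? [y/N]: y
```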
Using the image in a docker container
-------------------------------------
- The image uses the Kafka release downloaded from the provided Kafka URL.
- The image can be run in a container in default mode by running
  `docker run -p 9092:9092 <image-name:tag>`
- The default configs run Kafka in KRaft mode with PLAINTEXT listeners on port 9092.
- Once user-provided config properties are supplied, the default configs get replaced.
- Users can provide Kafka configs in two ways (see the combined example after this list):
  - By mounting a folder containing property files
    - Mount the folder containing the Kafka property files to `/mnt/shared/config`
    - These files will replace the default config files
  - Using environment variables
    - Kafka properties defined via env variables will override properties defined in file input
    - If properties are provided via environment variables only, the default configs will be replaced by the user-provided properties
    - Input format for env variables:
      - Replace `.` with `_`
      - Replace `_` with `__` (double underscore)
      - Replace `-` with `___` (triple underscore)
      - Prefix the result with `KAFKA_`
      - Examples:
        - For `abc.def`, use `KAFKA_ABC_DEF`
        - For `abc-def`, use `KAFKA_ABC___DEF`
        - For `abc_def`, use `KAFKA_ABC__DEF`
- Hence the order of precedence of properties is the following:
  - Env variable (highest)
  - File input
  - Default configs (only when there is no user-provided config)
- Any env variable that is commonly used when starting Kafka (for example, `CLUSTER_ID`) can be supplied to the Docker container and will be available when Kafka starts.
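As a combined example, the container below gets property files from a mounted folder plus one override via an env variable (the folder path, property and values are illustrative only; `KAFKA_LOG_RETENTION_MS` maps to `log.retention.ms` per the rules above):
```
docker run -p 9092:9092 \
  -v "$(pwd)/my-config:/mnt/shared/config" \
  -e KAFKA_LOG_RETENTION_MS=600000 \
  -e CLUSTER_ID=5L6g3nShT-eMCtK--X86sw \
  <image-name:tag>
```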
Steps to release docker image
-----------------------------
- Make sure you have executed the `release.py` script to prepare the RC tarball on the Apache SFTP server.
- Use the RC tarball URL (make sure you choose the Scala 2.13 version) as the input Kafka URL to build the Docker image and run the sanity tests.
- Trigger the GitHub Actions workflow using the RC branch and provide the RC tarball URL as the Kafka URL.
- This will generate a test report and a CVE report for the Docker image.
- If the reports look fine, the RC Docker image can be built and published.
- Execute the `docker_release.py` script to build and publish the RC Docker image in your Docker Hub account.
- Share the RC Docker image, test report and CVE report with the community in the RC vote email.
- Once approved and ready, take help from someone in the PMC to trigger the `docker_promote.py` script and promote the RC Docker image to the apache/kafka Docker Hub repo.
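As a sketch, the RC build-and-publish step above boils down to the following (the repository and tarball URL are placeholders):
```
# Push the multi-architecture RC image to your own dockerhub account first
python docker_release.py <your-namespace>/kafka:<rc-tag> --kafka-url <rc-tarball-url>
```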

docker/common.py Normal file

@@ -0,0 +1,46 @@
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
import tempfile
import os
from distutils.dir_util import copy_tree
import shutil
def execute(command):
    if subprocess.run(command).returncode != 0:
        raise SystemError("Failure in executing following command:- ", " ".join(command))


def get_input(message):
    value = input(message)
    if value == "":
        raise ValueError("This field cannot be empty")
    return value


def jvm_image(command):
    temp_dir_path = tempfile.mkdtemp()
    current_dir = os.path.dirname(os.path.realpath(__file__))
    copy_tree(f"{current_dir}/jvm", f"{temp_dir_path}/jvm")
    copy_tree(f"{current_dir}/resources", f"{temp_dir_path}/jvm/resources")
    command = command.replace("$DOCKER_FILE", f"{temp_dir_path}/jvm/Dockerfile")
    command = command.replace("$DOCKER_DIR", f"{temp_dir_path}/jvm")
    try:
        execute(command.split())
    except:
        raise SystemError("Docker Image Build failed")
    finally:
        shutil.rmtree(temp_dir_path)

docker/docker_build_test.py Executable file

@@ -0,0 +1,84 @@
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Python script to build and test a docker image
This script is used to generate a test report

Usage:
    docker_build_test.py --help
        Get a detailed description of each option

Example command:
    docker_build_test.py <image_name> --image-tag <image_tag> --image-type <image_type> --kafka-url <kafka_url>

    This command will build an image named <image_name>, tagged with <image_tag> (latest by default) and
    of type <image_type> (jvm by default), using <kafka_url> for the kafka inside the image, and run tests on the image.
    -b can be passed as an additional argument if you just want to build the image.
    -t can be passed if you just want to run tests on the image.
"""
from datetime import date
import argparse
from distutils.dir_util import copy_tree
import shutil
from test.docker_sanity_test import run_tests
from common import execute, jvm_image
import tempfile
import os
def build_jvm(image, tag, kafka_url):
    image = f'{image}:{tag}'
    jvm_image(f"docker build -f $DOCKER_FILE -t {image} --build-arg kafka_url={kafka_url} --build-arg build_date={date.today()} $DOCKER_DIR")


def run_jvm_tests(image, tag, kafka_url):
    temp_dir_path = tempfile.mkdtemp()
    try:
        current_dir = os.path.dirname(os.path.realpath(__file__))
        copy_tree(f"{current_dir}/test/fixtures", f"{temp_dir_path}/fixtures")
        execute(["wget", "-nv", "-O", f"{temp_dir_path}/kafka.tgz", kafka_url])
        execute(["mkdir", f"{temp_dir_path}/fixtures/kafka"])
        execute(["tar", "xfz", f"{temp_dir_path}/kafka.tgz", "-C", f"{temp_dir_path}/fixtures/kafka", "--strip-components", "1"])
        failure_count = run_tests(f"{image}:{tag}", "jvm", temp_dir_path)
    except:
        raise SystemError("Failed to run the tests")
    finally:
        shutil.rmtree(temp_dir_path)
    test_report_location_text = f"To view test report please check {current_dir}/test/report_jvm.html"
    if failure_count != 0:
        raise SystemError(f"{failure_count} tests have failed. {test_report_location_text}")
    else:
        print(f"All tests passed successfully. {test_report_location_text}")


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("image", help="Image name that you want to keep for the Docker image")
    parser.add_argument("--image-tag", "-tag", default="latest", dest="tag", help="Image tag that you want to add to the image")
    parser.add_argument("--image-type", "-type", choices=["jvm"], default="jvm", dest="image_type", help="Image type you want to build")
    parser.add_argument("--kafka-url", "-u", dest="kafka_url", help="Kafka url to be used to download kafka binary tarball in the docker image")
    parser.add_argument("--build", "-b", action="store_true", dest="build_only", default=False, help="Only build the image, don't run tests")
    parser.add_argument("--test", "-t", action="store_true", dest="test_only", default=False, help="Only run the tests, don't build the image")
    args = parser.parse_args()
    if args.image_type == "jvm" and (args.build_only or not (args.build_only or args.test_only)):
        if args.kafka_url:
            build_jvm(args.image, args.tag, args.kafka_url)
        else:
            raise ValueError("--kafka-url is a required argument for jvm image")
    if args.image_type == "jvm" and (args.test_only or not (args.build_only or args.test_only)):
        run_jvm_tests(args.image, args.tag, args.kafka_url)

docker/docker_promote.py Executable file

@@ -0,0 +1,85 @@
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Python script to promote an RC image.
Follow the interactive guide to pull an RC image and promote it to the desired dockerhub repository.
Usage: docker_promote.py
Interactive utility to promote a docker image
"""
import requests
from getpass import getpass
from common import execute, get_input
def login():
    execute(["docker", "login"])


def pull(rc_image, promotion_image):
    execute(["docker", "pull", "--platform=linux/amd64", rc_image])
    execute(["docker", "tag", rc_image, f"{promotion_image}-amd64"])
    execute(["docker", "pull", "--platform=linux/arm64", rc_image])
    execute(["docker", "tag", rc_image, f"{promotion_image}-arm64"])


def push(promotion_image):
    execute(["docker", "push", f"{promotion_image}-amd64"])
    execute(["docker", "push", f"{promotion_image}-arm64"])


def push_manifest(promotion_image):
    execute(["docker", "manifest", "create", promotion_image,
             "--amend", f"{promotion_image}-amd64",
             "--amend", f"{promotion_image}-arm64"])
    execute(["docker", "manifest", "push", promotion_image])


def remove(promotion_image_namespace, promotion_image_name, promotion_image_tag, token):
    if requests.delete(f"https://hub.docker.com/v2/repositories/{promotion_image_namespace}/{promotion_image_name}/tags/{promotion_image_tag}-amd64", headers={"Authorization": f"JWT {token}"}).status_code != 204:
        raise SystemError(f"Failed to delete redundant images from dockerhub. Please make sure {promotion_image_namespace}/{promotion_image_name}:{promotion_image_tag}-amd64 is removed from dockerhub")
    if requests.delete(f"https://hub.docker.com/v2/repositories/{promotion_image_namespace}/{promotion_image_name}/tags/{promotion_image_tag}-arm64", headers={"Authorization": f"JWT {token}"}).status_code != 204:
        raise SystemError(f"Failed to delete redundant images from dockerhub. Please make sure {promotion_image_namespace}/{promotion_image_name}:{promotion_image_tag}-arm64 is removed from dockerhub")
    execute(["docker", "rmi", f"{promotion_image_namespace}/{promotion_image_name}:{promotion_image_tag}-amd64"])
    execute(["docker", "rmi", f"{promotion_image_namespace}/{promotion_image_name}:{promotion_image_tag}-arm64"])


if __name__ == "__main__":
    login()
    username = get_input("Enter dockerhub username: ")
    password = getpass("Enter dockerhub password: ")
    token = (requests.post("https://hub.docker.com/v2/users/login/", json={"username": username, "password": password})).json()['token']
    if len(token) == 0:
        raise PermissionError("Dockerhub login failed")
    rc_image = get_input("Enter the RC docker image that you want to pull (in the format <registry>/<namespace>/<image_name>:<image_tag>): ")
    promotion_image_namespace = get_input("Enter the dockerhub namespace that the rc image needs to be promoted to [example: apache]: ")
    promotion_image_name = get_input("Enter the dockerhub image name that the rc image needs to be promoted to [example: kafka]: ")
    promotion_image_tag = get_input("Enter the dockerhub image tag that the rc image needs to be promoted to [example: latest]: ")
    promotion_image = f"{promotion_image_namespace}/{promotion_image_name}:{promotion_image_tag}"
    print(f"Docker image {rc_image} will be pulled and pushed to {promotion_image}")
    proceed = input("Should we proceed? [y/N]: ")
    if proceed == "y":
        pull(rc_image, promotion_image)
        push(promotion_image)
        push_manifest(promotion_image)
        remove(promotion_image_namespace, promotion_image_name, promotion_image_tag, token)
        print("The image has been promoted successfully. The promoted image should be accessible in dockerhub")
    else:
        print("Image promotion aborted")

docker/docker_release.py Executable file

@@ -0,0 +1,75 @@
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Python script to build and push a multiarch docker image
This script is used to prepare and publish a docker release candidate

Pre-requisites:
    Ensure that you are logged in to the docker registry and that you have access to push to that registry.
    Ensure that docker buildx is enabled for you.

Usage:
    docker_release.py --help
        Get a detailed description of each argument

Example command:
    docker_release.py <image> --kafka-url <kafka_url> --image-type <type>

    This command will build a multiarch image of type <type> (jvm by default),
    using <kafka_url> to download kafka, and push it to the docker registry under the image name <image> provided.
    Make sure the image is in the format <registry>/<namespace>/<image_name>:<image_tag>.
"""
from datetime import date
import argparse
from common import execute, jvm_image
def build_push_jvm(image, kafka_url):
    try:
        create_builder()
        jvm_image(f"docker buildx build -f $DOCKER_FILE --build-arg kafka_url={kafka_url} --build-arg build_date={date.today()} --push \
            --platform linux/amd64,linux/arm64 --tag {image} $DOCKER_DIR")
    except:
        raise SystemError("Docker image push failed")
    finally:
        remove_builder()


def create_builder():
    execute(["docker", "buildx", "create", "--name", "kafka-builder", "--use"])


def remove_builder():
    execute(["docker", "buildx", "rm", "kafka-builder"])


if __name__ == "__main__":
    print("\
    This script will build and push docker images of apache kafka.\n \
    Please ensure that image has been sanity tested before pushing the image. \n \
    Please ensure you are logged in the docker registry that you are trying to push to.")
    parser = argparse.ArgumentParser()
    parser.add_argument("image", help="Dockerhub image that you want to push to (in the format <registry>/<namespace>/<image_name>:<image_tag>)")
    parser.add_argument("--image-type", "-type", choices=["jvm"], default="jvm", dest="image_type", help="Image type you want to build")
    parser.add_argument("--kafka-url", "-u", dest="kafka_url", help="Kafka url to be used to download kafka binary tarball in the docker image")
    args = parser.parse_args()
    print(f"Docker image of type {args.image_type} containing kafka downloaded from {args.kafka_url} will be pushed to {args.image}")
    print("Building and pushing the image")
    if args.image_type == "jvm":
        build_push_jvm(args.image, args.kafka_url)
    print(f"Image has been pushed to {args.image}")

docker/jvm/Dockerfile Normal file

@@ -0,0 +1,106 @@
###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
FROM golang:latest AS build-utility
WORKDIR /build
RUN useradd --no-log-init --create-home --shell /bin/bash appuser
COPY --chown=appuser:appuser resources/utility/ ./
# Generate utility executable for dealing with env variables
RUN go build -ldflags="-w -s" ./utility.go
USER appuser
RUN go test ./...
FROM eclipse-temurin:21-jre-alpine AS build-jsa
USER root
# Get kafka from https://archive.apache.org/dist/kafka and pass the url through build arguments
ARG kafka_url
COPY jsa_launch /etc/kafka/docker/jsa_launch
RUN set -eux ; \
apk update ; \
apk upgrade ; \
apk add --no-cache wget gcompat gpg gpg-agent procps netcat-openbsd uuidgen; \
mkdir opt/kafka; \
wget -nv -O kafka.tgz "$kafka_url"; \
wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \
tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \
wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \
gpg --import KEYS; \
gpg --batch --verify kafka.tgz.asc kafka.tgz
# Generate jsa files using dynamic CDS for kafka server start command and kafka storage format command
RUN /etc/kafka/docker/jsa_launch
FROM eclipse-temurin:21-jre-alpine
# exposed ports
EXPOSE 9092
USER root
# Get kafka from https://archive.apache.org/dist/kafka and pass the url through build arguments
ARG kafka_url
ARG build_date
LABEL org.label-schema.name="kafka" \
org.label-schema.description="Apache Kafka" \
org.label-schema.build-date="${build_date}" \
org.label-schema.vcs-url="https://github.com/apache/kafka" \
maintainer="Apache Kafka"
RUN set -eux ; \
apk update ; \
apk upgrade ; \
apk add --no-cache wget gpg gpg-agent gcompat; \
mkdir opt/kafka; \
wget -nv -O kafka.tgz "$kafka_url"; \
wget -nv -O kafka.tgz.asc "$kafka_url.asc"; \
tar xfz kafka.tgz -C /opt/kafka --strip-components 1; \
wget -nv -O KEYS https://downloads.apache.org/kafka/KEYS; \
gpg --import KEYS; \
gpg --batch --verify kafka.tgz.asc kafka.tgz; \
mkdir -p /var/lib/kafka/data /etc/kafka/secrets; \
mkdir -p /etc/kafka/docker /usr/logs /mnt/shared/config; \
adduser -h /home/appuser -D --shell /bin/bash appuser; \
chown appuser:appuser -R /usr/logs /opt/kafka /mnt/shared/config; \
chown appuser:root -R /var/lib/kafka /etc/kafka/secrets /etc/kafka; \
chmod -R ug+w /etc/kafka /var/lib/kafka /etc/kafka/secrets; \
cp /opt/kafka/config/log4j.properties /etc/kafka/docker/log4j.properties; \
cp /opt/kafka/config/tools-log4j.properties /etc/kafka/docker/tools-log4j.properties; \
rm kafka.tgz kafka.tgz.asc KEYS; \
apk del wget gpg gpg-agent; \
apk cache clean;
COPY --from=build-jsa kafka.jsa /opt/kafka/kafka.jsa
COPY --from=build-jsa storage.jsa /opt/kafka/storage.jsa
COPY --chown=appuser:appuser --from=build-utility /build/utility /usr/bin
COPY --chown=appuser:appuser resources/common-scripts /etc/kafka/docker
COPY --chown=appuser:appuser launch /etc/kafka/docker/launch
USER appuser
VOLUME ["/etc/kafka/secrets", "/var/lib/kafka/data", "/mnt/shared/config"]
CMD ["/etc/kafka/docker/run"]

docker/jvm/jsa_launch Executable file

@@ -0,0 +1,49 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
KAFKA_CLUSTER_ID="5L6g3nShT-eMCtK--X86sw"
TOPIC="$(uuidgen)"
KAFKA_JVM_PERFORMANCE_OPTS="-XX:ArchiveClassesAtExit=storage.jsa" opt/kafka/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c opt/kafka/config/kraft/server.properties
KAFKA_JVM_PERFORMANCE_OPTS="-XX:ArchiveClassesAtExit=kafka.jsa" opt/kafka/bin/kafka-server-start.sh opt/kafka/config/kraft/server.properties &
check_timeout() {
    if [ $TIMEOUT -eq 0 ]; then
        echo "Server startup timed out"
        exit 1
    fi
    echo "Check will timeout in $(( TIMEOUT-- )) seconds"
    sleep 1
}
opt/kafka/bin/kafka-topics.sh --create --topic $TOPIC --bootstrap-server localhost:9092
[ $? -eq 0 ] || exit 1
echo "test" | opt/kafka/bin/kafka-console-producer.sh --topic $TOPIC --bootstrap-server localhost:9092
[ $? -eq 0 ] || exit 1
opt/kafka/bin/kafka-console-consumer.sh --topic $TOPIC --from-beginning --bootstrap-server localhost:9092 --max-messages 1 --timeout-ms 20000
[ $? -eq 0 ] || exit 1
opt/kafka/bin/kafka-server-stop.sh
# Wait until jsa file is generated
TIMEOUT=20
until [ -f /kafka.jsa ]
do
    check_timeout
done

docker/jvm/launch Executable file

@@ -0,0 +1,60 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Override this section from the script to include the com.sun.management.jmxremote.rmi.port property.
if [ -z "$KAFKA_JMX_OPTS" ]; then
    export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi

# The JMX client needs to be able to connect to java.rmi.server.hostname.
# The default for bridged n/w is the bridged IP so you will only be able to connect from another docker container.
# For host n/w, this is the IP that the hostname on the host resolves to.
# If you have more than one n/w configured, hostname -i gives you all the IPs,
# the default is to pick the first IP (or network).
export KAFKA_JMX_HOSTNAME=${KAFKA_JMX_HOSTNAME:-$(hostname -i | cut -d" " -f1)}

if [ "$KAFKA_JMX_PORT" ]; then
    # This ensures that the "if" section for JMX_PORT in kafka launch script does not trigger.
    export JMX_PORT=$KAFKA_JMX_PORT
    export KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Djava.rmi.server.hostname=$KAFKA_JMX_HOSTNAME -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT -Dcom.sun.management.jmxremote.port=$JMX_PORT"
fi

# Make a temp env variable to store user provided performance opts
if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
    export TEMP_KAFKA_JVM_PERFORMANCE_OPTS=""
else
    export TEMP_KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS"
fi

# We will first use CDS for storage to format storage
export KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS -XX:SharedArchiveFile=/opt/kafka/storage.jsa"

echo "===> Using provided cluster id $CLUSTER_ID ..."

# A bit of a hack to not error out if the storage is already formatted. Need storage-tool to support this
result=$(/opt/kafka/bin/kafka-storage.sh format --cluster-id=$CLUSTER_ID -c /opt/kafka/config/server.properties 2>&1) || \
    echo $result | grep -i "already formatted" || \
    { echo $result && (exit 1); }

# Using temp env variable to get rid of storage CDS command
export KAFKA_JVM_PERFORMANCE_OPTS="$TEMP_KAFKA_JVM_PERFORMANCE_OPTS"

# Now we will use CDS for kafka to start kafka server
export KAFKA_JVM_PERFORMANCE_OPTS="$KAFKA_JVM_PERFORMANCE_OPTS -XX:SharedArchiveFile=/opt/kafka/kafka.jsa"

# Start kafka broker
exec /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

docker/requirements.txt Normal file

@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
requests
HTMLTestRunner-Python3


@@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -o nounset \
    -o errexit

# Trace may expose passwords/credentials by printing them to stdout, so turn on with care.
if [ "${TRACE:-}" == "true" ]; then
    set -o verbose \
        -o xtrace
fi

docker/resources/common-scripts/configure vendored Executable file

@@ -0,0 +1,130 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# unset KAFKA_ADVERTISED_LISTENERS from ENV in KRaft mode when running as controller only
if [[ -n "${KAFKA_PROCESS_ROLES-}" ]]
then
    echo "Running in KRaft mode..."
    utility ensure CLUSTER_ID
    if [[ $KAFKA_PROCESS_ROLES == "controller" ]]
    then
        if [[ -n "${KAFKA_ADVERTISED_LISTENERS-}" ]]
        then
            echo "KAFKA_ADVERTISED_LISTENERS is not supported on a KRaft controller."
            exit 1
        else
            # Unset in case env variable is set with empty value
            unset KAFKA_ADVERTISED_LISTENERS
        fi
    fi
fi

# By default, LISTENERS is derived from ADVERTISED_LISTENERS by replacing
# hosts with 0.0.0.0. This is a good default as it ensures that the broker
# process listens on all ports.
if [[ -z "${KAFKA_LISTENERS-}" ]] && ( [[ -z "${KAFKA_PROCESS_ROLES-}" ]] || [[ $KAFKA_PROCESS_ROLES != "controller" ]] ) && [[ -n "${KAFKA_ADVERTISED_LISTENERS}" ]]
then
    export KAFKA_LISTENERS
    KAFKA_LISTENERS=$(echo "$KAFKA_ADVERTISED_LISTENERS" | sed -e 's|://[^:]*:|://0.0.0.0:|g')
fi

utility path /opt/kafka/config/ writable

# Set if ADVERTISED_LISTENERS has SSL:// or SASL_SSL:// endpoints.
if [[ -n "${KAFKA_ADVERTISED_LISTENERS-}" ]] && [[ $KAFKA_ADVERTISED_LISTENERS == *"SSL://"* ]]
then
    echo "SSL is enabled."
    utility ensure KAFKA_SSL_KEYSTORE_FILENAME
    export KAFKA_SSL_KEYSTORE_LOCATION="/etc/kafka/secrets/$KAFKA_SSL_KEYSTORE_FILENAME"
    utility path "$KAFKA_SSL_KEYSTORE_LOCATION" existence

    utility ensure KAFKA_SSL_KEY_CREDENTIALS
    KAFKA_SSL_KEY_CREDENTIALS_LOCATION="/etc/kafka/secrets/$KAFKA_SSL_KEY_CREDENTIALS"
    utility path "$KAFKA_SSL_KEY_CREDENTIALS_LOCATION" existence
    export KAFKA_SSL_KEY_PASSWORD
    KAFKA_SSL_KEY_PASSWORD=$(cat "$KAFKA_SSL_KEY_CREDENTIALS_LOCATION")

    utility ensure KAFKA_SSL_KEYSTORE_CREDENTIALS
    KAFKA_SSL_KEYSTORE_CREDENTIALS_LOCATION="/etc/kafka/secrets/$KAFKA_SSL_KEYSTORE_CREDENTIALS"
    utility path "$KAFKA_SSL_KEYSTORE_CREDENTIALS_LOCATION" existence
    export KAFKA_SSL_KEYSTORE_PASSWORD
    KAFKA_SSL_KEYSTORE_PASSWORD=$(cat "$KAFKA_SSL_KEYSTORE_CREDENTIALS_LOCATION")

    if [[ -n "${KAFKA_SSL_CLIENT_AUTH-}" ]] && ( [[ $KAFKA_SSL_CLIENT_AUTH == *"required"* ]] || [[ $KAFKA_SSL_CLIENT_AUTH == *"requested"* ]] )
    then
        utility ensure KAFKA_SSL_TRUSTSTORE_FILENAME
        export KAFKA_SSL_TRUSTSTORE_LOCATION="/etc/kafka/secrets/$KAFKA_SSL_TRUSTSTORE_FILENAME"
        utility path "$KAFKA_SSL_TRUSTSTORE_LOCATION" existence

        utility ensure KAFKA_SSL_TRUSTSTORE_CREDENTIALS
        KAFKA_SSL_TRUSTSTORE_CREDENTIALS_LOCATION="/etc/kafka/secrets/$KAFKA_SSL_TRUSTSTORE_CREDENTIALS"
        utility path "$KAFKA_SSL_TRUSTSTORE_CREDENTIALS_LOCATION" existence
        export KAFKA_SSL_TRUSTSTORE_PASSWORD
        KAFKA_SSL_TRUSTSTORE_PASSWORD=$(cat "$KAFKA_SSL_TRUSTSTORE_CREDENTIALS_LOCATION")
    fi
fi

# Set if KAFKA_ADVERTISED_LISTENERS has SASL_PLAINTEXT:// or SASL_SSL:// endpoints.
if [[ -n "${KAFKA_ADVERTISED_LISTENERS-}" ]] && [[ $KAFKA_ADVERTISED_LISTENERS =~ .*SASL_.*://.* ]]
then
    echo "SASL is enabled."
    utility ensure KAFKA_OPTS
    if [[ ! $KAFKA_OPTS == *"java.security.auth.login.config"* ]]
    then
        echo "KAFKA_OPTS should contain 'java.security.auth.login.config' property."
    fi
fi

if [[ -n "${KAFKA_JMX_OPTS-}" ]]
then
    if [[ ! $KAFKA_JMX_OPTS == *"com.sun.management.jmxremote.rmi.port"* ]]
    then
        echo "KAFKA_JMX_OPTS should contain 'com.sun.management.jmxremote.rmi.port' property. It is required for accessing the JMX metrics externally."
    fi
fi

# Copy the bundled log4j.properties and tools-log4j.properties. This is done to handle property modification during container restart
cp /etc/kafka/docker/log4j.properties /opt/kafka/config/log4j.properties
cp /etc/kafka/docker/tools-log4j.properties /opt/kafka/config/tools-log4j.properties

# Copy all the user provided property files through file input
cp -R /mnt/shared/config/. /opt/kafka/config/

# Check the presence of user provided kafka configs via file input
if [ -e "/mnt/shared/config/server.properties" ]
then
    echo "User provided kafka configs found via file input. Any properties provided via env variables will be appended to this."
    # Append configs provided via env variables.
    echo -e "\n$(utility render-properties /etc/kafka/docker/kafka-propertiesSpec.json)" >> /opt/kafka/config/server.properties
else
    # Create the kafka config property file using user provided environment variables.
    echo -e "\n$(utility render-properties /etc/kafka/docker/kafka-propertiesSpec.json)" > /opt/kafka/config/server.properties
    if grep -q '[^[:space:]]' "/opt/kafka/config/server.properties"; then
        echo "User provided kafka configs found via environment variables."
    fi
fi

# If no user provided kafka configs found, use default configs
if ! grep -q '[^[:space:]]' "/opt/kafka/config/server.properties"; then
    echo "User provided kafka configs not found (neither via file input nor via environment variables). Falling back to default configs."
    cp /opt/kafka/config/kraft/server.properties /opt/kafka/config/server.properties
fi

echo -e "\n$(utility render-template /etc/kafka/docker/kafka-log4j.properties.template)" >> /opt/kafka/config/log4j.properties
echo -e "\n$(utility render-template /etc/kafka/docker/kafka-tools-log4j.properties.template)" >> /opt/kafka/config/tools-log4j.properties


@@ -0,0 +1,28 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
declare -A env_defaults
env_defaults=(
    # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
    ["CLUSTER_ID"]="5L6g3nShT-eMCtK--X86sw"
)

for key in "${!env_defaults[@]}"; do
    if [[ -z "${!key:-}" ]]; then
        echo ${key} not set. Setting it to default value: \"${env_defaults[$key]}\"
        export "$key"="${env_defaults[$key]}"
    fi
done


@@ -0,0 +1,22 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{{ with $value := getEnv "KAFKA_LOG4J_ROOT_LOGLEVEL" "INFO" }}{{ if ne $value "INFO" }}
log4j.rootLogger={{ $value }}, stdout
{{ end }}{{ end }}
{{ $loggers := getEnv "KAFKA_LOG4J_LOGGERS" "" -}}
{{ range $k, $v := splitToMapDefaults "," "" $loggers}}
log4j.logger.{{ $k }}={{ $v -}}
{{ end }}


@@ -0,0 +1,22 @@
{
  "prefixes": {
    "KAFKA": false
  },
  "renamed": {
  },
  "excludes": [
    "KAFKA_VERSION",
    "KAFKA_HEAP_OPT",
    "KAFKA_LOG4J_OPTS",
    "KAFKA_OPTS",
    "KAFKA_JMX_OPTS",
    "KAFKA_JVM_PERFORMANCE_OPTS",
    "KAFKA_GC_LOG_OPTS",
    "KAFKA_LOG4J_ROOT_LOGLEVEL",
    "KAFKA_LOG4J_LOGGERS",
    "KAFKA_TOOLS_LOG4J_LOGLEVEL"
  ],
  "defaults": {
  },
  "excludeWithPrefix": ""
}


@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{{ with $value := getEnv "KAFKA_TOOLS_LOG4J_LOGLEVEL" "WARN"}} {{if ne $value "WARN"}}
log4j.rootLogger={{ $value }}, stderr
{{ end }}{{ end }}


@@ -0,0 +1,38 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
. /etc/kafka/docker/bash-config
# Set environment values if they exist as arguments
if [ $# -ne 0 ]; then
    echo "===> Overriding env params with args ..."
    for var in "$@"
    do
        export "$var"
    done
fi
echo "===> User"
id
echo "===> Setting default values of environment variables if not already set."
. /etc/kafka/docker/configureDefaults
echo "===> Configuring ..."
/etc/kafka/docker/configure
echo "===> Launching ... "
exec /etc/kafka/docker/launch


@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
module ub

go 1.19

require (
    github.com/spf13/cobra v1.7.0
    golang.org/x/exp v0.0.0-20230419192730-864b3d6c5c2c
    golang.org/x/sys v0.7.0
)

require (
    github.com/inconshreveable/mousetrap v1.1.0 // indirect
    github.com/spf13/pflag v1.0.5 // indirect
)


@@ -0,0 +1,14 @@
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
golang.org/x/exp v0.0.0-20230419192730-864b3d6c5c2c h1:HDdYQYKOkvJT/Plb5HwJJywTVyUnIctjQm6XSnZ/0CY=
golang.org/x/exp v0.0.0-20230419192730-864b3d6c5c2c/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=


@@ -0,0 +1,14 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


@@ -0,0 +1,14 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


@@ -0,0 +1,20 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
log4j.rootLogger={{ getEnv "KAFKA_LOG4J_ROOT_LOGLEVEL" "INFO" }}, stdout
{{$loggers := getEnv "KAFKA_LOG4J_LOGGERS" "" -}}
{{ range $k, $v := splitToMapDefaults "," "" $loggers}}
log4j.logger.{{ $k }}={{ $v -}}
{{ end }}


@@ -0,0 +1,323 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "io"
    "os"
    "os/signal"
    pt "path"
    "regexp"
    "sort"
    "strings"
    "text/template"

    "github.com/spf13/cobra"
    "golang.org/x/exp/slices"
    "golang.org/x/sys/unix"
)

type ConfigSpec struct {
    Prefixes          map[string]bool   `json:"prefixes"`
    Excludes          []string          `json:"excludes"`
    Renamed           map[string]string `json:"renamed"`
    Defaults          map[string]string `json:"defaults"`
    ExcludeWithPrefix string            `json:"excludeWithPrefix"`
}

var (
    re        = regexp.MustCompile("[^_]_[^_]")
    ensureCmd = &cobra.Command{
        Use:   "ensure <environment-variable>",
        Short: "checks if environment variable is set or not",
        Args:  cobra.ExactArgs(1),
        RunE:  runEnsureCmd,
    }
    pathCmd = &cobra.Command{
        Use:   "path <path-to-file> <operation>",
        Short: "checks if an operation is permitted on a file",
        Args:  cobra.ExactArgs(2),
        RunE:  runPathCmd,
    }
    renderTemplateCmd = &cobra.Command{
        Use:   "render-template <path-to-template>",
        Short: "renders template to stdout",
        Args:  cobra.ExactArgs(1),
        RunE:  runRenderTemplateCmd,
    }
    renderPropertiesCmd = &cobra.Command{
        Use:   "render-properties <path-to-config-spec>",
        Short: "creates and renders properties to stdout using the json config spec.",
        Args:  cobra.ExactArgs(1),
        RunE:  runRenderPropertiesCmd,
    }
)

func ensure(envVar string) bool {
    _, found := os.LookupEnv(envVar)
    return found
}

func path(filePath string, operation string) (bool, error) {
    switch operation {
    case "readable":
        err := unix.Access(filePath, unix.R_OK)
        if err != nil {
            return false, err
        }
        return true, nil
    case "executable":
        info, err := os.Stat(filePath)
        if err != nil {
            err = fmt.Errorf("error checking executable status of file %q: %w", filePath, err)
            return false, err
        }
        return info.Mode()&0111 != 0, nil // check whether file is executable by anyone, use 0100 to check for execution rights for owner
    case "existence":
        if _, err := os.Stat(filePath); err != nil {
            if os.IsNotExist(err) {
                return false, nil
            }
            return false, err
        }
        return true, nil
    case "writable":
        err := unix.Access(filePath, unix.W_OK)
        if err != nil {
            return false, err
        }
        return true, nil
    default:
        err := fmt.Errorf("unknown operation %q", operation)
        return false, err
    }
}

func renderTemplate(templateFilePath string) error {
    funcs := template.FuncMap{
        "getEnv":             getEnvOrDefault,
        "splitToMapDefaults": splitToMapDefaults,
    }
    t, err := template.New(pt.Base(templateFilePath)).Funcs(funcs).ParseFiles(templateFilePath)
    if err != nil {
        err = fmt.Errorf("error %q: %w", templateFilePath, err)
        return err
    }
    return buildTemplate(os.Stdout, *t)
}

func buildTemplate(writer io.Writer, template template.Template) error {
    err := template.Execute(writer, GetEnvironment())
    if err != nil {
        err = fmt.Errorf("error building template file : %w", err)
        return err
    }
    return nil
}

func renderConfig(writer io.Writer, configSpec ConfigSpec) error {
    return writeConfig(writer, buildProperties(configSpec, GetEnvironment()))
}

// ConvertKey Converts an environment variable name to a property-name according to the following rules:
// - a single underscore (_) is replaced with a .
// - a double underscore (__) is replaced with a single underscore
// - a triple underscore (___) is replaced with a dash
// Moreover, the whole string is converted to lower-case.
// The behavior of sequences of four or more underscores is undefined.
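// For example (following the rules above): ConvertKey("LOG_RETENTION_MS") returns "log.retention.ms",
// and ConvertKey("EXAMPLE___DASH") returns "example-dash".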
func ConvertKey(key string) string {
singleReplaced := re.ReplaceAllStringFunc(key, replaceUnderscores)
singleTripleReplaced := strings.ReplaceAll(singleReplaced, "___", "-")
return strings.ToLower(strings.ReplaceAll(singleTripleReplaced, "__", "_"))
}
// replaceUnderscores replaces every underscore '_' by a dot '.'
func replaceUnderscores(s string) string {
return strings.ReplaceAll(s, "_", ".")
}
// ListToMap splits each and entry of the kvList argument at '=' into a key/value pair and returns a map of all the k/v pair thus obtained.
// this method will only consider values in the list formatted as key=value
func ListToMap(kvList []string) map[string]string {
m := make(map[string]string, len(kvList))
for _, l := range kvList {
parts := strings.Split(l, "=")
if len(parts) == 2 {
m[parts[0]] = parts[1]
}
}
return m
}
func splitToMapDefaults(separator string, defaultValues string, value string) map[string]string {
values := KvStringToMap(defaultValues, separator)
for k, v := range KvStringToMap(value, separator) {
values[k] = v
}
return values
}
func KvStringToMap(kvString string, sep string) map[string]string {
return ListToMap(strings.Split(kvString, sep))
}
// GetEnvironment returns the current environment as a map.
func GetEnvironment() map[string]string {
return ListToMap(os.Environ())
}
// buildProperties creates a map suitable to be output as Java properties from a ConfigSpec and a map representing an environment.
func buildProperties(spec ConfigSpec, environment map[string]string) map[string]string {
config := make(map[string]string)
for key, value := range spec.Defaults {
config[key] = value
}
for envKey, envValue := range environment {
if newKey, found := spec.Renamed[envKey]; found {
config[newKey] = envValue
} else {
if !slices.Contains(spec.Excludes, envKey) && !(len(spec.ExcludeWithPrefix) > 0 && strings.HasPrefix(envKey, spec.ExcludeWithPrefix)) {
for prefix, keep := range spec.Prefixes {
if strings.HasPrefix(envKey, prefix) {
var effectiveKey string
if keep {
effectiveKey = envKey
} else {
effectiveKey = envKey[len(prefix)+1:]
}
config[ConvertKey(effectiveKey)] = envValue
}
}
}
}
}
return config
}
func writeConfig(writer io.Writer, config map[string]string) error {
// Go randomizes iterations over map by design. We sort properties by name to ease debugging:
sortedNames := make([]string, 0, len(config))
for name := range config {
sortedNames = append(sortedNames, name)
}
sort.Strings(sortedNames)
for _, n := range sortedNames {
_, err := fmt.Fprintf(writer, "%s=%s\n", n, config[n])
if err != nil {
err = fmt.Errorf("error printing configs: %w", err)
return err
}
}
return nil
}
func loadConfigSpec(path string) (ConfigSpec, error) {
var spec ConfigSpec
bytes, err := os.ReadFile(path)
if err != nil {
err = fmt.Errorf("error reading from json file %q : %w", path, err)
return spec, err
}
errParse := json.Unmarshal(bytes, &spec)
if errParse != nil {
err = fmt.Errorf("error parsing json file %q : %w", path, errParse)
return spec, err
}
return spec, nil
}
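// getEnvOrDefault returns the value of the environment variable envVar, or defaultValue if it is unset or empty.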
func getEnvOrDefault(envVar string, defaultValue string) string {
val := os.Getenv(envVar)
if len(val) == 0 {
return defaultValue
}
return val
}
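// runEnsureCmd returns an error when the environment variable named by args[0] is not set.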
func runEnsureCmd(_ *cobra.Command, args []string) error {
success := ensure(args[0])
if !success {
err := fmt.Errorf("environment variable %q is not set", args[0])
return err
}
return nil
}
func runPathCmd(_ *cobra.Command, args []string) error {
success, err := path(args[0], args[1])
if err != nil {
err = fmt.Errorf("error in checking operation %q on file %q: %w", args[1], args[0], err)
return err
}
if !success {
err = fmt.Errorf("operation %q on file %q is unsuccessful", args[1], args[0])
return err
}
return nil
}
func runRenderTemplateCmd(_ *cobra.Command, args []string) error {
err := renderTemplate(args[0])
if err != nil {
err = fmt.Errorf("error in rendering template %q: %w", args[0], err)
return err
}
return nil
}
func runRenderPropertiesCmd(_ *cobra.Command, args []string) error {
configSpec, err := loadConfigSpec(args[0])
if err != nil {
err = fmt.Errorf("error in loading config from file %q: %w", args[0], err)
return err
}
err = renderConfig(os.Stdout, configSpec)
if err != nil {
err = fmt.Errorf("error in building properties from file %q: %w", args[0], err)
return err
}
return nil
}
func main() {
rootCmd := &cobra.Command{
Use: "utility",
Short: "utility commands for kafka docker images",
Run: func(cmd *cobra.Command, args []string) {},
}
rootCmd.AddCommand(pathCmd)
rootCmd.AddCommand(ensureCmd)
rootCmd.AddCommand(renderTemplateCmd)
rootCmd.AddCommand(renderPropertiesCmd)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
if err := rootCmd.ExecuteContext(ctx); err != nil {
fmt.Fprintf(os.Stderr, "error in executing the command: %s\n", err)
os.Exit(1)
}
}

View File

@ -0,0 +1,355 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"os"
"reflect"
"testing"
)
func assertEqual(a string, b string, t *testing.T) {
if a != b {
t.Error(a + " != " + b)
}
}
func Test_ensure(t *testing.T) {
type args struct {
envVar string
}
err := os.Setenv("ENV_VAR", "value")
if err != nil {
t.Fatal("Unable to set ENV_VAR for the test")
}
tests := []struct {
name string
args args
want bool
}{
{
name: "should exist",
args: args{
envVar: "ENV_VAR",
},
want: true,
},
{
name: "should not exist",
args: args{
envVar: "RANDOM_ENV_VAR",
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := ensure(tt.args.envVar); got != tt.want {
t.Errorf("ensure() = %v, want %v", got, tt.want)
}
})
}
}
func Test_path(t *testing.T) {
type args struct {
filePath string
operation string
}
const (
sampleFile = "testResources/sampleFile"
sampleFile2 = "testResources/sampleFile2"
fileDoesNotExist = "testResources/sampleFile3"
)
err := os.Chmod(sampleFile, 0777)
if err != nil {
t.Error("Unable to set permissions for the file")
}
err = os.Chmod(sampleFile2, 0000)
if err != nil {
t.Error("Unable to set permissions for the file")
}
tests := []struct {
name string
args args
want bool
wantErr bool
}{
{
name: "file readable",
args: args{filePath: sampleFile,
operation: "readable"},
want: true,
wantErr: false,
},
{
name: "file writable",
args: args{filePath: sampleFile,
operation: "writable"},
want: true,
wantErr: false,
},
{
name: "file executable",
args: args{filePath: sampleFile,
operation: "executable"},
want: true,
wantErr: false,
},
{
name: "file existence",
args: args{filePath: sampleFile,
operation: "existence"},
want: true,
wantErr: false,
},
{
name: "file not readable",
args: args{filePath: sampleFile2,
operation: "readable"},
want: false,
wantErr: true,
},
{
name: "file not writable",
args: args{filePath: sampleFile2,
operation: "writable"},
want: false,
wantErr: true,
},
{
name: "file not executable",
args: args{filePath: sampleFile2,
operation: "executable"},
want: false,
wantErr: false,
},
{
name: "file does not exist",
args: args{filePath: fileDoesNotExist,
operation: "existence"},
want: false,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := path(tt.args.filePath, tt.args.operation)
if (err != nil) != tt.wantErr {
t.Errorf("path() error = %v, wantErr %v", err, tt.wantErr)
}
if got != tt.want {
t.Errorf("path() = %v, want %v", got, tt.want)
}
})
}
}
func Test_renderTemplate(t *testing.T) {
type args struct {
templateFilePath string
}
const (
fileExistsAndRenderable = "testResources/sampleLog4j.template"
fileDoesNotExist = "testResources/RandomFileName"
)
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "render template success",
args: args{templateFilePath: fileExistsAndRenderable},
wantErr: false,
},
{
name: "render template failure ",
args: args{templateFilePath: fileDoesNotExist},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := renderTemplate(tt.args.templateFilePath); (err != nil) != tt.wantErr {
t.Errorf("renderTemplate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func Test_convertKey(t *testing.T) {
type args struct {
key string
}
tests := []struct {
name string
args args
wantString string
}{
{
name: "Capitals",
args: args{key: "KEY"},
wantString: "key",
},
{
name: "Capitals with underscore",
args: args{key: "KEY_FOO"},
wantString: "key.foo",
},
{
name: "Capitals with double underscore",
args: args{key: "KEY__UNDERSCORE"},
wantString: "key_underscore",
},
{
name: "Capitals with double and single underscore",
args: args{key: "KEY_WITH__UNDERSCORE_AND__MORE"},
wantString: "key.with_underscore.and_more",
},
{
name: "Capitals with triple underscore",
args: args{key: "KEY___DASH"},
wantString: "key-dash",
},
{
name: "capitals with double,triple and single underscore",
args: args{key: "KEY_WITH___DASH_AND___MORE__UNDERSCORE"},
wantString: "key.with-dash.and-more_underscore",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if result := ConvertKey(tt.args.key); result != tt.wantString {
t.Errorf("ConvertKey() result = %v, wantStr %v", result, tt.wantString)
}
})
}
}
func Test_buildProperties(t *testing.T) {
type args struct {
spec ConfigSpec
environment map[string]string
}
tests := []struct {
name string
args args
want map[string]string
}{
{
name: "only defaults",
args: args{
spec: ConfigSpec{
Defaults: map[string]string{
"default.property.key": "default.property.value",
"bootstrap.servers": "unknown",
},
},
environment: map[string]string{
"PATH": "thePath",
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
"KAFKA_IGNORED": "ignored",
"KAFKA_EXCLUDE_PREFIX_PROPERTY": "ignored",
},
},
want: map[string]string{"bootstrap.servers": "unknown", "default.property.key": "default.property.value"},
},
{
name: "server properties",
args: args{
spec: ConfigSpec{
Prefixes: map[string]bool{"KAFKA": false},
Excludes: []string{"KAFKA_IGNORED"},
Renamed: map[string]string{},
Defaults: map[string]string{
"default.property.key": "default.property.value",
"bootstrap.servers": "unknown",
},
ExcludeWithPrefix: "KAFKA_EXCLUDE_PREFIX_",
},
environment: map[string]string{
"PATH": "thePath",
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
"KAFKA_IGNORED": "ignored",
"KAFKA_EXCLUDE_PREFIX_PROPERTY": "ignored",
},
},
want: map[string]string{"bootstrap.servers": "localhost:9092", "default.property.key": "default.property.value"},
},
{
name: "kafka properties",
args: args{
spec: ConfigSpec{
Prefixes: map[string]bool{"KAFKA": false},
Excludes: []string{"KAFKA_IGNORED"},
Renamed: map[string]string{},
Defaults: map[string]string{
"default.property.key": "default.property.value",
"bootstrap.servers": "unknown",
},
ExcludeWithPrefix: "KAFKA_EXCLUDE_PREFIX_",
},
environment: map[string]string{
"KAFKA_FOO": "foo",
"KAFKA_FOO_BAR": "bar",
"KAFKA_IGNORED": "ignored",
"KAFKA_WITH__UNDERSCORE": "with underscore",
"KAFKA_WITH__UNDERSCORE_AND_MORE": "with underscore and more",
"KAFKA_WITH___DASH": "with dash",
"KAFKA_WITH___DASH_AND_MORE": "with dash and more",
},
},
want: map[string]string{"bootstrap.servers": "unknown", "default.property.key": "default.property.value", "foo": "foo", "foo.bar": "bar", "with-dash": "with dash", "with-dash.and.more": "with dash and more", "with_underscore": "with underscore", "with_underscore.and.more": "with underscore and more"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := buildProperties(tt.args.spec, tt.args.environment); !reflect.DeepEqual(got, tt.want) {
t.Errorf("buildProperties() = %v, want %v", got, tt.want)
}
})
}
}
func Test_splitToMapDefaults(t *testing.T) {
type args struct {
separator string
defaultValues string
value string
}
tests := []struct {
name string
args args
want map[string]string
}{
{
name: "split to default",
args: args{
separator: ",",
defaultValues: "kafka=INFO,kafka.producer.async.DefaultEventHandler=DEBUG,state.change.logger=TRACE",
value: "kafka.producer.async.DefaultEventHandler=ERROR,kafka.request.logger=WARN",
},
want: map[string]string{"kafka": "INFO", "kafka.producer.async.DefaultEventHandler": "ERROR", "kafka.request.logger": "WARN", "state.change.logger": "TRACE"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := splitToMapDefaults(tt.args.separator, tt.args.defaultValues, tt.args.value); !reflect.DeepEqual(got, tt.want) {
t.Errorf("splitToMapDefaults() = %v, want %v", got, tt.want)
}
})
}
}

16
docker/test/__init__.py Normal file
View File

@ -0,0 +1,16 @@
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

45
docker/test/constants.py Normal file
View File

@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
KAFKA_TOPICS="fixtures/kafka/bin/kafka-topics.sh"
KAFKA_CONSOLE_PRODUCER="fixtures/kafka/bin/kafka-console-producer.sh"
KAFKA_CONSOLE_CONSUMER="fixtures/kafka/bin/kafka-console-consumer.sh"
KAFKA_RUN_CLASS="fixtures/kafka/bin/kafka-run-class.sh"
JVM_COMBINED_MODE_COMPOSE="fixtures/jvm/combined/docker-compose.yml"
JVM_ISOLATED_COMPOSE="fixtures/jvm/isolated/docker-compose.yml"
CLIENT_TIMEOUT=40000
SSL_FLOW_TESTS="SSL Flow Tests"
SSL_CLIENT_CONFIG="fixtures/secrets/client-ssl.properties"
SSL_TOPIC="test-topic-ssl"
FILE_INPUT_FLOW_TESTS="File Input Flow Tests"
FILE_INPUT_TOPIC="test-topic-file-input"
BROKER_RESTART_TESTS="Broker Restart Tests"
BROKER_CONTAINER="broker1"
BROKER_RESTART_TEST_TOPIC="test-topic-broker-restart"
BROKER_METRICS_TESTS="Broker Metrics Tests"
BROKER_METRICS_TEST_TOPIC="test-topic-broker-metrics"
JMX_TOOL="org.apache.kafka.tools.JmxTool"
BROKER_METRICS_HEADING='"time","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:Count","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:EventType","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:MeanRate","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:OneMinuteRate","kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:RateUnit"'
SSL_ERROR_PREFIX="SSL_ERR"
BROKER_RESTART_ERROR_PREFIX="BROKER_RESTART_ERR"
FILE_INPUT_ERROR_PREFIX="FILE_INPUT_ERR"
BROKER_METRICS_ERROR_PREFIX="BROKER_METRICS_ERR"

View File

@ -0,0 +1,238 @@
#!/usr/bin/env python
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import subprocess
from HTMLTestRunner import HTMLTestRunner
import test.constants as constants
import os
class DockerSanityTest(unittest.TestCase):
IMAGE="apache/kafka"
FIXTURES_DIR="."
def resume_container(self):
subprocess.run(["docker", "start", constants.BROKER_CONTAINER])
def stop_container(self) -> None:
subprocess.run(["docker", "stop", constants.BROKER_CONTAINER])
def update_file(self, filename, old_string, new_string):
with open(filename) as f:
s = f.read()
with open(filename, 'w') as f:
s = s.replace(old_string, new_string)
f.write(s)
def start_compose(self, filename) -> None:
self.update_file(filename, "image: {$IMAGE}", f"image: {self.IMAGE}")
self.update_file(f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}", "{$DIR}", self.FIXTURES_DIR)
subprocess.run(["docker-compose", "-f", filename, "up", "-d"])
def destroy_compose(self, filename) -> None:
subprocess.run(["docker-compose", "-f", filename, "down"])
self.update_file(filename, f"image: {self.IMAGE}", "image: {$IMAGE}")
self.update_file(f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}", self.FIXTURES_DIR, "{$DIR}")
def create_topic(self, topic, topic_config):
command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_TOPICS}", "--create", "--topic", topic]
command.extend(topic_config)
subprocess.run(command)
check_command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_TOPICS}", "--list"]
check_command.extend(topic_config)
output = subprocess.check_output(check_command)
return topic in output.decode("utf-8")
def produce_message(self, topic, producer_config, key, value):
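# The record is written as "key:value" and piped to the console producer via `bash -c`,
# since subprocess.run does not interpret the '|' operator itself.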
command = ["echo", f'"{key}:{value}"', "|", f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_PRODUCER}", "--topic", topic, "--property", "'parse.key=true'", "--property", "'key.separator=:'", "--timeout", f"{constants.CLIENT_TIMEOUT}"]
command.extend(producer_config)
subprocess.run(["bash", "-c", " ".join(command)])
def consume_message(self, topic, consumer_config):
command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_CONSOLE_CONSUMER}", "--topic", topic, "--property", "'print.key=true'", "--property", "'key.separator=:'", "--from-beginning", "--max-messages", "1", "--timeout-ms", f"{constants.CLIENT_TIMEOUT}"]
command.extend(consumer_config)
message = subprocess.check_output(["bash", "-c", " ".join(command)])
return message.decode("utf-8").strip()
def get_metrics(self, jmx_tool_config):
command = [f"{self.FIXTURES_DIR}/{constants.KAFKA_RUN_CLASS}", constants.JMX_TOOL]
command.extend(jmx_tool_config)
message = subprocess.check_output(["bash", "-c", " ".join(command)])
return message.decode("utf-8").strip().split()
def broker_metrics_flow(self):
print(f"Running {constants.BROKER_METRICS_TESTS}")
errors = []
try:
self.assertTrue(self.create_topic(constants.BROKER_METRICS_TEST_TOPIC, ["--bootstrap-server", "localhost:9092"]))
except AssertionError as e:
errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
return errors
jmx_tool_config = ["--one-time", "--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", "--jmx-url", "service:jmx:rmi:///jndi/rmi://:9101/jmxrmi"]
metrics_before_message = self.get_metrics(jmx_tool_config)
try:
self.assertEqual(len(metrics_before_message), 2)
self.assertEqual(metrics_before_message[0], constants.BROKER_METRICS_HEADING)
except AssertionError as e:
errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
return errors
producer_config = ["--bootstrap-server", "localhost:9092", "--property", "client.id=host"]
self.produce_message(constants.BROKER_METRICS_TEST_TOPIC, producer_config, "key", "message")
consumer_config = ["--bootstrap-server", "localhost:9092", "--property", "auto.offset.reset=earliest"]
message = self.consume_message(constants.BROKER_METRICS_TEST_TOPIC, consumer_config)
try:
self.assertEqual(message, "key:message")
except AssertionError as e:
errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
return errors
metrics_after_message = self.get_metrics(jmx_tool_config)
try:
self.assertEqual(len(metrics_after_message), 2)
self.assertEqual(metrics_after_message[0], constants.BROKER_METRICS_HEADING)
before_metrics_data, after_metrics_data = metrics_before_message[1].split(","), metrics_after_message[1].split(",")
self.assertEqual(len(before_metrics_data), len(after_metrics_data))
for i in range(len(before_metrics_data)):
if after_metrics_data[i].replace(".", "").isnumeric():
self.assertGreaterEqual(float(after_metrics_data[i]), float(before_metrics_data[i]))
else:
self.assertEqual(after_metrics_data[i], before_metrics_data[i])
except AssertionError as e:
errors.append(constants.BROKER_METRICS_ERROR_PREFIX + str(e))
return errors
def ssl_flow(self, ssl_broker_port, test_name, test_error_prefix, topic):
print(f"Running {test_name}")
errors = []
try:
self.assertTrue(self.create_topic(topic, ["--bootstrap-server", ssl_broker_port, "--command-config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"]))
except AssertionError as e:
errors.append(test_error_prefix + str(e))
return errors
producer_config = ["--bootstrap-server", ssl_broker_port,
"--producer.config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}"]
self.produce_message(topic, producer_config, "key", "message")
consumer_config = [
"--bootstrap-server", ssl_broker_port,
"--property", "auto.offset.reset=earliest",
"--consumer.config", f"{self.FIXTURES_DIR}/{constants.SSL_CLIENT_CONFIG}",
]
message = self.consume_message(topic, consumer_config)
try:
self.assertEqual(message, "key:message")
except AssertionError as e:
errors.append(test_error_prefix + str(e))
return errors
def broker_restart_flow(self):
print(f"Running {constants.BROKER_RESTART_TESTS}")
errors = []
try:
self.assertTrue(self.create_topic(constants.BROKER_RESTART_TEST_TOPIC, ["--bootstrap-server", "localhost:9092"]))
except AssertionError as e:
errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e))
return errors
producer_config = ["--bootstrap-server", "localhost:9092", "--property", "client.id=host"]
self.produce_message(constants.BROKER_RESTART_TEST_TOPIC, producer_config, "key", "message")
print("Stopping Container")
self.stop_container()
print("Resuming Container")
self.resume_container()
consumer_config = ["--bootstrap-server", "localhost:9092", "--property", "auto.offset.reset=earliest"]
message = self.consume_message(constants.BROKER_RESTART_TEST_TOPIC, consumer_config)
try:
self.assertEqual(message, "key:message")
except AssertionError as e:
errors.append(constants.BROKER_RESTART_ERROR_PREFIX + str(e))
return errors
def execute(self):
total_errors = []
try:
total_errors.extend(self.broker_metrics_flow())
except Exception as e:
print(constants.BROKER_METRICS_ERROR_PREFIX, str(e))
total_errors.append(str(e))
try:
total_errors.extend(self.ssl_flow('localhost:9093', constants.SSL_FLOW_TESTS, constants.SSL_ERROR_PREFIX, constants.SSL_TOPIC))
except Exception as e:
print(constants.SSL_ERROR_PREFIX, str(e))
total_errors.append(str(e))
try:
total_errors.extend(self.ssl_flow('localhost:9094', constants.FILE_INPUT_FLOW_TESTS, constants.FILE_INPUT_ERROR_PREFIX, constants.FILE_INPUT_TOPIC))
except Exception as e:
print(constants.FILE_INPUT_ERROR_PREFIX, str(e))
total_errors.append(str(e))
try:
total_errors.extend(self.broker_restart_flow())
except Exception as e:
print(constants.BROKER_RESTART_ERROR_PREFIX, str(e))
total_errors.append(str(e))
self.assertEqual(total_errors, [])
class DockerSanityTestJVMCombinedMode(DockerSanityTest):
def setUp(self) -> None:
self.start_compose(f"{self.FIXTURES_DIR}/{constants.JVM_COMBINED_MODE_COMPOSE}")
def tearDown(self) -> None:
self.destroy_compose(f"{self.FIXTURES_DIR}/{constants.JVM_COMBINED_MODE_COMPOSE}")
def test_bed(self):
self.execute()
class DockerSanityTestJVMIsolatedMode(DockerSanityTest):
def setUp(self) -> None:
self.start_compose(f"{self.FIXTURES_DIR}/{constants.JVM_ISOLATED_COMPOSE}")
def tearDown(self) -> None:
self.destroy_compose(f"{self.FIXTURES_DIR}/{constants.JVM_ISOLATED_COMPOSE}")
def test_bed(self):
self.execute()
def run_tests(image, mode, fixtures_dir):
DockerSanityTest.IMAGE = image
DockerSanityTest.FIXTURES_DIR = fixtures_dir
test_classes_to_run = []
if mode == "jvm":
test_classes_to_run = [DockerSanityTestJVMCombinedMode, DockerSanityTestJVMIsolatedMode]
loader = unittest.TestLoader()
suites_list = []
for test_class in test_classes_to_run:
suite = loader.loadTestsFromTestCase(test_class)
suites_list.append(suite)
combined_suite = unittest.TestSuite(suites_list)
cur_directory = os.path.dirname(os.path.realpath(__file__))
# Use a context manager so the report file is flushed and closed once the run finishes.
with open(f"{cur_directory}/report_{mode}.html", "w") as outfile:
    runner = HTMLTestRunner.HTMLTestRunner(
        stream=outfile,
        title='Test Report',
        description='Sanity test results for the kafka docker image.'
    )
    result = runner.run(combined_suite)
return result.failure_count
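# Hypothetical usage: run_tests("apache/kafka:latest", "jvm", "<path to docker/test>")
# runs both the combined and isolated compose fixtures against that image and returns the failure count.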

View File

@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
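# NOTE (illustrative): placeholder values below, such as process.roles, are expected to be
# overridden by KAFKA_* environment variables when this file is mounted as file input
# (see broker3 in the compose fixtures).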
advertised.listeners=PLAINTEXT://localhost:19093,SSL://localhost:9094
controller.listener.names=CONTROLLER
group.initial.rebalance.delay.ms=0
inter.broker.listener.name=PLAINTEXT
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT
log.dirs=/tmp/kraft-combined-logs
offsets.topic.replication.factor=1
process.roles=to be overridden
ssl.client.auth=required
ssl.key.password=abcdefgh
ssl.keystore.location=/etc/kafka/secrets/kafka02.keystore.jks
ssl.keystore.password=abcdefgh
ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks
ssl.truststore.password=abcdefgh
transaction.state.log.min.isr=1
transaction.state.log.replication.factor=1

View File

@ -0,0 +1,101 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
---
version: '2'
services:
broker1:
image: {$IMAGE}
hostname: broker1
container_name: broker1
ports:
- "9092:9092"
- "9101:9101"
- "19091:19091"
volumes:
- ../../secrets:/etc/kafka/secrets
environment:
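# KAFKA_* environment variables below are converted to server properties by the image's
# utility (e.g. KAFKA_NODE_ID -> node.id, KAFKA_LOG_DIRS -> log.dirs).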
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SSL://localhost:19091'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker1:29093'
KAFKA_LISTENERS: 'CONTROLLER://broker1:29093,PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:19091'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_SSL_KEYSTORE_FILENAME: "kafka01.keystore.jks"
KAFKA_SSL_KEYSTORE_CREDENTIALS: "kafka_keystore_creds"
KAFKA_SSL_KEY_CREDENTIALS: "kafka_ssl_key_creds"
KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.truststore.jks"
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "kafka_truststore_creds"
KAFKA_SSL_CLIENT_AUTH: "required"
broker2:
image: {$IMAGE}
hostname: broker2
container_name: broker2
ports:
- "9093:9093"
- "19092:19092"
volumes:
- ../../secrets:/etc/kafka/secrets
environment:
KAFKA_NODE_ID: 2
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:19092,SSL://localhost:9093"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '2@broker2:29093'
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:19092,SSL://0.0.0.0:9093,CONTROLLER://broker2:29093"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
KAFKA_SSL_KEYSTORE_FILENAME: "kafka01.keystore.jks"
KAFKA_SSL_KEYSTORE_CREDENTIALS: "kafka_keystore_creds"
KAFKA_SSL_KEY_CREDENTIALS: "kafka_ssl_key_creds"
KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.truststore.jks"
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "kafka_truststore_creds"
KAFKA_SSL_CLIENT_AUTH: "required"
broker3:
image: {$IMAGE}
hostname: broker3
container_name: broker3
ports:
- "19093:19093"
- "9094:9094"
volumes:
- ../../secrets:/etc/kafka/secrets
- ../../file-input:/mnt/shared/config
environment:
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
# Set properties absent from the file
KAFKA_NODE_ID: 3
KAFKA_CONTROLLER_QUORUM_VOTERS: '3@broker3:29093'
KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:19093,SSL://0.0.0.0:9094,CONTROLLER://broker3:29093'
# Override an existing property
KAFKA_PROCESS_ROLES: 'broker,controller'

View File

@ -0,0 +1,170 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
---
version: '2'
services:
controller1:
image: {$IMAGE}
hostname: controller1
container_name: controller1
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:29093,2@controller2:39093,3@controller3:49093'
KAFKA_LISTENERS: 'CONTROLLER://controller1:29093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
controller2:
image: {$IMAGE}
hostname: controller2
container_name: controller2
environment:
KAFKA_NODE_ID: 2
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:29093,2@controller2:39093,3@controller3:49093'
KAFKA_LISTENERS: 'CONTROLLER://controller2:39093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
controller3:
image: {$IMAGE}
hostname: controller3
container_name: controller3
environment:
KAFKA_NODE_ID: 3
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: 'controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:29093,2@controller2:39093,3@controller3:49093'
KAFKA_LISTENERS: 'CONTROLLER://controller3:49093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'CONTROLLER'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
broker1:
image: {$IMAGE}
hostname: broker1
container_name: broker1
ports:
- "9092:9092"
- "19091:19091"
- "9101:9101"
volumes:
- ../../secrets:/etc/kafka/secrets
environment:
KAFKA_NODE_ID: 4
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,SSL:SSL,PLAINTEXT:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SSL://localhost:19091'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: 'broker'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:29093,2@controller2:39093,3@controller3:49093'
KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:19091'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_SSL_KEYSTORE_FILENAME: "kafka01.keystore.jks"
KAFKA_SSL_KEYSTORE_CREDENTIALS: "kafka_keystore_creds"
KAFKA_SSL_KEY_CREDENTIALS: "kafka_ssl_key_creds"
KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.truststore.jks"
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "kafka_truststore_creds"
KAFKA_SSL_CLIENT_AUTH: "required"
depends_on:
- controller1
- controller2
- controller3
broker2:
image: {$IMAGE}
hostname: broker2
container_name: broker2
ports:
- "9093:9093"
- "19092:19092"
volumes:
- ../../secrets:/etc/kafka/secrets
environment:
KAFKA_NODE_ID: 5
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:19092,SSL://localhost:9093"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_PROCESS_ROLES: 'broker'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:29093,2@controller2:39093,3@controller3:49093'
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:19092,SSL://0.0.0.0:9093"
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
KAFKA_SSL_KEYSTORE_FILENAME: "kafka01.keystore.jks"
KAFKA_SSL_KEYSTORE_CREDENTIALS: "kafka_keystore_creds"
KAFKA_SSL_KEY_CREDENTIALS: "kafka_ssl_key_creds"
KAFKA_SSL_TRUSTSTORE_FILENAME: "kafka.truststore.jks"
KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "kafka_truststore_creds"
KAFKA_SSL_CLIENT_AUTH: "required"
depends_on:
- controller1
- controller2
- controller3
broker3:
image: {$IMAGE}
hostname: broker3
container_name: broker3
ports:
- "19093:19093"
- "9094:9094"
volumes:
- ../../secrets:/etc/kafka/secrets
- ../../file-input:/mnt/shared/config
environment:
CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw'
# Set a property absent from the file
KAFKA_NODE_ID: 6
# Override existing properties
KAFKA_PROCESS_ROLES: 'broker'
KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:19093,SSL://0.0.0.0:9094"
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@controller1:29093,2@controller2:39093,3@controller3:49093'
depends_on:
- controller1
- controller2
- controller3

View File

@ -0,0 +1,23 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
security.protocol=SSL
ssl.truststore.location={$DIR}/fixtures/secrets/kafka.truststore.jks
ssl.truststore.password=abcdefgh
ssl.keystore.location={$DIR}/fixtures/secrets/client.keystore.jks
ssl.keystore.password=abcdefgh
ssl.key.password=abcdefgh
ssl.client.auth=required
ssl.endpoint.identification.algorithm=

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1 @@
abcdefgh

View File

@ -0,0 +1 @@
abcdefgh

View File

@ -0,0 +1 @@
abcdefgh

View File

@ -759,6 +759,10 @@ https://kafka.apache.org/KEYS
* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~%(apache_id)s/kafka-%(rc_tag)s/
<USE docker/README.md FOR STEPS TO GENERATE DOCKER IMAGE>
* Docker release artifact to be voted upon:
<USERNAME>/<IMAGE_NAME>:<IMAGE:TAG>
* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/
@ -777,6 +781,7 @@ https://kafka.apache.org/%(docs_version)s/protocol.html
* Successful Jenkins builds for the %(dev_branch)s branch:
Unit/integration tests: https://ci-builds.apache.org/job/Kafka/job/kafka/job/%(dev_branch)s/<BUILD NUMBER>/
System tests: https://jenkins.confluent.io/job/system-test-kafka/job/%(dev_branch)s/<BUILD_NUMBER>/
Docker Build Test Pipeline: https://github.com/apache/kafka/actions/runs/<RUN_NUMBER>
/**************************************