In case of an error while flattening a record with schema, the Flatten transformation was reporting an error about a record without schema, as follows:
```
org.apache.kafka.connect.errors.DataException: Flatten transformation does not support ARRAY for record without schemas (for field ...)
```
The expected behaviour would be an error message specifying "with schemas".
This looks like a simple copy/paste typo from the schemaless equivalent methods, in the same file
Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Konstantine Karantasis <konstantine@confluent.io>
* KAFKA-9707: Fix InsertField.Key not applying to tombstone events
* Fix typo that hardcoded .value() instead of abstract operatingValue
* Add test for Key transform that was previously not tested
Signed-off-by: Greg Harris <gregh@confluent.io>
* Add null value assertion to tombstone test
* Remove mis-named function and add test for passing-through a null-keyed record.
Signed-off-by: Greg Harris <gregh@confluent.io>
* Simplify unchanged record assertion
Signed-off-by: Greg Harris <gregh@confluent.io>
* Replace assertEquals with assertSame
Signed-off-by: Greg Harris <gregh@confluent.io>
* Fix checkstyleTest indent issue
Signed-off-by: Greg Harris <gregh@confluent.io>
Includes fixes from PR-7315 (KAFKA-8819 and KAFKA-8340), but omits ConfigProvider and Configurable test cases and plugins, and replaces Java 8 language features with suitable Java 7 features.
Signed-off-by: Greg Harris <gregh@confluent.io>
Author: Greg Harris <gregh@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
Adds support for the Connect Cast transforms to cast from Connect logical types, such as DATE, TIME, TIMESTAMP, and DECIMAL. Casting to numeric types will produce the underlying numeric value represented in the desired type. For logical types represented by underlying Java Date class, this means the milliseconds since EPOCH. For Decimal, this means the underlying value. If the value does not fit in the desired target type, it may overflow.
Casting to String from Date, Time, and Timestamp types will produce their ISO 8601 representation. Casting to String from Decimal will result in the value represented as a string. e.g. 1234 -> "1234".
Author: Nigel Liang <nigel@nigelliang.com>
Reviewer: Randall Hauch <rhauch@gmail.com>
Allow to cast LogicalType to string by calling the serialized (Java) object's toString().
Added tests for `BigDecimal` and `Date` as whole record and as fields.
Author: Amit Sela <amitsela33@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Robert Yokota <rayokota@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4820 from amitsela/cast-transform-bytes
Changed Connect's `WorkerSourceTask` to capture non-retriable exceptions from the `producer.send(...)` (e.g., authentication or authorization errors) and to fail the connector task when such an error is encountered. Modified the existing unit tests to verify this functionality.
Note that most producer errors are retriable, and Connect will (by default) set up each producer with 1 max in-flight message and infinite retries. This change only affects non-retriable errors.
Correct the Flatten SMT to properly handle null key or value `Struct` instances.
Author: Michal Borowiecki <michal.borowiecki@openbet.com>
Reviewers: Arjun Satish <arjun@confluent.io>, Robert Yokota <rayokota@gmail.com>, Randall Hauch <rhauch@gmail.com>
The debug log lines in the `Plugins` class that log header and key/value converter configurations should be altered as the configurations for these converters may contain secrets that should not be logged in plaintext. Instead, only the keys for these configs are safe to expose.
Author: Chris Egerton <cegerton@oberlin.edu>
Reviewer: Randall Hauch <rhauch@gmail.com>
Replace `headers.isEmpty()` by calls to `isEmpty()` as the latter does a null check on heathers (that is lazily created).
Author: Sebastián Ortega <sebastian.ortega@letgo.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Arjun Satish <arjunconfluent.io>, Randall Hauch <rhauch@gmail.com>
Some connector configs may be sensitive, so we should avoid logging them.
Reviewers: Alex Diachenko, Dustin Cote <dustin@confluent.io>, Jason Gustafson <jason@confluent.io>
The parent classloader of the DelegatingClassLoader and therefore the classloading scheme used by Connect does not have to be fixed to the System classloader.
Setting it the same as the one that was used to load the DelegatingClassLoader class itself is more flexible and, while in most cases will result in the System classloader to be used, it will also work in othr managed environments that control classloading differently (OSGi, and others).
The fix is minimal and the mainstream use is tested via system tests.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.
Author: Robert Yokota <rayokota@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5736 from rayokota/KAFKA-7476
(cherry picked from commit 3edd8e7333)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
Switches to normal year format instead of week date years and day of month instead of day of year.
This is directly from #4820, but separated into a different JIRA/PR to keep the fixes independent. Original authorship should be maintained in the commit.
Author: Amit Sela <amitsela33@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5718 from ewencp/fix-header-converter-date-format
(cherry picked from commit c1457be995)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
Currently logical types are dropped during Cast Transformation.
This patch fixes this behaviour.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
https://issues.apache.org/jira/browse/KAFKA-7058
* Summary of testing strategy: Added new unit test
Author: Gunnar Morling <gunnar.morling@googlemail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5225 from gunnarmorling/KAFKA-7058
(cherry picked from commit be846d833c)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
This was originally missed when headers were added as part of KIP-145 in AK 1.1. An additional unit test was added in line with the StringConverter.
This should be backported to the AK `1.1` branch so that it is included in the next bugfix release. The `SimpleHeaderConverter` class that we're referencing was first added in the 1.1.0 release, so there's no reason to backport earlier.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5204 from rhauch/kafka-7047
This is a change to improve resource cleanup for sink tasks and source tasks. Now `Task.stop()` is called from both `WorkerSinkTask.close()` and `WorkerSourceTask.close()`.
It is called from `WorkerXXXTask.close()` since this method is called in the `finally` block of `WorkerTask.run()`, and Connect developers use `stop()` to clean up resources.
Author: Robert Yokota <rayokota@gmail.com>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#5020 from rayokota/K6566-improve-connect-resource-cleanup
(cherry picked from commit ee8abb2f70)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
## Summary of the problem
When the `header.converter` is not specified in the worker config or the connector config, a bug in the `Plugins` test causes it to never instantiate the `HeaderConverter` instance, even though there is a default value.
This is a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, a NPE occurs.
A workaround is to explicitly set the `header.converter` configuration property, but this was added in AK 1.1 and thus means that upgrading to AK 1.1 will not be backward compatible and will require this configuration change.
## The Changes
The `Plugins.newHeaderConverter` methods were always returning null if the `header.converter` configuration value was not specified in the supplied connector or worker configuration. Thus, even though the `header.converter` property has a default, it was never being used.
The fix was to only check whether a `header.converter` property was specified when the connector configuration was being used, and if no such property exists in the connector configuration to return null. Then, when the worker configuration is being used, the method simply gets the `header.converter` value (or the default if no value was explicitly set).
Also, the ConnectorConfig had the same default value for the `header.converter` property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the `header.converter` value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter.
Finally, updated comments and added log messages to make it more clear which converters are being used and how they are being converted.
## Testing
Several new unit tests for `Plugins.newHeaderConverter` were added to check the various behavior. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for the `header.converter` configuration:
1. default value
1. worker configuration has `header.converter` explicitly set to the default
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. connector configuration has `header.converter` explicitly set to the default
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
1. worker configuration has `header.converter` explicitly set to the default, and the connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
The worker created the correct `HeaderConverter` implementation with the correct configuration in all of these tests.
Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumes the records with headers (but didn't do anything with them). This test also passed.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4815 from rhauch/kafka-6728
(cherry picked from commit d9369de8f2)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
Changed WorkerSinkTaskContext to only resume the consumer topic partitions when the connector/task is not in the paused state.
The context tracks the set of topic partitions that are explicitly paused/resumed by the connector, and when the WorkerSinkTask resumes the tasks it currently resumes all topic partitions *except* those that are still explicitly paused in the context. Therefore, the change above should result in the desired behavior.
Several debug statements were added to record when the context is called by the connector.
This can be backported to older releases, since this bug goes back to 0.10 or 0.9.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4716 from rhauch/kafka-6661
(cherry picked from commit e7ef719a5b)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
Any exception thrown by calls within a `main()` method are not logged unless explicitly done so. This change simply adds a try-catch block around most of the content of the distributed and standalone `main()` methods.
**NOTE: This should be backported to the `1.1` branch, and is currently a blocker for 1.1.**
The `connect_test.py::ConnectStandaloneFileTest.test_file_source_and_sink` system test is failing with the SASL configuration without a sufficient explanation. During the test, the Connect worker fails to start, but the Connect log contains no useful information. There are actual several things compounding to cause the failure and make it difficult to understand the problem.
First, the `tests/kafkatest/tests/connect/templates/connect_standalone.properties` is only adding in the broker's security configuration with the `producer.` and `consumer.` prefixes, but is not adding them with no prefix. The worker uses the AdminClient to connect to the broker to get the Kafka cluster ID and to manage the three internal topics, and the AdminClient is configured via top-level properties. Because the SASL test requires the clients all connect using SASL, the lack of broker security configs means the AdminClient was attempting and failing to connect to the broker. This is corrected by adding the broker's security configuration to the Connect worker configuration file at the top-level. (This was already being done in the `connect_distributed.properties` file.)
Second, the default `request.timeout.ms` for the AdminClient (and the other clients) is 120 seconds, so the AdminClient was retrying for 120 seconds before it would give up and thrown an error. However, the test was only waiting for 60 seconds before determining that the service failed to start. This can be corrected by setting `request.timeout.ms=10000` in the Connect distributed and standalone worker configurations.
Third, the Connect workers were recently changed to lookup the Kafka cluster ID before it started the herder. This is unlike the older uses of the AdminClient to find and manage the internal topics, where failure to connect was not necessarily logged correctly but nevertheless still skipped over, relying upon broker auto-topic creation to create the internal topics. (This may be why the test did not fail prior to the recent change to always require a successful AdminClient connection.) Although the worker never got this far in its startup process, the fact that we missed such an error since the prior releases means that failure to connect with the AdminClient was not being properly reported.
The `ConnectStandaloneFileTest.test_file_source_and_sink` system tests were run locally prior to this fix, and they failed as with the nightlies. Once these fixes were made, the locally run system tests passed.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <me@ewencp.org>
Closes#4610 from rhauch/kafka-6577-trunk
(cherry picked from commit fc19c3e6f2)
Signed-off-by: Damian Guy <damian.guy@gmail.com>
Enabling scans for super types in reflections is required in order to discover Connect plugins.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Sending the response code of an http request issued via `RestClient` in Connect to stdout seems like a unconventional choice.
This PR redirects the responds code with a message in the logs at DEBUG level (usually the same level as the one that the caller of `RestClient.httpRequest` uses.
This fix will also fix system tests that broke by outputting this response code to stdout.
Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Damian Guy <damian.guy@gmail.com>
Closes#4591 from kkonstantine/MINOR-Redirect-response-code-in-Connect-RestClient-to-logs-instead-of-stdout
(cherry picked from commit b79e11bb51)
Signed-off-by: Damian Guy <damian.guy@gmail.com>
https://github.com/apache/kafka/pull/4356 added `batch.size` config property to `FileStreamSourceConnector` but the property was added as required without a default in config definition (`ConfigDef`). This results in validation error during connector startup.
Unit tests were added for both `FileStreamSourceConnector` and `FileStreamSinkConnector` to avoid such issues in the future.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
This is a small change to parallelize plugin scanning. This may help in some environments where otherwise plugin scanning is slow.
Author: Robert Yokota <rayokota@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4561 from rayokota/K6503-improve-plugin-scanning
(cherry picked from commit 3af13967db)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
JsonConverter should use object equality rather than reference equality in `convertToJson`.
Reviewers: Bartlomiej Tartanus <bartektartanus@gmail.com>, Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Corrected the parsing of invalid list values. A list can only be parsed if it contains elements that have a common type, and a map can only be parsed if it contains keys with a common type and values with a common type.
Reviewers: Arjun Satish <arjun@confluent.io>, Magesh Nandakumar <magesh.n.kumar@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Jason Gustafson <jason@confluent.io>
Changed call to use the overload of ConnectSchema.validateValue() method with the field name passed in. Ensure that field in put call is not null.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
The commits for KIP-145 (KAFKA-5142) changed how the Connect workers instantiate and configure the Converters, and also added the ability to do the same for the new HeaderConverters. However, the last few commits removed the default value for the `converter.type` property for Converters and HeaderConverters, and this broke how the internal converters were being created.
This change corrects the behavior so that the `converter.type` property is always set by the worker (or by the Plugins class), which means the existing Converter implementations will not have to do this. The built-in JsonConverter, ByteArrayConverter, and StringConverter also implement HeaderConverter which implements Configurable, but the Worker and Plugins methods do not yet use the `Configurable.configure(Map)` method and instead still use the `Converter.configure(Map,boolean)`.
Several tests were modified, and a new PluginsTest was added to verify the new behavior in Plugins for instantiating and configuring the Converter and HeaderConverter instances.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4512 from rhauch/kafka-6513
(cherry picked from commit 976a3b0cc8)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
This change ensures that when sensors are created, they are unique to the metric group associated with the task that created them. Previously the sensors were being shared between task metric groups, causing incorrect metrics.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
KAFKA-3073 added topic regex support for sink connectors. The addition requires that you only specify one of topics or topics.regex settings. This is being validated in one place, but not during submission of connectors. This PR adds validation at `AbstractHerder.validateConnectorConfig` and `WorkerConnector.initialize`.
This adds a test of the new behavior to `AbstractHerderTest`.
Author: Jeff Klukas <jeff@klukas.net>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4251 from jklukas/connect-topics-validation
(cherry picked from commit eb3fef760e)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
Submitting a fail safe fix for rare IOExceptions on symbolic links.
The fix is submitted without a test case since it does seem easy to reproduce such type of failures (just having a broken symbolic link does not reproduce the issue) and it's considered pretty low risk.
If accepted, needs to be ported at least to 1.0, if not 0.11
Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4481 from kkonstantine/KAFKA-6288-Broken-symlink-interrupts-scanning-the-plugin-path
(cherry picked from commit 17aaff3606)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
**[KIP-145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect) has been accepted, and this PR implements KIP-145 except without the SMTs.**
Changed the Connect API and runtime to support message headers as described in [KIP-145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect).
The new `Header` interface defines an immutable representation of a Kafka header (key-value pair) with support for the Connect value types and schemas. This interface provides methods for easily converting between many of the built-in primitive, structured, and logical data types.
The new `Headers` interface defines an ordered collection of headers and is used to track all headers associated with a `ConnectRecord` (and thus `SourceRecord` and `SinkRecord`). This does allow multiple headers with the same key. The `Headers` contains methods for adding, removing, finding, and modifying headers. Convenience methods allow connectors and transforms to easily use and modify the headers for a record.
A new `HeaderConverter` interface is also defined to enable the Connect runtime framework to be able to serialize and deserialize headers between the in-memory representation and Kafka’s byte[] representation. A new `SimpleHeaderConverter` implementation has been added, and this serializes to strings and deserializes by inferring the schemas (`Struct` header values are serialized without the schemas, so they can only be deserialized as `Map` instances without a schema.) The `StringConverter`, `JsonConverter`, and `ByteArrayConverter` have all been extended to also be `HeaderConverter` implementations. Each connector can be configured with a different header converter, although by default the `SimpleHeaderConverter` is used to serialize header values as strings without schemas.
Unit and integration tests are added for `ConnectHeader` and `ConnectHeaders`, the two implementation classes for headers. Additional test methods are added for the methods added to the `Converter` implementations. Finally, the `ConnectRecord` object is already used heavily, so only limited tests need to be added while quite a few of the existing tests already cover the changes.
Author: Randall Hauch <rhauch@gmail.com>
Reviewers: Arjun Satish <arjun@confluent.io>, Ted Yu <yuzhihong@gmail.com>, Magesh Nandakumar <magesh.n.kumar@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4319 from rhauch/kafka-5142-b
…to check for empty connector name and illegal characters in connector name. This also fixes KAFKA-4938 by removing the check for slashes in connector name from ConnectorsResource.
Author: Ewen Cheslack-Postava <me@ewencp.org>
Author: Soenke Liebau <soenke.liebau@opencore.com>
Reviewers: Gwen Shapira <cshapi@gmail.com>, Viktor Somogyi <viktor.somogyi@cloudera.com>, Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes#2755 from soenkeliebau/KAFKA-4930
This PR implements the JIRA issue [KAFKA-4029: SSL support for Connect REST API](https://issues.apache.org/jira/browse/KAFKA-4029) / [KIP-208](https://cwiki.apache.org/confluence/display/KAFKA/KIP-208%3A+Add+SSL+support+to+Kafka+Connect+REST+interface).
Summary of the main changes:
- Jetty `HttpClient` is used as HTTP client instead of the one shipped with Java. That allows to keep the SSL configuration for Server and Client be in single place (both use the Jetty `SslContextFactory`). It also has much richer configuration than the JDK client (it is easier to configure things such as supported cipher suites etc.).
- The `RestServer` class has been broker into 3 parts. `RestServer` contains the server it self. `RestClient` contains the HTTP client used for forwarding requests etc. and `SSLUtils` contain some helper classes for configuring SSL. One of the reasons for this was Findbugs complaining about the class complexity.
- A new method `valuesWithPrefixAllOrNothing` has been added to `AbstractConfig` to make it easier to handle the situation that we want to use either only the prefixed SSL options or only the non-prefixed. But not mixed them.
Author: Jakub Scholz <www@scholzj.com>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4429 from scholzj/kip-208
Exclusion for packages that need not be loaded in isolation needs to be extended to all the `org.apache.kafka` packages (that do not belong to transforms and the other whitelisted packages). Most notably, this refers to any classes in `kafka-clients` package.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
Adds alphanumeric ordering of dependencies as they added to a Connect plugin's class loader path.
This makes the layout of the dependencies consistent across systems and deployments. Dependencies should still, in principle, not include conflicts and ideally order should not matter.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
`loadClass` needs to be synchronized to protect subsequent calls to `defineClass`.
Details in the javadoc of this PR as well as here too: https://docs.oracle.com/javase/7/docs/technotes/guides/lang/cl-mt.html
/cc ewencp rhauch
Author: Konstantine Karantasis <konstantine@confluent.io>
Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
Closes#4428 from kkonstantine/KAFKA-6277-Make-loadClass-thread-safe-for-class-loaders-of-Connect-plugins
Making clear that implementations of poll() shouldn't block indefinitely in order to allow the task instance to transition to PAUSED state.
Reviewers: Randall Hauch <rhauch@gmail.com>, Jason Gustafson <jason@confluent.io>
When using Kafka Connect with a cluster that doesn't allow the user to
create topics (due to ACL configuration), Connect fails when trying to
create its internal topics even if these topics already exist. This is
incorrect behavior according to the documentation, which mentions that
R/W access should be enough.
This happens specifically when using Aiven Kafka, which does not permit
creation of topics via the Kafka Admin Client API.
The patch ignores the returned error, similar to the behavior for older
brokers that don't support the API.
* Implement MockAdminClient.deleteTopics
* Use MockAdminClient instead of MockKafkaAdminClientEnv in StreamsResetterTest
* Rename MockKafkaAdminClientEnv to AdminClientUnitTestEnv
* Use MockAdminClient instead of MockKafkaAdminClientEnv in TopicAdminTest
* Rename KafkaAdminClient to AdminClientUnitTestEnv in KafkaAdminClientTest.java
* Migrate StreamThreadTest to MockAdminClient
* Fix style errors
* Address review comments
* Fix MockAdminClient call
Reviewers: Matthias J. Sax <matthias@confluent.io>, Konstantine Karantasis <konstantine@confluent.io>, Guozhang Wang <wangguoz@gmail.com>