Reputation: 13
Confluent platform add sink connector image
I am trying to add a sink connector in Confluent Platform, but it gives me errors.
I installed the connector manually with: confluent-hub install confluentinc/kafka-connect-jdbc:10.7.6
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "10.7.6"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "10.7.6"
}
The source connector works properly; the Confluent UI does not report any error for it.
I am running Confluent locally, version 10.7.4 (confluent version: v3.2.1).
Java version: openjdk 11.0.22 2024-01-16 OpenJDK Runtime Environment OpenLogic-OpenJDK (build 11.0.22+7-adhoc.admin.jdk11u) OpenJDK 64-Bit Server VM OpenLogic-OpenJDK (build 11.0.22+7-adhoc.admin.jdk11u, mixed mode)
Error in the Connect log:
ERROR Uncaught exception in REST call to /connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:64)
java.util.concurrent.ExecutionException: java.lang.ExceptionInInitializerError
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:123)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:115)
at org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource.validateConfigs(ConnectorPluginsResource.java:109)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:134)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:177)
at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$TypeOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:219)
at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:81)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397)
at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:554)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:516)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ExceptionInInitializerError
at io.confluent.connect.jdbc.JdbcSinkConnector.config(JdbcSinkConnector.java:70)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:580)
at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$6(AbstractHerder.java:470)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value insert for configuration insert.mode: Invalid enumerator
at io.confluent.connect.jdbc.sink.JdbcSinkConfig$EnumValidator.ensureValid(JdbcSinkConfig.java:658)
at org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(ConfigDef.java:1270)
at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:159)
at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:179)
at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:218)
at io.confluent.connect.jdbc.sink.JdbcSinkConfig.<clinit>(JdbcSinkConfig.java:362)
... 8 more
What is the problem?
Upvotes: 1
Views: 188
Reputation: 64969
This is a bug in the Confluent JDBC Connector. It fails to initialise itself if it is being run in the Turkish locale.
The bug is in this method:
public static <E> EnumValidator in(E[] enumerators) {
    final List<String> canonicalValues = new ArrayList<>(enumerators.length);
    final Set<String> validValues = new HashSet<>(enumerators.length * 2);
    for (E e : enumerators) {
        canonicalValues.add(e.toString().toLowerCase());
        validValues.add(e.toString().toUpperCase());
        validValues.add(e.toString().toLowerCase());
    }
    return new EnumValidator(canonicalValues, validValues);
}
The problem is that the calls to .toLowerCase() and .toUpperCase() do not specify the locale in which they perform case conversion, so the conversion happens in the JVM's default locale. For the insert.mode configuration, the valid values are the names of the InsertMode enum constants (INSERT, UPSERT and UPDATE), plus their lower-case forms. In the Turkish locale, those lower-case forms are ınsert (with a dotless ı), upsert and update.
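The effect is easy to reproduce outside the connector. The following is a minimal sketch, not code from the connector: the class name TurkishLocaleDemo and the helper lower are made up for illustration, and the dotless ı is written as the escape \u0131 so the source compiles regardless of file encoding.

```java
import java.util.Locale;

public class TurkishLocaleDemo {
    // Lower-cases a value in an explicitly given locale rather than the JVM default.
    static String lower(String value, Locale locale) {
        return value.toLowerCase(locale);
    }

    public static void main(String[] args) {
        Locale turkish = Locale.forLanguageTag("tr-TR");

        // In Turkish, upper-case I maps to dotless ı (U+0131), not to ASCII i.
        String inTurkish = lower("INSERT", turkish);
        // Locale.ROOT applies the locale-neutral mapping.
        String inRoot = lower("INSERT", Locale.ROOT);

        System.out.println("Turkish result equals \"insert\": " + inTurkish.equals("insert"));
        System.out.println("ROOT result equals \"insert\":    " + inRoot.equals("insert"));
        System.out.println("Turkish result uses U+0131:      " + inTurkish.equals("\u0131nsert"));
    }
}
```

UPSERT and UPDATE contain no letter I, which is why only the insert value is affected.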
When the JdbcSinkConfig class defines its configuration, the default value of each parameter is checked against that parameter's validator (in the stack trace, ConfigDef$ConfigKey.<init> calls EnumValidator.ensureValid). The insert.mode parameter is one of the parameters checked this way, and its default value is insert. In the Turkish locale, insert is not among the valid values, so a ConfigException is thrown while the class is being initialised, which surfaces as the ExceptionInInitializerError.
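One way such a validator could be made independent of the default locale is to pass Locale.ROOT to the case conversions. The sketch below is an assumption about what a fix might look like, not Confluent's actual change: LocaleSafeEnumValues, its InsertMode enum and validValues are hypothetical stand-ins for the connector's classes.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

public class LocaleSafeEnumValues {
    // Hypothetical stand-in for the connector's InsertMode enum.
    enum InsertMode { INSERT, UPSERT, UPDATE }

    // Collects the accepted spellings using Locale.ROOT, so the result
    // does not depend on the JVM's default locale.
    static <E> Set<String> validValues(E[] enumerators) {
        Set<String> valid = new HashSet<>(enumerators.length * 2);
        for (E e : enumerators) {
            valid.add(e.toString().toUpperCase(Locale.ROOT));
            valid.add(e.toString().toLowerCase(Locale.ROOT));
        }
        return valid;
    }

    public static void main(String[] args) {
        Locale previous = Locale.getDefault();
        // Simulate a JVM started in the Turkish locale.
        Locale.setDefault(Locale.forLanguageTag("tr-TR"));
        try {
            boolean accepted = validValues(InsertMode.values()).contains("insert");
            System.out.println("default \"insert\" accepted: " + accepted);
        } finally {
            Locale.setDefault(previous);
        }
    }
}
```

With this variant, the default value insert stays in the valid set even when the default locale is Turkish.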
If it is an option for you to use an English locale instead of a Turkish locale, then I would expect things to work. If not, you are waiting on Confluent to fix this bug. I've reported this bug on their GitHub repo here.
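To confirm which locale a JVM on the Connect host actually starts with, a small diagnostic along these lines may help. It is only a sketch: DefaultLocaleCheck and changesInsert are invented names. It also rests on an assumption to verify for your setup, namely that the Connect worker's default locale comes from the OS settings or from JVM system properties such as -Duser.language=en and -Duser.country=US, which Kafka's start scripts can typically be given through the KAFKA_OPTS environment variable.

```java
import java.util.Locale;

public class DefaultLocaleCheck {
    // True when lower-casing "INSERT" in the given locale does not
    // produce the plain ASCII string "insert".
    static boolean changesInsert(Locale locale) {
        return !"INSERT".toLowerCase(locale).equals("insert");
    }

    public static void main(String[] args) {
        Locale current = Locale.getDefault();
        System.out.println("default locale: " + current.toLanguageTag());
        System.out.println("user.language:  " + System.getProperty("user.language"));
        System.out.println("affected:       " + changesInsert(current));
    }
}
```

If it prints affected: true, the connector's default-value check would be expected to fail in a JVM started the same way.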
Upvotes: 1