Reputation: 21
I'm writing a SinkConnector in Kafka Connect and hitting an issue. This connector has a configuration as such :
{
"connector.class" : "a.b.ExampleFileSinkConnector",
"tasks.max" : '1',
"topics" : "mytopic",
"maxFileSize" : "50"
}
I define the connector's config like this :
@Override public ConfigDef config()
{
ConfigDef result = new ConfigDef();
result.define("maxFileSize", Type.STRING, "10", Importance.HIGH, "size of file");
return result;
}
In the connector, I start the tasks as such :
@Override public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> result = new ArrayList<Map<String,String>>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskConfig = new HashMap<>();
taskConfig.put("connectorName", connectorName);
taskConfig.put("taskNumber", Integer.toString(i));
taskConfig.put("maxFileSize", maxFileSize);
result.add(taskConfig);
}
return result;
}
and all goes well.
However, when starting the Task (in taskConfigs()), if I add this :
taskConfig.put("epoch", "123");
this breaks the whole infrastructure : all connectors are stopped and restarted in an endless loop.
There is no exception or error whatsoever in the connect log file that can help.
The only way to make it work is to add "epoch" in the connector config, which I don't want to do since it is an internal parameter that the connector has to send to the task. It is not intended to be exposed to the connector's users.
Another point I noticed is that it is not possible to update the value of any connector config parameter, apart to set it to the default value. Changing a parameter and sending it to the task produces the same behavior.
I would really appreciate any help on this issue.
EDIT : here is the code of SinkTask::start()
@Override public void start(Map<String, String> taskConfig) {
try {
connectorName = taskConfig.get("connectorName");
log.info("{} -- Task.start()", connectorName);
fileNamePattern = taskConfig.get("fileNamePattern");
rootDir = taskConfig.get("rootDir");
fileExtension = taskConfig.get("fileExtension");
maxFileSize = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxFileSize"));
maxTimeMinutes = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxTimeMinutes"));
maxNumRecords = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("maxNumRecords"));
taskNumber = SimpleFileSinkConnector.parseIntegerConfig(taskConfig.get("taskNumber"));
epochStart = SimpleFileSinkConnector.parseLongConfig(taskConfig.get("epochStart"));
log.info("{} -- fileNamePattern: {}, rootDir: {}, fileExtension: {}, maxFileSize: {}, maxTimeMinutes: {}, maxNumRecords: {}, taskNumber: {}, epochStart : {}",
connectorName, fileNamePattern, rootDir, fileExtension, maxFileSize, maxTimeMinutes, maxNumRecords, taskNumber, epochStart);
if (taskNumber == 0) {
checkTempFilesForPromotion();
}
computeInitialFilename();
log.info("{} -- Task.start() END", connectorName);
} catch (Exception e) {
log.info("{} -- Task.start() EXCEPTION : {}", connectorName, e.getLocalizedMessage());
}
}
Upvotes: 1
Views: 877
Reputation: 21
We found the root cause of the issue. The Kafka Connect Framework is actually behaving as designed - the problem has to do with how we are trying to use the taskConfigs configuration framework.
The Problem
In our design, the FileSinkConnector sets an epoch in its start() lifecycle method, and this epoch is passed down to its tasks by way of the taskConfigs() lifecycle method. So each time the Connector's start() lifecycle method runs, different configuration is generated for the tasks - which is the problem.
Generating different configuration each time is a no-no. It turns out that the Connect Framework detects differences in configuration and will restart/rebalance upon detection - stopping and restarting the connector/task. That restart will call the stop() and start() methods of the connector ... which will (of course) produces yet another configuration change (because of the new epoch), and the vicious cycle is on!
This was an interesting and unexpected issue ... due to a behavior in Connect that we had no appreciation for. This is the first time we tried to generate task configuration that was not a simple function of the connector configuration.
Note that this behavior in Connect is intentional and addresses real issues of dynamically-changing configuration - like a JDBC Sink Connector that spontaneously updates its configuration when it detects a new database table it wants to sink.
Thanks to those who helped us !
Upvotes: 1