Reputation: 105
I am tying to implement a custom sink, where I created a stubbed invoke function that just logs received data to the task log file (for now) as shown below.
package io.name.package;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
public class AlertSink extends RichSinkFunction<Alert> {
Logger LOG = LoggerFactory.getLogger(AlertSink.class);
@Override
public void invoke(Alert alert, Context context) throws Exception {
LOG.info("Invoking sink for alert: ", alert.toString());
}
}
I have configured data steam as shown below.
DataStream<Alert> result = filteredMetrics
.keyBy(
new KeySelector<Tuple7<String, String, String, String, String, String, Object>, Tuple3<String, String, String>>() {
@Override
public Tuple3<String, String, String> getKey(Tuple7<String, String, String, String, String, String, Object> in) throws Exception {
return Tuple3.of(in.f0, in.f1, in.f2);
}
})
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process(new ThresholdEvaluator());
result.addSink(new AlertSink());
When, i checked the logs, I see the sink was invoked but showing empty string. ThresholdEvaluator emits the Alert, but showing non-empty string.
2020-08-05 19:38:16,638 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,638 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-production-graph-service-account-toke644zm","period":"5m","isActive":true,"status":"new","firstSeen":1596656296638,"lastSeen":1596656296638,"count":1}
2020-08-05 19:38:16,640 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,640 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-staging-input-service-account-token-q57xf","period":"5m","isActive":true,"status":"new","firstSeen":1596656296640,"lastSeen":1596656296640,"count":1}
2020-08-05 19:38:16,643 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,643 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-platform-5445135-restructure-repo-cmi-service-account-k76cd","period":"5m","isActive":true,"status":"new","firstSeen":1596656296643,"lastSeen":1596656296643,"count":1}
2020-08-05 19:38:16,646 INFO io.name.package.AlertSink - Invoking sink for alert:
2020-08-05 19:38:16,646 INFO io.name.package.ThresholdEvaluator - Alert: {"thresholdID":"123123123","grouping":"svc-integrations-14361530-demo-slack-token","period":"5m","isActive":true,"status":"new","firstSeen":1596656296645,"lastSeen":1596656296645,"count":1}
Am I missing something here?
I also tried adding map function in between ThresholdEvaluator and addSink operators. MapFunction seems able to receive Alert object fine but not AlertSink.
result.map(new MapFunction<Alert, Alert>() {
@Override
public Alert map(Alert value) {
LOG.info(value.toString());
return value;
}
}).addSink(new AlertSink());
(updated with additional log)
Upvotes: 1
Views: 2349
Reputation: 1651
The reason is that the log output does not have a placeholder to interpolate the value - the right syntax is
LOG.info("Invoking sink for alert: {}", alert.toString());
The difference between this and
LOG.info("Invoking sink for alert: " + alert.toString());
is that in a latter case the string concatenation will occur every time regardless of log level, and in first case it will interpolate the value if only the log level is at least INFO
.
Upvotes: 2