I have a simple proof of concept for a Flink job. It receives messages (in JSON format) from a Kafka topic, deserializes them into a domain model, validates them against a predefined set of rules, applies some transformations, and finally publishes the resulting message to a Kafka sink.
I do have several functions/operators that use some behavior from other "service" classes. Those "service" classes could import some other dependencies as well.
As far as I know, Flink will try to (de)serialize those functions/operators in order to make the entire job truly distributed. I'm not clear if Flink would automatically avoid that by using transient on those fields/members or if it would be enough to declare them as static to avoid that.
This is an example of what I have:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

public final class SomeFlatMapFunction implements FlatMapFunction<SomeMessage, Some> {
    private static final long serialVersionUID = -5810858761065889162L;

    // Static fields are not part of the serialized function instance.
    private static final SomeMapper MAPPER = SomeMapper.INSTANCE;
    private static final Validator VALIDATOR = Validator.INSTANCE;

    @Override
    public void flatMap(final SomeMessage value, final Collector<Some> out) {
        final var result = MAPPER.valueFrom(value);
        final var violations = VALIDATOR.getValidator().validate(result);
        // Emit only the messages that pass validation.
        if (violations.isEmpty()) {
            out.collect(result);
        }
    }
}
I haven't seen any issues with this so far, but I'm only running the application locally. What's the best/accepted approach here, even for those cases where one has to inject those dependencies in the function's constructor? It also looks like maintaining state between those functions is highly discouraged.
Upvotes: 1
Views: 1130
Reputation: 5059
Operators do get serialized and deserialized. That's why there are several Rich* versions of the operators with open and close methods: they can be used to set things up after deserialization, once the operator is already in the task manager where it will run.
Flink will respect Java's usual serialization rules and will not serialize static or transient members.
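To illustrate both points without depending on Flink, here is a minimal plain-Java sketch. All class names (UpperCaseService, SketchFunction, SerializationSketch) are hypothetical, and the open() method here only imitates the role that open plays in Flink's Rich* functions; it is not Flink's API. It shows that a transient field is skipped during serialization, so a non-serializable dependency can sit there, and that it has to be re-created after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a "service" dependency that is NOT Serializable.
final class UpperCaseService {
    String apply(String s) {
        return s.toUpperCase();
    }
}

// Hypothetical stand-in for a function/operator; plain Java, no Flink types.
final class SketchFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String prefix;                 // serialized with the instance
    private transient UpperCaseService service;  // skipped by Java serialization

    SketchFunction(String prefix) {
        this.prefix = prefix;
    }

    // Imitates open() in a Rich* function: builds the dependency on the worker.
    void open() {
        service = new UpperCaseService();
    }

    boolean isOpened() {
        return service != null;
    }

    String map(String value) {
        return prefix + service.apply(value);
    }
}

final class SerializationSketch {
    // Serializes an object to bytes and reads it back, all in memory.
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        SketchFunction original = new SketchFunction("id-");
        original.open();
        SketchFunction copy = roundTrip(original);
        System.out.println(copy.isOpened()); // false: the transient field was not carried over
        copy.open();                         // re-create the dependency after deserialization
        System.out.println(copy.map("abc")); // id-ABC
    }
}
```

In a real job the same shape would presumably live in a Rich* function, with the dependency held in a transient field and built inside open, so that nothing non-serializable travels with the operator.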
In my experience, injecting domain classes in operators' constructors isn't a problem. Where you need to be careful is with domain classes that go through the network while the job is running, sometimes referred to as Data Transfer Objects. For those, the simplest thing is to implement them as POJOs, where two things are critical:
The second is particularly important if such POJOs will be part of your application's state, i.e. if you are using Flink's managed state API.
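For reference, Flink's documentation describes a POJO as a public class (a public static nested class also qualifies) with a public no-argument constructor, whose fields are either public or reachable through getters and setters. The following is a minimal sketch of a class shaped that way; SomeEvent, its fields, and the PojoSketch wrapper are hypothetical names used only for illustration.

```java
// Wrapper used only to keep the sketch in a single file.
final class PojoSketch {

    // Hypothetical DTO; public and static, so it is not an inner class.
    public static class SomeEvent {
        private String id;
        private long timestamp;

        // Public no-argument constructor.
        public SomeEvent() {
        }

        // Getter and setter for every non-public field.
        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
    }
}
```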
And something you already considered: adding serialVersionUID is also a good idea.
Upvotes: 1