Reputation: 6972
I am using Flink 1.4.2 and have a scenario in which I need to key a stream by two fields. For example:
KeyedStream<UsageStatistics, Tuple> keyedStream = stream.keyBy("clusterId", "ssid");
usageCounts = keyedStream.process(new CustomProcessFunction(windowSize,queryableStateName));
The value state descriptor would be:
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor<>(queryableStateName, SsidTotalUsage.class);
descriptor.setQueryable(queryableStateName);
Can anyone suggest how to fetch state with the queryable state client when the stream is keyed by multiple fields?
The client call below works well for a single key, 'clusterId':
kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, clusterId, BasicTypeInfo.STRING_TYPE_INFO, descriptor);
What should the key type information be for multiple keys? Any suggestion, example, or reference would be very helpful.
Upvotes: 4
Views: 1042
Reputation: 6972
I found the solution.
I built the TypeInformation for the value from a TypeHint and passed it to the ValueStateDescriptor.
In the Flink job:
TypeInformation<SsidTotalUsage> typeInformation = TypeInformation.of(new TypeHint<SsidTotalUsage>() {});
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor<>(queryableStateName, typeInformation);
On the client side (with typeInformation created the same way as in the job):
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor<>(queryableStateName, typeInformation);
I have two keys, so I used the Tuple2 class and set the key values as shown below. Note: if you have more than two keys, use Tuple3, Tuple4, and so on, according to the number of keys.
Tuple2<String, String> tuple = new Tuple2<>();
tuple.f0 = clusterId;
tuple.f1 = ssid;
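The tuple fields have to be supplied in the same order as the fields passed to keyBy ("clusterId" first, then "ssid"), because the lookup key must match the key the state was stored under. The following is only a plain-JDK analogy of that idea, not Flink code: the class CompositeKeyLookup and the sample values are hypothetical, and a HashMap stands in for the keyed state.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompositeKeyLookup {

    // Stand-in for keyed state: one value per (clusterId, ssid) pair.
    private final Map<List<String>, Long> usageByKey = new HashMap<>();

    // Stores a value under the composite key, fields in keyBy order.
    public void put(String clusterId, String ssid, long usage) {
        usageByKey.put(Arrays.asList(clusterId, ssid), usage);
    }

    // Looks up a value; returns null when no entry matches the composite key.
    public Long get(String first, String second) {
        return usageByKey.get(Arrays.asList(first, second));
    }

    public static void main(String[] args) {
        CompositeKeyLookup state = new CompositeKeyLookup();
        state.put("cluster-1", "guest-wifi", 1024L);

        // Same fields, same order: the entry is found.
        System.out.println(state.get("cluster-1", "guest-wifi")); // prints 1024

        // Same fields, swapped order: a different key, so nothing is found.
        System.out.println(state.get("guest-wifi", "cluster-1")); // prints null
    }
}
```

In the Flink client the same care applies to tuple.f0 and tuple.f1: swapping them would query a different key.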
Then I provided a TypeHint for the key type:
TypeHint<Tuple2<String, String>> typeHint = new TypeHint<Tuple2<String, String>>() {};
CompletableFuture<ValueState<SsidTotalUsage>> kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, tuple, typeHint, descriptor);
In the code above, the future returned by getKvState completes with an ImmutableValueState, so I read my POJO from it like this:
ImmutableValueState<SsidTotalUsage> state = (ImmutableValueState<SsidTotalUsage>) kvState.get();
totalUsage = state.value();
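Note that kvState.get() blocks until the query completes, so it may be worth bounding the wait. Below is a minimal sketch using only the JDK, in which a plain CompletableFuture stands in for the one returned by getKvState. The class QueryResultWaiter, the method awaitOrDefault, the 100 ms timeout, and the fallback value are all hypothetical choices for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class QueryResultWaiter {

    // Waits up to timeoutMillis for the future to complete.
    // Returns the fallback on timeout, on failure, or if the thread is interrupted.
    public static <T> T awaitOrDefault(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return fallback;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            return fallback;
        }
    }

    public static void main(String[] args) {
        // A future that is already complete, like a query that has returned.
        CompletableFuture<Long> done = CompletableFuture.completedFuture(42L);
        // A future that never completes, like a query that hangs.
        CompletableFuture<Long> never = new CompletableFuture<>();

        System.out.println(awaitOrDefault(done, 100, -1L));  // prints 42
        System.out.println(awaitOrDefault(never, 100, -1L)); // prints -1 after about 100 ms
    }
}
```

With the real client, the future passed in would be kvState, and the value would still be read from the returned state with state.value().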
Upvotes: 3