NIrav Modi

Reputation: 6972

How to get state for multiple keyBy fields in Flink using the queryable state client?

I am using Flink 1.4.2 and have a scenario in which I need to key the stream by two fields. For example:

KeyedStream<UsageStatistics, Tuple> keyedStream = stream.keyBy("clusterId", "ssid");
usageCounts = keyedStream.process(new CustomProcessFunction(windowSize,queryableStateName));

The value state descriptor would be:

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor<>(queryableStateName, SsidTotalUsage.class);
descriptor.setQueryable(queryableStateName);

Can anyone suggest how to get the state for multiple keys using the queryable state client in Flink?

The QueryableStateClient call below works well for a single key, 'clusterId'.

kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, clusterId, BasicTypeInfo.STRING_TYPE_INFO, descriptor);

What should the type information be for multiple keys? Any suggestion, example, or reference would be very helpful.

Upvotes: 4

Views: 1042

Answers (1)

NIrav Modi

Reputation: 6972

I found the solution.

I provided a TypeHint when creating the ValueStateDescriptor.

In the Flink job:

TypeInformation<SsidTotalUsage> typeInformation = TypeInformation.of(new TypeHint<SsidTotalUsage>() {});

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor<>(queryableStateName, typeInformation);

On the client side:

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor<>(queryableStateName, typeInformation);

I have two keys, so I used the Tuple2 class and set my key values as shown below. Note: if you have more than two keys, use Tuple3, Tuple4, and so on, according to the number of keys.

Tuple2<String, String> tuple = new Tuple2<>();
tuple.f0 = clusterId;
tuple.f1 = ssid;

Then I provided a TypeHint for the key type:

TypeHint<Tuple2<String, String>> typeHint = new TypeHint<Tuple2<String, String>>() {};

CompletableFuture<ValueState<SsidTotalUsage>> kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, tuple, typeHint, descriptor);

In the code above, the future returned by getKvState completes with an ImmutableValueState, so I get my POJO as shown below.

ImmutableValueState<SsidTotalUsage> state = (ImmutableValueState<SsidTotalUsage>) kvState.get();

totalUsage = state.value();
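
One caveat about the last step: kvState.get() blocks with no time limit, so the client can hang if the query never gets a response. Below is a minimal sketch of waiting with a timeout instead. It uses only java.util.concurrent so that it runs without Flink on the classpath. The class name, the awaitOrFallback helper, and the fallback value are my own illustrative names and not part of the Flink API, and the futures in main are stand-ins for the one returned by getKvState.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class QueryWithTimeoutSketch {

    /**
     * Waits up to timeoutMillis for the future to complete and returns its result.
     * Returns the fallback if the wait times out, the future completed with an
     * exception, or the waiting thread was interrupted.
     * (Hypothetical helper, not a Flink method.)
     */
    public static <T> T awaitOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // No response in time: give up on this query.
            future.cancel(true);
            return fallback;
        } catch (ExecutionException e) {
            // The query itself failed; the cause is available via e.getCause().
            return fallback;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it.
            Thread.currentThread().interrupt();
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a query that has already been answered.
        CompletableFuture<Long> answered = CompletableFuture.completedFuture(42L);

        // Stand-in for a query that never receives a response.
        CompletableFuture<Long> unanswered = new CompletableFuture<>();

        // Stand-in for a query that failed.
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("query failed"));

        System.out.println(awaitOrFallback(answered, 100, -1L));   // prints 42
        System.out.println(awaitOrFallback(unanswered, 100, -1L)); // prints -1
        System.out.println(awaitOrFallback(failed, 100, -1L));     // prints -1
    }
}
```

In the client code above, the same idea would be applied to the kvState future before calling value() on the resulting state, with a fallback that suits your application (for example null, followed by a null check).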

Upvotes: 3
