Christos Hadjinikolis

Reputation: 2158

Unit Testing Flink Functions

I am using Flink v.1.4.0.

I have implemented a module, as part of a package I am developing, whose role is to deduplicate a stream. The module is quite simple:

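import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RemoveDuplicateFilter<T> extends RichFlatMapFunction<T, T> {

  static final ValueStateDescriptor<Boolean> SEEN_DESCRIPTOR = new ValueStateDescriptor<>("seen", Boolean.class);
  private ValueState<Boolean> seen;

  @Override
  public void open(Configuration configuration) {
    // Look up the keyed "seen" flag once, when the function is initialised.
    RuntimeContext runtimeContext = this.getRuntimeContext();
    seen = runtimeContext.getState(SEEN_DESCRIPTOR);
  }

  @Override
  public void flatMap(T value, Collector<T> out) throws Exception {
    Boolean hasBeenSeen = seen.value();

    // Emit the element only the first time it is seen for the current key.
    if (hasBeenSeen == null || !hasBeenSeen) {
      out.collect(value);
      seen.update(true);
    }
  }
}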

The question is: how can I unit test this code without instantiating an actual Flink ValueState, for example by using Mockito?

I have tried a number of things, but it always comes down to this stubbing call:

RuntimeContext runtimeContext = Mockito.mock(RuntimeContext.class);
...
when(runtimeContext.getState(SEEN_DESCRIPTOR)).thenReturn(seen);

This call always fails. I also tried replacing SEEN_DESCRIPTOR with Matchers.any(), but still no luck.

Any suggestions?

Upvotes: 1

Views: 2613

Answers (1)

kkrugler

Reputation: 9245

You can use flinkspector to unit test Flink functions.
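
If adding another library is not an option, a plain Mockito test may also work. The sketch below is only a rough illustration, not a verified fix for the failure in the question (the error message is not shown there). It assumes Mockito and the Flink 1.4 core classes are on the test classpath. The filter is repeated as a nested class so the example stands alone. `RemoveDuplicateFilterSketch`, `ListCollector` and `runOnce` are hypothetical names introduced here. The sketch hands the mocked `RuntimeContext` to the function through `setRuntimeContext(...)` before calling `open(...)`; without that step `getRuntimeContext()` has nothing to return.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.mockito.Mockito;

public class RemoveDuplicateFilterSketch {

    // The filter from the question, repeated so the sketch is self-contained.
    static class RemoveDuplicateFilter<T> extends RichFlatMapFunction<T, T> {
        static final ValueStateDescriptor<Boolean> SEEN_DESCRIPTOR =
                new ValueStateDescriptor<>("seen", Boolean.class);
        private ValueState<Boolean> seen;

        @Override
        public void open(Configuration configuration) {
            seen = getRuntimeContext().getState(SEEN_DESCRIPTOR);
        }

        @Override
        public void flatMap(T value, Collector<T> out) throws Exception {
            Boolean hasBeenSeen = seen.value();
            if (hasBeenSeen == null || !hasBeenSeen) {
                out.collect(value);
                seen.update(true);
            }
        }
    }

    // Hypothetical helper: a Collector that simply records what was emitted.
    static class ListCollector<T> implements Collector<T> {
        final List<T> items = new ArrayList<>();

        @Override
        public void collect(T record) {
            items.add(record);
        }

        @Override
        public void close() {
            // Nothing to release in this in-memory collector.
        }
    }

    // Runs the filter twice on the same element and returns what it emitted.
    @SuppressWarnings("unchecked")
    static List<String> runOnce() throws Exception {
        // Mocked state: empty on the first read, "seen" on the second.
        ValueState<Boolean> seen = Mockito.mock(ValueState.class);
        Mockito.when(seen.value()).thenReturn((Boolean) null).thenReturn(Boolean.TRUE);

        // Mocked context that returns the mocked state for the descriptor.
        RuntimeContext context = Mockito.mock(RuntimeContext.class);
        Mockito.when(context.getState(RemoveDuplicateFilter.SEEN_DESCRIPTOR)).thenReturn(seen);

        RemoveDuplicateFilter<String> filter = new RemoveDuplicateFilter<>();
        filter.setRuntimeContext(context); // inject the mock before open()
        filter.open(new Configuration());

        ListCollector<String> out = new ListCollector<>();
        filter.flatMap("a", out); // first occurrence: emitted
        filter.flatMap("a", out); // duplicate: dropped

        // The flag should have been set exactly once.
        Mockito.verify(seen).update(true);
        return out.items;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnce());
    }
}
```

Note that a mocked `ValueState` does not reproduce Flink's per-key scoping, so a test like this only checks the function's own branching. Behaviour across different keys would still need a test that runs against real keyed state.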

Upvotes: 1
