gaurav

Reputation: 365

Unit testing an Apache Beam stateful pipeline with external dependencies

I have an Apache Beam pipeline that reads from Pub/Sub, enriches the data using Redis, and finally writes to Pub/Sub. I am trying to write tests for the enrichment DoFn, which is a stateful DoFn whose internal state acts as a near cache to reduce the number of calls to Redis. To instantiate my Redis client, I use a factory declared in my PipelineOptions interface, such as

@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();

void setRedisClient(RedisClient client);

In theory, this client should be a singleton on each worker. In my unit tests, I am trying to mock some of the internals of that Redis client. My tests look like this:

// Set up the pipeline
TestStream<MetricsInstance> inputStream =
    TestStream.create(...).advanceWatermarkToInfinity();
PCollection<MetricsInstance> enrichedDataStream =
    pipeline
        .apply(inputStream)
        .apply(ParDo.of(new ConvertToKeyValuePairDoFn<>()))
        .apply(ParDo.of(new EnrichMetricsInstanceDoFn()));

// Swap the Jedis pool inside the Redis client for a mock
CommonPipelineOptions options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
RedisClient redisClient = options.getRedisClient();
JedisPool jedisPool = Mockito.mock(JedisPool.class);
Jedis jedis = Mockito.mock(Jedis.class);
Mockito.when(jedisPool.getResource()).thenReturn(jedis);
redisClient.setPool(jedisPool);
// ... some stubbing code, and finally the pipeline run
PAssert.that(enrichedDataStream).containsInAnyOrder(expectedDataStream);
pipeline.run(options);

When I run this test, I get the following error:

java.lang.IllegalArgumentException: Failed to serialize and deserialize property 'redisClient' with value 'xxx.xxx.RedisClientImpl@529cfee5'

To stop the framework from attempting to serialize the client, I can add @JsonIgnore to getRedisClient() in my options interface. But that causes the Redis client instance to be recreated at some point, and all my mocking and stubbing is lost. What is the best way to test such scenarios?

Upvotes: 3

Views: 1705

Answers (1)

gaurav

Reputation: 365

After some discussion on the Apache Beam mailing list, I was able to get this to work. The trick was to set up RedisClientFactory so that it reads another pipeline option, one that exposes the RedisClient implementation class to instantiate.

The options look like this:

    @Default.Class(RedisClientImpl.class)
    Class<? extends RedisClient> getRedisClientClass();

    void setRedisClientClass(Class<? extends RedisClient> redisClientClass);

    @Default.InstanceFactory(RedisClientFactory.class)
    RedisClient getRedisClient();

    void setRedisClient(RedisClient client);

The factory is implemented like this:

public class RedisClientFactory implements DefaultValueFactory<RedisClient> {
  @Override
  public RedisClient create(PipelineOptions options) {
    CommonPipelineOptions pipelineOptions = options.as(CommonPipelineOptions.class);
    // Build the client by calling the static fromOptions(PipelineOptions) method
    // on whichever class the redisClientClass option names.
    return InstanceBuilder.ofType(RedisClient.class)
        .fromClass(pipelineOptions.getRedisClientClass())
        .fromFactoryMethod("fromOptions")
        .withArg(PipelineOptions.class, options)
        .build();
  }
}

The factory calls a static method named fromOptions on the RedisClientImpl class to construct the client:

  public static RedisClientImpl fromOptions(PipelineOptions options) {
    return new RedisClientImpl(options.as(CommonPipelineOptions.class));
  }
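
To make the mechanism concrete, here is a minimal plain-Java sketch of the kind of lookup the factory delegates to InstanceBuilder: find a static fromOptions method on the configured class and invoke it. It is not Beam's implementation. Options, the one-method RedisClient interface, RealLikeClient, FakeClient, and the build helper are all hypothetical stand-ins, so that the example runs without Beam on the classpath.

```java
import java.lang.reflect.Method;

final class ReflectiveFactorySketch {
  /** Hypothetical stand-in for PipelineOptions (an assumption, not a Beam type). */
  static final class Options {}

  /** Hypothetical one-method client interface; the real RedisClient is not shown in the post. */
  interface RedisClient {
    String get(String key);
  }

  /** Plays the role of RedisClientImpl: constructed only through its static factory method. */
  static final class RealLikeClient implements RedisClient {
    private RealLikeClient() {}

    public static RealLikeClient fromOptions(Options options) {
      return new RealLikeClient();
    }

    @Override
    public String get(String key) {
      return "real:" + key;
    }
  }

  /** Plays the role of the test double selected through the class-valued option. */
  static final class FakeClient implements RedisClient {
    private FakeClient() {}

    public static FakeClient fromOptions(Options options) {
      return new FakeClient();
    }

    @Override
    public String get(String key) {
      return "fake:" + key;
    }
  }

  /** Looks up the public static fromOptions(Options) method on clientClass and invokes it. */
  static RedisClient build(Class<? extends RedisClient> clientClass, Options options) {
    try {
      Method factory = clientClass.getMethod("fromOptions", Options.class);
      // The receiver is null because the factory method is static.
      return clientClass.cast(factory.invoke(null, options));
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(
          clientClass.getName() + " must expose a public static fromOptions(Options)", e);
    }
  }

  public static void main(String[] args) {
    Options options = new Options();
    System.out.println(build(RealLikeClient.class, options).get("k")); // prints "real:k"
    System.out.println(build(FakeClient.class, options).get("k")); // prints "fake:k"
  }
}
```

The point of the indirection is that only a Class reference has to travel with the options, while the client instance itself is built wherever it is needed.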

With this setup, I can now substitute a fake implementation of RedisClient in my unit tests:

CommonPipelineOptions options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
options.setRedisClientClass(FakeRedisClient.class);
...
// Set up fake data in FakeRedisClient through its static map
FakeRedisClient.keyToValueMap.put(redisKey, redisReturnVal);
...
pipeline.run(options);

FakeRedisClient must also expose a static method named fromOptions:

  public static FakeRedisClient fromOptions(PipelineOptions options) {
    return new FakeRedisClient();
  }
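
For illustration, the rest of such a fake might look roughly like the sketch below. Only the static keyToValueMap comes from the setup described here. The one-method RedisClient interface, the lookups counter, and reset() are assumptions added for the example, and fromOptions is left out so the sketch compiles without Beam. The data lives in static fields because the factory creates its own client instance, so the test cannot hand over a pre-configured one. This presumably relies on the test and the pipeline running in the same JVM, as with a local runner.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class FakeRedisClientSketch {
  /** Hypothetical one-method client interface; the real RedisClient is not shown in the post. */
  interface RedisClient {
    String get(String key);
  }

  static final class FakeRedisClient implements RedisClient {
    // Static so that every instance created by the factory sees the data the test stored.
    static final Map<String, String> keyToValueMap = new ConcurrentHashMap<>();
    // Assumption: counts lookups so a test can check how often the near cache fell through.
    static final AtomicInteger lookups = new AtomicInteger();

    /** Clears the shared state; call before each test, since static state outlives a test. */
    static void reset() {
      keyToValueMap.clear();
      lookups.set(0);
    }

    @Override
    public String get(String key) {
      lookups.incrementAndGet();
      return keyToValueMap.get(key);
    }
  }

  public static void main(String[] args) {
    FakeRedisClient.reset();
    FakeRedisClient.keyToValueMap.put("host-1", "rack-7");

    // Two separately constructed instances read the same static data.
    RedisClient first = new FakeRedisClient();
    RedisClient second = new FakeRedisClient();
    System.out.println(first.get("host-1")); // prints "rack-7"
    System.out.println(second.get("host-1")); // prints "rack-7"
    System.out.println(FakeRedisClient.lookups.get()); // prints "2"
  }
}
```

A counter like this would let a test assert that a repeated key reaches the fake only once, which is the behaviour the near cache is meant to provide.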

Upvotes: 4
