Reputation: 365
I have an Apache Beam pipeline that reads from Pub/Sub, enriches the data using Redis, and finally writes back to Pub/Sub. I am trying to write tests for the enrichment DoFn, which is a stateful DoFn. Its internal state acts as a near cache to reduce the number of calls to Redis. To instantiate my Redis client I am using a factory declared in my PipelineOptions interface, such as
@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();
void setRedisClient(RedisClient client);
In theory, the above client should be a singleton on each worker. In my unit tests, I am trying to mock some of the internals of that Redis client. My tests look like this:
// set up the pipeline
TestStream<MetricsInstance> inputStream =
    TestStream.create(...).advanceWatermarkToInfinity();
PCollection<MetricsInstance> enrichedDataStream = pipeline.apply(inputStream)
    .apply(ParDo.of(new ConvertToKeyValuePairDoFn<>()))
    .apply(ParDo.of(new EnrichMetricsInstanceDoFn()));

// replace the Jedis pool inside the Redis client with a mock
CommonPipelineOptions options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
RedisClient redisClient = options.getRedisClient();
JedisPool jedisPool = Mockito.mock(JedisPool.class);
jedis = Mockito.mock(Jedis.class);
Mockito.when(jedisPool.getResource()).thenReturn(jedis);
redisClient.setPool(jedisPool);

// ... some stubbing code and finally the pipeline run
PAssert.that(enrichedDataStream).containsInAnyOrder(expectedDataStream);
pipeline.run(options);
When I try to run this test, I get an error like this:
java.lang.IllegalArgumentException: Failed to serialize and deserialize property 'redisClient' with value 'xxx.xxx.RedisClientImpl@529cfee5'
To stop the framework from attempting to serialize the client, I can add @JsonIgnore to getRedisClient() in my options class. But that causes the Redis client instance to be recreated at some point, and all my mocking and stubbing is lost. What is the best way to test such scenarios?
Upvotes: 3
Views: 1705
Reputation: 365
After some discussion on the Apache Beam mailing list, I was able to get this to work. The trick was to set up RedisClientFactory so that it reads another pipeline option, one that holds the RedisClient class to instantiate.
The options look like this:
@Default.Class(RedisClientImpl.class)
Class<? extends RedisClient> getRedisClientClass();
void setRedisClientClass(Class<? extends RedisClient> redisClientClass);
@Default.InstanceFactory(RedisClientFactory.class)
RedisClient getRedisClient();
void setRedisClient(RedisClient client);
The factory is implemented like this:
public class RedisClientFactory implements DefaultValueFactory<RedisClient> {
    @Override
    public RedisClient create(PipelineOptions options) {
        CommonPipelineOptions pipelineOptions = options.as(CommonPipelineOptions.class);
        return InstanceBuilder.ofType(RedisClient.class)
            .fromClass(pipelineOptions.getRedisClientClass())
            .fromFactoryMethod("fromOptions")
            .withArg(PipelineOptions.class, options)
            .build();
    }
}
This factory uses a static method called fromOptions on the class RedisClientImpl to construct the client.
public static RedisClientImpl fromOptions(PipelineOptions options) {
    return new RedisClientImpl(options.as(CommonPipelineOptions.class));
}
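Roughly speaking, the factory finds the static fromOptions method by name on whichever class the options point to, and invokes it. The plain-Java sketch below mimics that lookup with java.lang.reflect so it can run without Beam. It is only an illustration of the idea, not how InstanceBuilder is actually implemented; Options, Client, RealClient, FakeClient and ReflectiveFactory are hypothetical stand-ins, not Beam types.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the pipeline options; not a Beam type.
class Options {
    Class<? extends Client> clientClass = RealClient.class;
}

// Hypothetical stand-in for the RedisClient interface.
interface Client {
    String describe();
}

class RealClient implements Client {
    public static RealClient fromOptions(Options options) {
        return new RealClient();
    }

    @Override
    public String describe() {
        return "real";
    }
}

class FakeClient implements Client {
    public static FakeClient fromOptions(Options options) {
        return new FakeClient();
    }

    @Override
    public String describe() {
        return "fake";
    }
}

class ReflectiveFactory {
    // Looks up the public static method fromOptions(Options) on the
    // configured class and invokes it to obtain a client instance.
    static Client build(Options options) {
        try {
            Method factory = options.clientClass.getMethod("fromOptions", Options.class);
            return (Client) factory.invoke(null, options);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                "Cannot build client from " + options.clientClass.getName(), e);
        }
    }
}
```

Since only the class is configured, a test can switch implementations by changing that one field, provided every candidate class exposes a factory method with the same name and parameter type.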
Using this setup, I can now substitute a fake RedisClient in my unit tests by pointing the class option at it.
options = PipelineOptionsFactory.as(CommonPipelineOptions.class);
options.setRedisClientClass(FakeRedisClient.class);
...
// setup fake data in the FakeRedisClient by calling static methods
FakeRedisClient.keyToValueMap.put(redisKey, redisReturnVal);
...
pipeline.run(options);
We also need to make sure that the FakeRedisClient class exposes a static method called fromOptions:
public static FakeRedisClient fromOptions(PipelineOptions options) {
    return new FakeRedisClient();
}
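For completeness, here is a minimal sketch of what such a fake might look like. The RedisClient interface is not shown in this post, so the single get(String) method below is an assumption, as is the reset() helper; the fromOptions method shown above is left out only so the sketch compiles without Beam on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Assumed shape of the client interface; the real one may differ.
interface RedisClient {
    String get(String key);
}

class FakeRedisClient implements RedisClient {
    // Static so that the test code and any instance created reflectively
    // inside the pipeline see the same data within one JVM.
    static final Map<String, String> keyToValueMap = new ConcurrentHashMap<>();

    @Override
    public String get(String key) {
        // Returns null for keys the test has not stubbed.
        return keyToValueMap.get(key);
    }

    // Hypothetical helper: clear the shared state between tests so that
    // data stubbed in one test does not leak into the next.
    static void reset() {
        keyToValueMap.clear();
    }
}
```

Because the map is static, this approach relies on the test pipeline running in the same JVM as the test, and the shared state should be cleared before or after each test.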
Upvotes: 4