Reputation: 175
The following classes declare a pipeline to process data items. This currently works fine, but there are warnings in the Pipeline class that I don't know how to fix properly:
public abstract class Stage<In, Out> {
public abstract DataItem<Out> processItem(DataItem<In> data) throws Exception;
}
public class DataItem<T> {
public T data;
public DataItem(T obj) {
this.data = obj;
}
}
// chain Stages together
public class Pipeline {
private List<Stage<?,?>> stages;
public Pipeline(List<Stage<?,?>> stages) {
this.stages = stages;
}
// WARNING HERE: DataItem is a raw type. References to generic type should be parameterized
public void processItem(DataItem dataItem) throws Exception {
for (Stage<?, ?> stage: stages) {
// WARNING HERE: DataItem is a raw type. References to generic type should be parameterized
dataItem = stage.processItem(dataItem);
}
}
}
I tried changing the processItem() declaration by adding the wildcard <?>, but that results in a compilation error:
public void processItem(DataItem<?> dataItem) throws Exception {
for (Stage<?, ?> stage: stages) {
// COMPILATION ERROR: The method processItem(DataItem<capture#2-of ?>)
// in the type Stage<capture#2-of ?,capture#3-of ?> is not applicable
// for the arguments (DataItem<capture#4-of ?>)
dataItem = stage.processItem(dataItem);
}
}
Is there a solution to this?
Upvotes: 3
Views: 1786
Reputation: 54659
First, the warning simply stems from the fact that Pipeline itself is not generic. It does not carry any information about what it expects as its initial input.
My suggestion for solving this involves a few other generalizations and (things that I would consider as) improvements:
- Let Stage be an interface. When you have an abstract class that contains only abstract methods, then this should usually be an interface. There is no need to pin implementors to the specific extends Stage when they can equivalently do extends TheirBase implements Stage.
- Don't underestimate the first point. It allows you to say that Pipeline implements Stage. Think about the power that this brings for the users. They can assemble complex processes in a Pipeline, and then simply use this Pipeline as one Stage in an even more complex process. At the heart, this is one of the core concepts of Flow-Based Programming!
- Create a type-safe Builder for Pipeline objects.
The last point is what your original question was about. The most important method in this builder class would then be this one:
public <NextOut> PipelineBuilder<In, NextOut> add(Stage<Out, NextOut> stage)
{
stages.add(stage);
// This cast is safe as per construction of the pipeline
@SuppressWarnings("unchecked")
PipelineBuilder<In, NextOut> result =
(PipelineBuilder<In, NextOut>) this;
return result;
}
The idea is that whenever you have a builder for a pipeline <S, T>, and append a stage that has the type <T, U>, then the builder will become one that has the type <S, U>.
(As you see, it contains an unchecked cast, but the cast is safe as per the construction and later usage of the builder and the pipeline.)
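If the unchecked cast is a concern, one possible variation is to let add return a new builder instead of casting this. The following is only a sketch under that assumption: the names CopyingPipelineBuilder and stages() are hypothetical and not part of the answer's code. It also sidesteps an aliasing effect of the in-place version, where the builder returned by add is the same object as the one it was called on, so an earlier, differently typed reference would see the added stage too.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

interface Stage<In, Out> {
    DataItem<Out> processItem(DataItem<In> data) throws Exception;
}

class DataItem<T> {
    public T data;

    public DataItem(T obj) {
        this.data = obj;
    }
}

// Hypothetical variant of the builder: every add() returns a NEW builder
// with a copied stage list, so no unchecked cast of `this` is needed.
class CopyingPipelineBuilder<In, Out> {
    private final List<Stage<?, ?>> stages;

    private CopyingPipelineBuilder(List<Stage<?, ?>> stages) {
        this.stages = stages;
    }

    static <In, Out> CopyingPipelineBuilder<In, Out> create(Stage<In, Out> stage) {
        List<Stage<?, ?>> stages = new ArrayList<>();
        stages.add(stage);
        return new CopyingPipelineBuilder<>(stages);
    }

    // <In, Out> plus a Stage<Out, NextOut> yields a builder typed <In, NextOut>
    <NextOut> CopyingPipelineBuilder<In, NextOut> add(Stage<Out, NextOut> stage) {
        List<Stage<?, ?>> extended = new ArrayList<>(stages);
        extended.add(stage);
        return new CopyingPipelineBuilder<>(extended);
    }

    // A build() method would hand this list to the Pipeline constructor.
    List<Stage<?, ?>> stages() {
        return Collections.unmodifiableList(stages);
    }
}

class CopyingBuilderDemo {
    public static void main(String[] args) {
        Stage<String, String[]> splitter = d -> new DataItem<>(d.data.split(" "));
        Stage<String[], Integer> counter = d -> new DataItem<>(d.data.length);

        CopyingPipelineBuilder<String, String[]> b0 = CopyingPipelineBuilder.create(splitter);
        CopyingPipelineBuilder<String, Integer> b1 = b0.add(counter);

        System.out.println(b0.stages().size()); // prints 1: b0 is unchanged
        System.out.println(b1.stages().size()); // prints 2
    }
}
```

The trade-off is one list copy per added stage, which is unlikely to matter for pipelines of ordinary length.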
These transitions can be written in an overly elaborate form:
private static void showTypeTransitions()
{
Stage<String, String[]> s0 = null;
Stage<String[], List<Integer>> s1 = null;
Stage<List<Integer>, Integer> s2 = null;
// Starting with a builder for a Pipeline<String, String[]>
PipelineBuilder<String, String[]> b0 = PipelineBuilder.create(s0);
// Appending a Stage<String[], List<Integer>> turns it
// into a builder for a Pipeline<String, List<Integer>>
PipelineBuilder<String, List<Integer>> b1 = b0.add(s1);
// Appending a Stage<List<Integer>, Integer> turns it
// into a builder for a Pipeline<String, Integer>
PipelineBuilder<String, Integer> b2 = b1.add(s2);
// Finally, build it
Pipeline<String, Integer> pipeline = b2.build();
}
But this is not necessary. The intention of the builder is to have a fluent interface for the construction:
Pipeline<String, Integer> pipeline = PipelineBuilder
.create(splitter)
.add(transformer)
.add(accumulator)
.build();
(Note that you could also omit the Builder, and add a similar method to the Pipeline class. But that would require some tweaks for the constructor etc.)
Here is an MCVE showing this approach. In the showBasicUsage method, the basic usage is shown (hence the name...). In the showHowNiceItIsToUseAnInterface method, the pipeline that is created in the basic example is used as one Stage of a new Pipeline that is created.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
interface Stage<In, Out>
{
DataItem<Out> processItem(DataItem<In> data) throws Exception;
}
class DataItem<T>
{
public T data;
public DataItem(T obj)
{
this.data = obj;
}
}
class Pipeline<In, Out> implements Stage<In, Out>
{
private List<Stage<?, ?>> stages;
Pipeline(List<Stage<?, ?>> stages)
{
this.stages = stages;
}
@Override
public DataItem<Out> processItem(DataItem<In> dataItem) throws Exception
{
DataItem<?> current = dataItem;
for (Stage<?, ?> stage : stages)
{
current = apply(stage, current);
}
// This cast is safe as per construction of the pipeline
@SuppressWarnings("unchecked")
DataItem<Out> result = (DataItem<Out>) current;
return result;
}
private <I, O> DataItem<O> apply(
Stage<I, O> stage, DataItem<?> dataItem) throws Exception
{
// This cast is safe as per construction of the pipeline
@SuppressWarnings("unchecked")
DataItem<I> typedDataItem = (DataItem<I>)dataItem;
DataItem<O> result = stage.processItem(typedDataItem);
return result;
}
}
class PipelineBuilder<In, Out>
{
private List<Stage<?, ?>> stages;
static <In, Out> PipelineBuilder<In, Out> create(Stage<In, Out> stage)
{
PipelineBuilder<In, Out> pipelineBuilder =
new PipelineBuilder<In, Out>(stage);
return pipelineBuilder;
}
private PipelineBuilder(Stage<In, Out> stage)
{
stages = new ArrayList<Stage<?,?>>();
stages.add(stage);
}
public <NextOut> PipelineBuilder<In, NextOut> add(Stage<Out, NextOut> stage)
{
stages.add(stage);
// This cast is safe as per construction of the pipeline
@SuppressWarnings("unchecked")
PipelineBuilder<In, NextOut> result =
(PipelineBuilder<In, NextOut>) this;
return result;
}
public Pipeline<In, Out> build()
{
return new Pipeline<In, Out>(stages);
}
}
public class PipelineExample
{
public static void main(String[] args) throws Exception
{
showBasicUsage();
showHowNiceItIsToUseAnInterface();
}
private static void showBasicUsage() throws Exception
{
Pipeline<String, Integer> pipeline = createExamplePipeline();
DataItem<String> in = new DataItem<>("1 35 42 2 10 5 2 3");
DataItem<Integer> out = pipeline.processItem(in);
System.out.println(out.data); // prints 100
}
private static void showHowNiceItIsToUseAnInterface() throws Exception
{
Stage<List<Integer>, String> stringCreator =
dataItem -> new DataItem<>(
dataItem.data.stream()
.map(String::valueOf)
.collect(Collectors.joining(" ")));
// Create the whole pipeline that was used in the basic usage
// example, and use it as one stage in the new pipeline:
Stage<String, Integer> pipelineAsStage = createExamplePipeline();
Pipeline<List<Integer>, Integer> pipeline = PipelineBuilder
.create(stringCreator)
.add(pipelineAsStage)
.build();
DataItem<List<Integer>> in = new DataItem<>(Arrays.asList(0,1,2,3,4));
DataItem<Integer> out = pipeline.processItem(in);
System.out.println(out.data); // prints 10
}
private static Pipeline<String, Integer> createExamplePipeline()
{
Stage<String, String[]> splitter =
dataItem -> new DataItem<>(dataItem.data.split(" "));
Stage<String[], List<Integer>> transformer =
dataItem -> new DataItem<>(Arrays.stream(dataItem.data)
.map(Integer::parseInt)
.collect(Collectors.toList()));
Stage<List<Integer>, Integer> accumulator =
dataItem -> new DataItem<>(
dataItem.data.stream().reduce(0, Integer::sum));
Pipeline<String, Integer> pipeline = PipelineBuilder
.create(splitter)
.add(transformer)
.add(accumulator)
.build();
return pipeline;
}
private static void showTypeTransitions()
{
Stage<String, String[]> s0 = null;
Stage<String[], List<Integer>> s1 = null;
Stage<List<Integer>, Integer> s2 = null;
// Starting with a builder for a Pipeline<String, String[]>
PipelineBuilder<String, String[]> b0 = PipelineBuilder.create(s0);
// Appending a Stage<String[], List<Integer>> turns it
// into a builder for a Pipeline<String, List<Integer>>
PipelineBuilder<String, List<Integer>> b1 = b0.add(s1);
// Appending a Stage<List<Integer>, Integer> turns it
// into a builder for a Pipeline<String, Integer>
PipelineBuilder<String, Integer> b2 = b1.add(s2);
// Finally, build it
Pipeline<String, Integer> pipeline = b2.build();
}
}
(The stages are taken from the answer by Kirill Simonov, just for convenience, and because they make a nice example use case.)
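The earlier note about omitting the Builder and adding a similar method to Pipeline itself could look roughly like the following. This is a sketch, not the code from the MCVE: the factory of and the method then are hypothetical names, and the constructor is made private so that a Pipeline can only be assembled through these two type-checked entry points.

```java
import java.util.ArrayList;
import java.util.List;

interface Stage<In, Out> {
    DataItem<Out> processItem(DataItem<In> data) throws Exception;
}

class DataItem<T> {
    public T data;

    public DataItem(T obj) {
        this.data = obj;
    }
}

class Pipeline<In, Out> implements Stage<In, Out> {
    private final List<Stage<?, ?>> stages;

    private Pipeline(List<Stage<?, ?>> stages) {
        this.stages = stages;
    }

    // Hypothetical factory, taking the role of PipelineBuilder.create
    static <In, Out> Pipeline<In, Out> of(Stage<In, Out> first) {
        List<Stage<?, ?>> stages = new ArrayList<>();
        stages.add(first);
        return new Pipeline<>(stages);
    }

    // Hypothetical counterpart of PipelineBuilder.add: returns a new, longer pipeline
    <NextOut> Pipeline<In, NextOut> then(Stage<Out, NextOut> next) {
        List<Stage<?, ?>> extended = new ArrayList<>(stages);
        extended.add(next);
        return new Pipeline<>(extended);
    }

    @Override
    public DataItem<Out> processItem(DataItem<In> dataItem) throws Exception {
        DataItem<?> current = dataItem;
        for (Stage<?, ?> stage : stages) {
            current = apply(stage, current);
        }
        // Safe because of() and then() only accept stages whose types line up
        @SuppressWarnings("unchecked")
        DataItem<Out> result = (DataItem<Out>) current;
        return result;
    }

    private static <I, O> DataItem<O> apply(Stage<I, O> stage, DataItem<?> item) throws Exception {
        // Safe for the same reason as the cast in processItem
        @SuppressWarnings("unchecked")
        DataItem<I> typed = (DataItem<I>) item;
        return stage.processItem(typed);
    }
}

class PipelineThenDemo {
    static int sumOfNumbers(String text) throws Exception {
        Stage<String, String[]> splitter = d -> new DataItem<>(d.data.split(" "));
        Stage<String[], Integer> adder = d -> {
            int sum = 0;
            for (String s : d.data) {
                sum += Integer.parseInt(s);
            }
            return new DataItem<>(sum);
        };
        Pipeline<String, Integer> pipeline = Pipeline.of(splitter).then(adder);
        return pipeline.processItem(new DataItem<>(text)).data;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sumOfNumbers("1 35 42 2 10 5 2 3")); // prints 100
    }
}
```

The unchecked casts inside processItem remain, since the stage list is still heterogeneous; only the separate builder class goes away.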
Upvotes: 3
Reputation: 8481
You could try to use some kind of inversion of control to make the chaining easier. Add a public <R> DataItem<R> process(Stage<T, R> nextStage) method to your DataItem class: it takes this item, processes it using the processItem method of nextStage, and returns the new item.
Then you can create a chain of methods like this:
output = input.process(stage1).process(stage2).process(stage3);
This chain represents your pipeline, so you don't need the Pipeline class.
// I changed it to an interface to use as a functional interface in tests
public interface Stage<I, O> {
DataItem<O> processItem(DataItem<I> data) throws Exception;
}
public class DataItem<T> {
public T data;
public DataItem(T obj) {
this.data = obj;
}
public <R> DataItem<R> process(Stage<T, R> nextStage) throws Exception {
return nextStage.processItem(this);
}
}
And here is a pipeline example:
// splits the string
Stage<String, String[]> splitter = dataItem -> new DataItem<>(dataItem.data.split(" "));
// converts each string from the array to an integer
Stage<String[], List<Integer>> transformer = dataItem ->
new DataItem<>(Arrays.stream(dataItem.data)
.map(Integer::parseInt).collect(Collectors.toList()));
// sums all integers
Stage<List<Integer>, Integer> accumulator = dataItem ->
new DataItem<>(dataItem.data.stream().reduce(0, Integer::sum));
DataItem<String> in = new DataItem<>("1 35 42 2 10 5 2 3");
DataItem<Integer> out = in.process(splitter).process(transformer).process(accumulator);
System.out.println(out.data); // prints 100
Upvotes: 1
Reputation: 140484
List elements have to be homogeneous. As such, you have to have the same input and output types, if you are going to use a List.
Instead of this, you can use the composite pattern. For example:
public interface Stage<In, Out> {
DataItem<Out> processItem(DataItem<In> data) throws Exception;
}
class CompositeStage<In, Mid, Out> implements Stage<In, Out> {
    // The two parts are stages, not data items
    final Stage<In, Mid> first;
    final Stage<Mid, Out> second;
    CompositeStage(
            Stage<In, Mid> first,
            Stage<Mid, Out> second) {
        this.first = first;
        this.second = second;
    }
    // Getters, if you want.
    // Must be public and declare the exception, as in the interface
    @Override
    public DataItem<Out> processItem(DataItem<In> in) throws Exception {
        return second.processItem(first.processItem(in));
    }
}
Now you can use this in your "pipeline"; although you don't really need that class any more.
Stage<A, B> first = ...;
Stage<B, C> second = ...;
Stage<C, D> third = ...;
Stage<A, D> overall = new CompositeStage<>(first, new CompositeStage<>(second, third));
The advantage of this is that you keep the type information about the input and output types, which are the only things that really matter from the point of view of anything using the class.
DataItem<A> input = ...;
DataItem<D> output = overall.processItem(input);
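The nested constructor calls can be hidden behind a default method on the interface. The sketch below assumes a hypothetical andThen method, modelled on java.util.function.Function.andThen, and uses a lambda in place of the CompositeStage class; the composition idea is the same.

```java
interface Stage<In, Out> {
    DataItem<Out> processItem(DataItem<In> data) throws Exception;

    // Hypothetical helper: composes this stage with the next one.
    // Inside the lambda, processItem(in) refers to this (the first) stage.
    default <Next> Stage<In, Next> andThen(Stage<Out, Next> next) {
        return in -> next.processItem(processItem(in));
    }
}

class DataItem<T> {
    public T data;

    public DataItem(T obj) {
        this.data = obj;
    }
}

class AndThenDemo {
    static String describe(String text) throws Exception {
        Stage<String, String[]> split = d -> new DataItem<>(d.data.split(" "));
        Stage<String[], Integer> count = d -> new DataItem<>(d.data.length);
        Stage<Integer, String> label = d -> new DataItem<>(d.data + " words");

        // Only the overall input and output types remain visible
        Stage<String, String> overall = split.andThen(count).andThen(label);
        return overall.processItem(new DataItem<>(text)).data;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe("a b c")); // prints 3 words
    }
}
```

Each andThen call checks at compile time that the output type of one stage matches the input type of the next, so no casts or raw types are involved.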
Upvotes: 1
Reputation: 11845
I see two options.
A. All the input and output parameters (of all stages) have the same type. You can add it as a type parameter to your Pipeline:
public class Pipeline<T> {
private List<Stage<T,T>> stages;
public Pipeline(List<Stage<T,T>> stages) {
this.stages = stages;
}
public void processItem(DataItem<T> dataItem) throws Exception {
for (Stage<T, T> stage: stages) {
dataItem = stage.processItem(dataItem);
}
}
}
B. If the output types differ, there is no way to do this in a type-safe manner, because your intermediate result will be a DataItem of X, where X changes after every stage. Using raw types (which means giving up type safety) is an option, but it produces the warnings you see.
Upvotes: 0