Reputation: 897
I was trying out a simple Flink program that just takes a file, reverses the strings in the file, and writes them out.
The program works, but the individual lines come out in a different order than in the input.
E.g.
File Input
Thing,Name
Person,Vineet
Fish,Karp
Dog,Fido
Output File
Fish,praK
Thing,emaN
Person,teeniV
Dog,odiF
I was expecting:
Thing,emaN
Person,teeniV
Fish,praK
Dog,odiF
Below is the program that I wrote to achieve this:
package testflink;

import java.util.Iterator;
import java.util.StringJoiner;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.Collector;

public class BatchJob {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        System.err.println(env.getParallelism());

        DataSource<String> file = env.readTextFile("./data.csv");

        file.mapPartition((Iterable<String> values, Collector<String> out) -> {
            System.err.println("************* " + out.hashCode() + " Begin");
            Iterator<String> iterator = values.iterator();
            while (iterator.hasNext()) {
                String tuple = iterator.next();
                System.err.println("************* " + out.hashCode() + tuple);
                String[] split = tuple.split(",");
                String tuple1Rev = new StringBuilder(split[1]).reverse().toString();
                out.collect(new StringJoiner(",").add(split[0]).add(tuple1Rev).toString());
            }
            System.err.println("************* " + out.hashCode() + " End");
        }).returns(String.class).writeAsText("./dataO.csv", WriteMode.OVERWRITE).setParallelism(1);

        env.execute("Flink Batch Java API Skeleton");
        System.out.println("Done");
    }
}
I know there is a readAsCsv() method available. The problem is that the csv can have a dynamic number of columns per row/tuple. I wasn't able to figure out how to convert it into a DataSource with a dynamic number of columns per tuple. MapPartition needs the types defined - how can I substitute Tuple0 - Tuple25 at runtime? Or do I just have to stick with the plain Iterable<String> values parameter? Thanks in advance! :)
Upvotes: 0
Views: 1255
Reputation: 18987
Flink's mapPartition maintains the order of records within each parallel partition. However, the problem in your use case is how the data is distributed to the parallel tasks of the MapPartition operator.
You are using a TextInputFormat, which divides the input file into several input splits that are independently processed by parallel instances of the data source operator. Each data source instance locally forwards all of its records to a succeeding MapPartition operator, which forwards its result records to the sink. The pipeline looks like this:
source_1 -> mapPartition_1 -> sink_1
source_2 -> mapPartition_2 -> sink_2
source_3 -> mapPartition_3 -> sink_3
...
So from the source on, all records are processed in order. However, since the input splits are randomly assigned to source tasks and sinks operate independently (no coordination), the output is only partially ordered (records read from the same split are ordered).
Setting the parallelism of the source to 1 won't help, because it will send its result records to the succeeding tasks in round-robin fashion to leverage the parallelism of the succeeding operator. Setting the parallelism of the whole job to 1 does not help either, because the splits can still be processed in random order by the single source task. The only solution I am aware of would be to number each input record and sort on that number (with range partitioning for parallel processing) before writing the result.
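A minimal sketch of that last idea, assuming the input file already carries a sequence number as the first field of every line (e.g. "1,Thing,Name"); the file name dataNumbered.csv, the class name, and the three-column layout are made up for illustration. It sorts on the sequence number with parallelism 1 before writing:

package testflink;

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class OrderedBatchJob {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical input: every line starts with its sequence number,
        // e.g. "1,Thing,Name", "2,Person,Vineet", ...
        DataSet<String> lines = env.readTextFile("./dataNumbered.csv");

        DataSet<Tuple2<Long, String>> reversed = lines
                .map(line -> {
                    String[] split = line.split(",");
                    long seq = Long.parseLong(split[0]);
                    String rev = new StringBuilder(split[2]).reverse().toString();
                    return Tuple2.of(seq, split[1] + "," + rev);
                })
                .returns(Types.TUPLE(Types.LONG, Types.STRING));

        reversed
                // With parallelism 1 the whole DataSet is a single partition,
                // so sortPartition on the sequence number restores the file order.
                .sortPartition(0, Order.ASCENDING).setParallelism(1)
                .map(t -> t.f1).returns(Types.STRING).setParallelism(1)
                .writeAsText("./dataO.csv", WriteMode.OVERWRITE).setParallelism(1);

        env.execute("Ordered reverse");
    }
}

To keep the sort parallel, one could instead insert partitionByRange(0) before sortPartition and raise the parallelism, along the lines described above.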
Upvotes: 4