Reputation: 31
I have a dataset containing records that hold a time period (represented in nanoseconds: two Longs, one for start, one for end) and a measured value. I need to create a new, aggregated dataset that holds only the periods where the value changes. For example:
input dataset:
+-----+-----+-----+
|start|end  |value|
+-----+-----+-----+
|123  |124  |1    |
|124  |128  |1    |
|128  |300  |2    |
|300  |400  |2    |
|400  |500  |3    |
+-----+-----+-----+
result dataset:
+-----+-----+-----+
|start|end  |value|
+-----+-----+-----+
|123  |128  |1    |
|128  |400  |2    |
|400  |500  |3    |
+-----+-----+-----+
I know how to do this on small datasets, but I have no idea how to approach it with the MapReduce paradigm and Apache Spark.
Can you please give me a hint on how to achieve this in Apache Spark with Java?
Upvotes: 2
Views: 1210
Reputation: 1246
It seems quite simple this way: find the min and max with groupBy, then combine the datasets.
// df is original dataset
Dataset<Row> df_start = df.groupBy("value").min("start")
        .withColumnRenamed("min(start)", "start")
        .withColumnRenamed("value", "value_start");
Dataset<Row> df_end = df.groupBy("value").max("end")
        .withColumnRenamed("max(end)", "end")
        .withColumnRenamed("value", "value_end");
Dataset<Row> df_combined = df_start
        .join(df_end, df_start.col("value_start").equalTo(df_end.col("value_end")))
        .drop("value_end")
        .withColumnRenamed("value_start", "value")
        .orderBy("value");
df_combined.show(false);
+-----+-----+---+
|value|start|end|
+-----+-----+---+
|1    |123  |128|
|2    |128  |400|
|3    |400  |700|
+-----+-----+---+
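If you prefer, the same result can be produced with a single aggregation instead of a join (a sketch, assuming the same df as above; min and max here are the static functions from org.apache.spark.sql.functions):
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.min;

// one groupBy computing both boundaries at once
Dataset<Row> df_combined = df.groupBy("value")
        .agg(min("start").as("start"), max("end").as("end"))
        .orderBy("value");
df_combined.show(false);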
Upvotes: 1
Reputation: 478
One approach is to phrase your problem as "for each distinct value, find all the adjacent time ranges for that value and coalesce them". With that understanding you can use groupBy on the value to create a list of start and end pairs for each value. Then you can use a custom function to collapse these down into the contiguous time ranges.
At the extreme end, if you use a disk-only persistence level on the dataset, the only requirement is that a single row's worth of start_end structs fits in memory. For most clusters, that puts the upper limit of this approach at gigabytes of start_end pairs per value.
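For reference, a disk-only storage level can be requested like this (a sketch; startEndsByValue refers to the grouped dataset built in the code below):
import org.apache.spark.storage.StorageLevel;

// keep the grouped rows on disk rather than in executor memory
startEndsByValue.persist(StorageLevel.DISK_ONLY());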
Here's an example implementation (using the Java API as requested - Scala would be quite a bit less verbose):
import static java.util.Arrays.asList;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.types.DataTypes.createStructField;
import static org.apache.spark.sql.types.DataTypes.createStructType;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

public class JavaSparkTest {

    public static void main(String[] args){
        SparkSession session = SparkSession.builder()
                .appName("test-changes-in-time")
                .master("local[*]")
                .getOrCreate();

        StructField start = createStructField("start", DataTypes.IntegerType, false);
        StructField end = createStructField("end", DataTypes.IntegerType, false);
        StructField value = createStructField("value", DataTypes.IntegerType, false);
        StructType inputSchema = createStructType(asList(start, end, value));
        StructType startEndSchema = createStructType(asList(start, end));

        // UDF that sorts the collected (start, end) structs by start and collapses adjacent ranges
        session.udf().register("collapse_timespans", (WrappedArray<Row> startEnds) ->
            JavaConversions.asJavaCollection(startEnds).stream()
                .sorted((a, b) -> ((Comparable) a.getAs("start")).compareTo(b.getAs("start")))
                .collect(new StartEndRowCollapsingCollector()),
            DataTypes.createArrayType(startEndSchema)
        );

        Dataset<Row> input = session.createDataFrame(asList(
            RowFactory.create(123, 124, 1),
            RowFactory.create(124, 128, 1),
            RowFactory.create(128, 300, 2),
            RowFactory.create(300, 400, 2),
            RowFactory.create(400, 500, 3),
            RowFactory.create(500, 600, 3),
            RowFactory.create(600, 700, 3)
        ), inputSchema);

        // pack start and end into a single struct column, then collect the structs per value
        Dataset<Row> startEndByValue = input.selectExpr("(start start, end end) start_end", "value");
        Dataset<Row> startEndsByValue = startEndByValue.groupBy("value").agg(collect_list("start_end").as("start_ends"));
        // collapse the list of structs and explode back into one row per contiguous range
        Dataset<Row> startEndsCollapsed = startEndsByValue.selectExpr("value", "explode(collapse_timespans(start_ends)) as start_end");
        Dataset<Row> startEndsInColumns = startEndsCollapsed.select("value", "start_end.start", "start_end.end");

        startEndsInColumns.show();
    }

    public static class StartEndRowCollapsingCollector implements Collector<Row, List<Row>, List<Row>> {

        @Override
        public Supplier<List<Row>> supplier() {
            return () -> new ArrayList<Row>();
        }

        @Override
        public BiConsumer<List<Row>, Row> accumulator() {
            return (rowList, row) -> {
                // start a new range if the list is empty or this row's start doesn't match the last row's end
                if (rowList.size() == 0 ||
                        !rowList.get(rowList.size() - 1).getAs(1).equals(row.getAs(0))) {
                    rowList.add(row);
                } else {
                    // otherwise extend the last range to this row's end
                    Row lastRow = rowList.remove(rowList.size() - 1);
                    rowList.add(RowFactory.create(lastRow.getAs(0), row.getAs(1)));
                }
            };
        }

        @Override
        public BinaryOperator<List<Row>> combiner() {
            return (a, b) -> { throw new UnsupportedOperationException(); };
        }

        @Override
        public Function<List<Row>, List<Row>> finisher() {
            return i -> i;
        }

        @Override
        public Set<Characteristics> characteristics() {
            return Collections.emptySet();
        }
    }
}
And the program output:
+-----+-----+---+
|value|start|end|
+-----+-----+---+
| 1| 123|128|
| 3| 400|700|
| 2| 128|400|
+-----+-----+---+
Notice the values are not in order. This is because Spark has partitioned the dataset and processed the value groups in parallel, and you haven't told it to assign any significance to the row ordering. Should you require output sorted by time or value, you can of course just sort it in the usual way.
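For example (a sketch reusing the startEndsInColumns dataset from the code above):
// sort by value, then by start, before displaying
startEndsInColumns.orderBy("value", "start").show();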
Upvotes: 0