Reputation: 59
I am trying to run a sliding window calculation on a set of tuples stored in a CSV file. Each row has a date associated with it. After reading the CSV file using TextIO.Read, I apply a ParDo transformation to change the timestamp of each element in the PCollection.
//Reading and timestamping the stock prices
PCollection<KV<Integer, StockPricePoint>> stockPrices = pipeline
    .apply(TextIO.Read.from("./inputFiles/2004.csv"))
    .apply(ParDo.of(new DoFn<String, KV<Integer, StockPricePoint>>() {
      @Override
      public void processElement(ProcessContext c) throws Exception {
        String[] fields = c.element().split(",");
        try {
          StockPricePoint stockPoint = new StockPricePoint();
          stockPoint.setId(fields[0]);
          // "yyyyMMdd" -- capital MM is month; lowercase mm would parse minutes
          SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
          stockPoint.setDate(sdf.parse(fields[1].trim()));
          stockPoint.setSymbol(fields[2]);
          stockPoint.setPrice(Double.parseDouble(fields[5].trim()));
          stockPoint.setCap(Double.parseDouble(fields[6].trim()));
          // Use the parsed date as the element's event-time timestamp
          Instant instant = new Instant(stockPoint.getDate().getTime());
          // symbolEncoder is defined in the enclosing scope
          c.outputWithTimestamp(KV.of(
              symbolEncoder.getSymbolIndex(stockPoint.getSymbol()), stockPoint),
              instant);
        } catch (Exception ex) {
          // TODO: accumulate errors
          ex.printStackTrace();
        }
      }
    }));
Then I apply a sliding window transformation as follows:
//Creating the sliding windows
PCollection<KV<Integer, StockPricePoint>> slidingWindowStockPrices = stockPrices
    .apply(Window.<KV<Integer, StockPricePoint>>into(
        SlidingWindows.of(Duration.standardDays(30))
            .every(Duration.standardDays(5))));
After that, when I invoke a GroupByKey transformation as follows, I get a "GlobalWindow cannot be cast to IntervalWindow" exception. What could possibly be going wrong here?
slidingWindowStockPrices.apply(GroupByKey.create());
The full stack trace can be found here: http://pastebin.com/QUStvrfB
Upvotes: 0
Views: 587
Reputation: 31
I've tried to reproduce the behaviour in a unit test using your code snippets, but I couldn't reproduce the issue. Please see https://issues.apache.org/jira/browse/BEAM-112. I did, however, note that you were working with an early version of the Spark runner from before it was integrated into Apache Beam (or even Cloudera's spark-dataflow), so it might be worth a shot to update your Spark runner.
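For reference, here is a minimal sketch of the kind of reproduction test I mean, assuming the pre-Beam Dataflow SDK 1.x APIs (this is a sketch, not the actual BEAM-112 test; the class name and the sample values are placeholders):

import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class SlidingWindowGroupByKeyRepro {
  public static void main(String[] args) {
    TestPipeline pipeline = TestPipeline.create();

    // Pre-timestamped elements stand in for the CSV read + outputWithTimestamp step
    PCollection<KV<Integer, Double>> prices = pipeline.apply(Create.timestamped(
        TimestampedValue.of(KV.of(1, 10.0), new Instant(0L)),
        TimestampedValue.of(KV.of(1, 12.5),
            new Instant(Duration.standardDays(3).getMillis()))));

    // Same windowing + GroupByKey as in the question; if the runner mishandles
    // sliding windows, the cast error should surface when the pipeline runs
    prices
        .apply(Window.<KV<Integer, Double>>into(
            SlidingWindows.of(Duration.standardDays(30))
                .every(Duration.standardDays(5))))
        .apply(GroupByKey.<Integer, Double>create());

    pipeline.run();
  }
}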
Please let me know if I've missed anything here.
Upvotes: 0
Reputation: 6130
This does not look like a problem with the Google Cloud Dataflow service. You can also try the DirectPipelineRunner for local testing.
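For example, here is a minimal sketch of running the same pipeline locally, assuming the pre-Beam Dataflow SDK 1.x options API (the class name is just a placeholder):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;

public class LocalRunnerCheck {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // Run in-process instead of on Spark to isolate runner-specific behaviour
    options.setRunner(DirectPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(options);
    // ...build the same read/timestamp/window/GroupByKey pipeline here...
    pipeline.run();
  }
}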
It looks like a problem with the Spark runner, which may not yet implement the full semantics necessary for triggering in the GlobalWindow. The Spark runner is now maintained by the Apache Beam (incubating) project; I have filed a ticket on the project's Jira to track this issue.
Upvotes: 1