Supun Nakandala

Reputation: 59

GlobalWindow cannot be cast to IntervalWindow

I am trying to run a sliding window calculation on a set of tuples stored in a CSV file. Each row has a date associated with it. After reading the CSV file using TextIO.Read, I apply a ParDo transformation to change the timestamp of each element in the PCollection.

//Reading and time stamping the stock prices
PCollection<KV<Integer, StockPricePoint>> stockPrices = pipeline
  .apply(TextIO.Read.from("./inputFiles/2004.csv"))
  .apply(ParDo.of(new DoFn<String, KV<Integer, StockPricePoint>>() {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      String[] fields = c.element().split(",");
      try {
        StockPricePoint stockPoint = new StockPricePoint();
        stockPoint.setId(fields[0]);
        // "MM" is months; lowercase "mm" would be parsed as minutes
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
        stockPoint.setDate(sdf.parse(fields[1].trim()));
        stockPoint.setSymbol(fields[2]);
        stockPoint.setPrice(Double.parseDouble(fields[5].trim()));
        stockPoint.setCap(Double.parseDouble(fields[6].trim()));
        // Use the parsed trade date as the element's event-time timestamp
        Instant instant = new Instant(stockPoint.getDate().getTime());
        c.outputWithTimestamp(KV.of(
          symbolEncoder.getSymbolIndex(stockPoint.getSymbol()), stockPoint), 
          instant);
      } catch (Exception ex) {
        //Todo accumulate errors
        ex.printStackTrace();
      }
    }
  }));

Then I apply a sliding window transformation as follows:

//creating the sliding windows
PCollection<KV<Integer, StockPricePoint>> slidingWindowStockPrices = stockPrices
  .apply(Window.<KV<Integer, StockPricePoint>>into(
    SlidingWindows.of(Duration.standardDays(30))
        .every(Duration.standardDays(5))));

After that, when I invoke a GroupByKey transformation as follows, I get a "GlobalWindow cannot be cast to IntervalWindow" exception. What could possibly be going wrong here?

   slidingWindowStockPrices.apply(GroupByKey.create());

The full stack trace can be found here: http://pastebin.com/QUStvrfB

Upvotes: 0

Views: 587

Answers (2)

Amit Sela

Reputation: 31

I've tried to reproduce the behaviour in a unit test I wrote. Please see: https://issues.apache.org/jira/browse/BEAM-112

I used your code snippets in a functional test to try to reproduce the issue, but I couldn't. I did note, however, that you were working with an early version of the Spark runner from before it was integrated into Apache Beam (or even with Cloudera's spark-dataflow), so it might be worth trying to update your Spark runner.
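For reference, the kind of functional test I mean is roughly the following minimal sketch; it assumes the Dataflow Java SDK 1.x classes from your snippets, and uses Create.timestamped in place of the CSV read so it is self-contained:

import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;

public class SlidingWindowGroupByKeyTest {

  @Test
  public void slidingWindowGroupByKeyRuns() {
    TestPipeline pipeline = TestPipeline.create();

    // Hard-coded timestamped elements stand in for the CSV read, so the test
    // exercises only the timestamping + sliding window + GroupByKey path.
    PCollection<KV<Integer, Double>> prices = pipeline.apply(Create.timestamped(
        TimestampedValue.of(KV.of(1, 10.0), new Instant(0)),
        TimestampedValue.of(KV.of(1, 12.5), new Instant(0).plus(Duration.standardDays(3))),
        TimestampedValue.of(KV.of(2, 99.0), new Instant(0).plus(Duration.standardDays(7)))));

    prices
        .apply(Window.<KV<Integer, Double>>into(
            SlidingWindows.of(Duration.standardDays(30)).every(Duration.standardDays(5))))
        .apply(GroupByKey.<Integer, Double>create());

    // If the runner mishandles interval (sliding) windows, this is where it fails.
    pipeline.run();
  }
}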

Please let me know if I've missed anything here.

Upvotes: 0

Ben Chambers

Reputation: 6130

This does not look like a problem with the Google Cloud Dataflow service. You can also try the DirectPipelineRunner for local testing.
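A minimal sketch of what that would look like, assuming the Dataflow Java SDK 1.x class names from your snippets:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;

public class LocalRunCheck {
  public static void main(String[] args) {
    // Execute in-process instead of on Spark to see whether the
    // "GlobalWindow cannot be cast to IntervalWindow" error is runner-specific.
    PipelineOptions options = PipelineOptionsFactory.create();
    options.setRunner(DirectPipelineRunner.class);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the same read / timestamp / window / GroupByKey chain here ...
    pipeline.run();
  }
}

If the pipeline succeeds locally but fails on Spark, that points at the runner rather than your pipeline code.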

It looks like a problem with the Spark runner, which may not yet implement the full semantics necessary for triggering in the global window. The Spark runner is now maintained by the Apache Beam (incubating) project. I have filed a ticket on the project's Jira to track this issue.

Upvotes: 1
