mkunkel

Reputation: 263

Spark Dataset error with groupBy on Java POJO

I have a set of data that is not in any format Apache Spark can read directly, so I created a class to hold it:

public class TBHits {

    int status;
    int trkID;

    public TBHits(int trkID, int status) {
        this.status = status;

        this.trkID = trkID;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }



    public int getTrkID() {
        return trkID;
    }

    public void setTrkID(int trkID) {
        this.trkID = trkID;
    }

}

In the script that processes the data, I create a List:

private List<TBHits> tbHitList = new ArrayList<TBHits>();

While processing the data, I create a TBHits object for each hit and add it to the List:

...
...     
TBHits tbHits = new TBHits((bnkHits.getInt("trkID", i)), (bnkHits.getInt("status", i)));
tbHitList.add(tbHits);
...

After the processing, I create the Dataset, then run a basic show and a basic filter:

Dataset<Row> tbHitDf = spSession.createDataFrame(tbHitList, TBHits.class);
tbHitDf.show();
tbHitDf.filter(tbHitDf.col("trkID").gt(0)).show();

Both work as expected:

+------+-----+
|status|trkID|
+------+-----+
|     1|    0|
|     1|    0|
...
...

+------+-----+
|status|trkID|
+------+-----+
|     1|    1|
|     1|    1|
|     1|    1|

...
...

But when I attempt to use groupBy and count,

tbHitDf.groupBy("trkID").count().show();

I get an error that I cannot make sense of:

Exception in thread "main" java.lang.StackOverflowError
    at java.io.ObjectStreamClass$WeakClassKey.<init>(ObjectStreamClass.java:2307)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:322)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
...
...
...

However, if I insert the data manually,

TBHits tb1 = new TBHits(1, 1);
TBHits tb2 = new TBHits(1, 2);
tbHitList.add(tb1);
tbHitList.add(tb2);

then groupBy works properly. I cannot understand why.

Upvotes: 0

Views: 164

Answers (1)

mkunkel

Reputation: 263

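For future readers: the solution was to build a typed Dataset with a bean Encoder instead of calling createDataFrame on the List, i.e.

// Build the Dataset through a bean encoder rather than createDataFrame(tbHitList, TBHits.class)
Encoder<TBHits> tbHitsEncoder = Encoders.bean(TBHits.class);
Dataset<TBHits> tbHitDf = spSession.createDataset(tbHitList, tbHitsEncoder);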
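
To check the counts that groupBy("trkID").count() reports once the encoder-based Dataset is in place, the same aggregation can be reproduced in plain Java, without Spark. The following is only a minimal sketch: the class name TrkIdCountCheck and the helper countByTrkID are made up for illustration, and the nested TBHits is a cut-down stand-in for the bean in the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class TrkIdCountCheck {

    // Cut-down stand-in for the TBHits bean from the question (assumption: only
    // the two int fields matter for the count).
    static class TBHits {
        private final int trkID;
        private final int status;

        TBHits(int trkID, int status) {
            this.trkID = trkID;
            this.status = status;
        }

        int getTrkID() { return trkID; }
        int getStatus() { return status; }
    }

    // Hypothetical helper: counts how many hits share each trkID, sorted by trkID.
    // This mirrors what groupBy("trkID").count() computes, but on a local List.
    static Map<Integer, Long> countByTrkID(List<TBHits> hits) {
        return hits.stream()
                .collect(Collectors.groupingBy(TBHits::getTrkID, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Same two rows as the manual-insert test in the question.
        List<TBHits> tbHitList = Arrays.asList(new TBHits(1, 1), new TBHits(1, 2));
        System.out.println(countByTrkID(tbHitList)); // prints {1=2}
    }
}
```

Running this over the same List that is handed to createDataset gives a reference result to compare against the Spark output; it says nothing about why the original createDataFrame call overflowed the stack.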

Upvotes: 1
