Cody Canning

Reputation: 174

Extraneous groupBy in spark DAG

According to the Spark DAG visualization, a groupBy is being performed in Stage 1 after the groupBy performed in Stage 0. I only have one groupBy in my code, and I wouldn't expect any of the other transformations I'm using to result in a groupBy.

Here's the code (clojure / flambo):

;; stage 0
(-> (.textFile sc path 8192)
    (f/map (f/fn [msg] (json/parse-string msg true)))
    (f/group-by (f/fn [msg] (:mmsi msg)) 8192)

;; stage 1
    (f/map-values (f/fn [values] (sort-by :timestamp (vec values))))
    (f/flat-map (ft/key-val-fn (f/fn [mmsi messages]
                                 (let [state-map (atom {}) draught-map (atom {})]
                                   (map #(mk-line % state-map draught-map) (vec messages))))))
    (f/map (f/fn [line] (json/generate-string line)))
    (f/save-as-text-file path))

It's clear to me how Stage 0 is the sequence textFile, map, groupBy, and how Stage 1 is map-values, map-values, flat-map, map, saveAsTextFile, but where does the groupBy in Stage 1 come from?

[Spark DAG visualization: a groupBy block appears in both Stage 0 and Stage 1]

Since groupBy causes a shuffle, which is computationally expensive and time-consuming, I'd like to avoid an extraneous one if it can be helped.

Upvotes: 2

Views: 146

Answers (1)

zero323

Reputation: 330303

There is no extraneous groupBy here. groupBy is a two-step process. The first step is a local map that transforms each element x into a pair (f(x), x). This is the part represented by the groupBy block in Stage 0.

The second step is a non-local groupByKey, which is marked as the groupBy block in Stage 1. Only this part requires a shuffle.
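To make the two steps concrete, here is a minimal sketch in plain Clojure (no Spark involved; group-by-two-step and its arguments are hypothetical names for illustration). It mirrors the decomposition: a map that pairs each element with its key, followed by a grouping of the pairs by key. In Spark, only the second step has to move data between partitions.

;; Step 1: a local map from x to [(f x) x]. In Spark this stays within
;; a partition, so it appears at the end of Stage 0.
;; Step 2: grouping the pairs by key. In Spark this is groupByKey,
;; which shuffles data and therefore begins Stage 1.
(defn group-by-two-step [f xs]
  (let [keyed (map (fn [x] [(f x) x]) xs)]   ; step 1: local map
    (reduce (fn [acc [k v]]                  ; step 2: group by key
              (update acc k (fnil conj []) v))
            {}
            keyed)))

(group-by-two-step :mmsi [{:mmsi 1 :ts 0} {:mmsi 2 :ts 1} {:mmsi 1 :ts 2}])
;; => {1 [{:mmsi 1, :ts 0} {:mmsi 1, :ts 2}], 2 [{:mmsi 2, :ts 1}]}

So the groupBy block in Stage 1 is not a second grouping: it is the shuffle half of the single group-by in your code, and it cannot be removed without removing the grouping itself.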

Upvotes: 1
