Reputation: 174
According to the Spark DAG visualization there is a `groupBy` being performed in Stage 1 after the `groupBy` being performed in Stage 0. I only have one `groupBy` in my code and wouldn't expect any of the other transformations I'm doing to result in a `groupBy`.
Here's the code (Clojure / flambo):
```clojure
;; assumes the usual flambo aliases and cheshire for JSON:
;; (require '[flambo.api :as f]
;;          '[flambo.tuple :as ft]
;;          '[cheshire.core :as json])

;; stage 0
(-> (.textFile sc path 8192)
    (f/map (f/fn [msg] (json/parse-string msg true)))
    (f/group-by (f/fn [msg] (:mmsi msg)) 8192)
    ;; stage 1
    (f/map-values (f/fn [values] (sort-by :timestamp (vec values))))
    (f/flat-map (ft/key-val-fn (f/fn [mmsi messages]
                                 (let [state-map (atom {}) draught-map (atom {})]
                                   (map #(mk-line % state-map draught-map) (vec messages))))))
    (f/map (f/fn [line] (json/generate-string line)))
    (f/save-as-text-file path))
```
It's clear to me how Stage 0 is the sequence `textFile`, `map`, `groupBy` and Stage 1 is `map-values`, `map-values`, `flat-map`, `map`, `saveAsTextFile`, but where does the `groupBy` in Stage 1 come from?
Since `groupBy` causes a shuffle, which is computationally expensive and time-consuming, I don't want an extraneous one if it can be helped.
Upvotes: 2
Views: 146
Reputation: 330303
There is no extraneous `groupBy` here. `groupBy` is a two-step process. The first step is a local `map` which transforms each element `x` into a pair `(f(x), x)`. This is the part represented as a `groupBy` block in Stage 0.

The second step is a non-local `groupByKey`, which is marked as a `groupBy` block in Stage 1. Only this part requires shuffling.
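To make the decomposition concrete, here is a sketch of what the single `f/group-by` call in your code expands to, written with flambo's pair-RDD operations. This is an illustration, not flambo's internals: the names `f/map-to-pair`, `f/group-by-key`, and `ft/tuple` are my assumptions about flambo's API, and `parsed-rdd` stands for the RDD of parsed messages from your pipeline:

```clojure
;; A sketch of the two steps hidden inside the single f/group-by call,
;; assuming flambo exposes map-to-pair, group-by-key and ft/tuple.
(-> parsed-rdd
    ;; step 1 (Stage 0): a purely local map from msg to (key, msg) pairs;
    ;; this is the "groupBy" block the DAG shows in Stage 0
    (f/map-to-pair (f/fn [msg] (ft/tuple (:mmsi msg) msg)))
    ;; step 2 (Stage 1): gather the pairs by key across partitions;
    ;; only this step shuffles, and it is the "groupBy" block in Stage 1
    (f/group-by-key 8192))
```

In Spark's own terms this is `rdd.map(x => (f(x), x)).groupByKey()`: the two `groupBy` blocks in the DAG are the two halves of one logical operation, and only the second crosses the stage boundary.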
Upvotes: 1