Omid

Reputation: 1989

How to define operations on a DataFrame and run them later?

I want to define a set of aggregations and other operations on a DataFrame at different stages, but I don't want them to be executed right away. They should only be defined, and then executed later, like a pipeline. If you are familiar with Frameless, I am after something like Job[A], but I don't want to use Frameless itself.

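import org.apache.spark.sql.DataFrame

def addSelect(df: DataFrame) = {
    df.select("name")
}

def addCount(df: DataFrame) = {
    df.count()
}

def addSum(df: DataFrame) = {
    // pseudocode: DataFrame has no sum(); df.groupBy().sum() is one real option
    df.sum()
}

def addShow(df: DataFrame) = {
    df.show()
}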

val df = ...
val pipeline = addSum( addSelect(df) )
//if(userWantsToExecute) pipeline.execute()


Upvotes: 1

Views: 51

Answers (1)

You can create your own Pipeline like this:

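// Wraps a deferred computation; nothing is evaluated until run() is called.
final class Pipeline[A] private (val run: () => A) extends AnyVal {
  // Appends a stage without executing it.
  final def compose[B](f: A => B): Pipeline[B] =
    new Pipeline(() => f(this.run()))
}

object Pipeline {
  // The input is passed by name, so it is not evaluated here either.
  def start[A](input: => A): Pipeline[A] =
    new Pipeline(() => input)
}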

This is basically the same thing I said in my comment, wrapped up so that it is easier to use.
For example:

val pipeline = Pipeline.start(input = df).compose(addSelect).compose(addSum)
pipeline.run()

Note: The code above is generic; you can make it more specific to DataFrames if you want.
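
To check that the stages really are deferred, here is a minimal sketch that runs without Spark. It repeats the Pipeline definition so that it compiles on its own, but drops `extends AnyVal`, because a value class cannot be defined inside another class and that can get in the way in some REPL or notebook setups. `PipelineDemo`, `keepEven`, `total` and the `executed` counter are hypothetical names made up for this illustration, and `Seq[Int]` is only a stand-in for a DataFrame.

```scala
// Same shape as the Pipeline above, without AnyVal (see the note before this block).
final class Pipeline[A] private (val run: () => A) {
  def compose[B](f: A => B): Pipeline[B] =
    new Pipeline(() => f(this.run()))
}

object Pipeline {
  def start[A](input: => A): Pipeline[A] =
    new Pipeline(() => input)
}

object PipelineDemo {
  // Counts how many stage invocations have actually happened (illustration only).
  var executed = 0

  // Hypothetical stages; Seq[Int] stands in for a DataFrame.
  def keepEven(xs: Seq[Int]): Seq[Int] = { executed += 1; xs.filter(_ % 2 == 0) }
  def total(xs: Seq[Int]): Int = { executed += 1; xs.sum }

  // Building the pipeline only stores functions; no stage is called here.
  val pipeline: Pipeline[Int] =
    Pipeline.start(Seq(1, 2, 3, 4)).compose(keepEven).compose(total)
}
```

After the pipeline is built, `PipelineDemo.executed` is still 0; it only changes once `PipelineDemo.pipeline.run()` is called. Also note that each call to `run()` re-executes every stage, because nothing is cached. If a stage is expensive, you would have to add memoization yourself.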

Upvotes: 2
